Python multiprocessing pool est suspendu à join?

j'essaye d'exécuter du code python sur plusieurs fichiers en parallèle. La construction est en gros:

def process_file(filename, foo, bar, baz=biz):
    # do stuff that may fail and cause exception

if __name__ == '__main__':
    # setup code setting parameters foo, bar, and biz

    psize = multiprocessing.cpu_count()*2
    pool = multiprocessing.Pool(processes=psize)

    map(lambda x: pool.apply_async(process_file, (x, foo, bar), dict(baz=biz)), sys.argv[1:])
    pool.close()
    pool.join()

j'ai déjà utilisé pool.carte pour faire quelque chose de similaire et il a fonctionné grand, mais je ne semble pas pouvoir utiliser cela ici parce que piscine.map ne me permet pas (apparemment) de passer en arguments supplémentaires (et utiliser lambda pour le faire ne marchera pas parce que lambda ne peut pas être marshalled).

alors maintenant j'essaie de faire fonctionner les choses en utilisant apply_async() directement. Mon problème est que le le code semble pendre et ne jamais sortir. Quelques-uns des fichiers échouent avec une exception, mais je ne vois pas pourquoi ce qui causerait l'échec de join/hang? Il est intéressant de noter que si aucun des fichiers n'échoue avec une exception, il sort proprement.

Ce qui me manque?

Edit: Quand la fonction (et donc d'un travailleur) échoue, je vois cette exception:

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 376, in _handle_results
    task = get()
TypeError: ('__init__() takes at least 3 arguments (1 given)', <class 'subprocess.CalledProcessError'>, ())

si je vois ne serait-ce qu'un de ceux-ci, le processus parent est suspendu pour toujours, sans jamais récolter les enfants et en sortir.

31
demandé sur clemej 2013-03-09 22:15:14

3 réponses

désolé de répondre à ma propre question, mais j'ai trouvé au moins une solution de rechange donc au cas où quelqu'un d'autre a un problème similaire, je veux le poster ici. J'accepterai de meilleures réponses là-bas.

je crois que la racine du problème est http://bugs.python.org/issue9400 . Cela me dit deux choses:

  • Je ne suis pas fou, ce que j'essaie de faire est censé marcher
  • Au moins en python2, il est très difficile, sinon impossible, de cornichon 'exceptions' retour au processus parent. Les simples fonctionnent, mais beaucoup d'autres non.

dans mon cas, ma fonction de travailleur lançait un sous-processus qui était la segmentation. Cette exception retournée appelée Processerror, qui n'est pas pickleable. Pour une raison ou une autre, Cela fait que l'objet pool dans le parent sort pour déjeuner et ne revient pas de l'appel à join().

dans mon cas particulier, je me fiche de l'exception. Au plus je veux l'enregistrer et continuer. Faire ceci, j'enroule simplement ma fonction de l'ouvrier supérieur dans une clause d'essai/excepté. Si le travailleur lance une exception, il est capturé avant d'essayer de revenir au processus parent, connecté, puis le processus de travail s'arrête normalement car il n'est plus d'essayer d'envoyer l'exception à travers. Voir ci-dessous:

def process_file_wrapped(filenamen, foo, bar, baz=biz):
    try:
        process_file(filename, foo, bar, baz=biz)
    except:
        print('%s: %s' % (filename, traceback.format_exc()))

alors, j'ai ma fonction map initiale appelée process_file_wrapped() au lieu de la fonction originale. Maintenant, mon code fonctionne comme prévu.

43
répondu clemej 2013-03-10 02:01:59

Vous pouvez réellement utiliser un functools.partial exemple au lieu d'un lambda dans le cas où l'objet doit être nettoyée. partial les objets sont sélectionnables depuis Python 2.7 (et en Python 3).

pool.map(functools.partial(process_file, x, foo, bar, baz=biz), sys.argv[1:])
4
répondu nneonneo 2013-03-10 02:07:02

Pour ce que ça vaut, j'ai eu un bug similaire (pas le même) quand pool.map hung. Mon de cas d'utilisation m'a permis d'utiliser piscine.résilier pour le résoudre (assurez-vous que le vôtre le fait aussi bien avant de changer les choses).

j'ai utilisé piscine.map avant d'appeler terminate donc je sais que tout est terminé, de la docs:

un équivalent parallèle de la fonction intégrée de map () (elle ne supporte qu'un seul argument itérable). Il bloque jusqu'à ce que le résultat soit prêt.

Si c'est votre cas d'utilisation, ce peut être une façon de le patcher.

2
répondu Reut Sharabani 2014-12-16 18:01:34