Python multiprocessing: certaines fonctions ne reviennent pas lorsqu'elles sont terminées (matériel de file d'attente trop grand)
J'utilise le processus et la file d'attente de multiprocessing. Je commence plusieurs fonctions en parallèle et la plupart se comportent bien: elles finissent, leur sortie va dans leur file d'attente, et elles apparaissent comme. is_alive () = = False. Mais pour une raison quelconque, quelques fonctions ne se comportent pas. Ils montrent toujours .is_alive () = = True, même après la dernière ligne de la fonction (une instruction d'impression disant "terminé") est terminée. Cela se produit indépendamment de l'ensemble des fonctions que je lance, même s'il n'y en a qu'une seule. Si pas exécuter en parallèle, les fonctions se comportent bien et reviennent normalement. Quel Genre de chose pourrait être le problème?
Voici la fonction générique que j'utilise pour gérer les tâches. Tout ce que je ne montre pas, ce sont les fonctions que je lui transmets. Ils sont longs, utilisent souvent matplotlib, lancent parfois des commandes shell, mais je ne peux pas comprendre ce que les échecs ont en commun.
def runFunctionsInParallel(listOf_FuncAndArgLists):
Take a list of lists like [function, arg1, arg2, ...]. Run those functions in parallel, wait for them all to finish, and return the list of their return values, in order.
from multiprocessing import Process, Queue
def storeOutputFFF(fff,theArgs,que): #add a argument to function for assigning a queue
print 'MULTIPROCESSING: Launching %s in parallel '%fff.func_name
que.put(fff(*theArgs)) #we're putting return value into queue
print 'MULTIPROCESSING: Finished %s in parallel! '%fff.func_name
# We get this far even for "bad" functions
queues=[Queue() for fff in listOf_FuncAndArgLists] #create a queue object for each function
jobs = [Process(target=storeOutputFFF,args=[funcArgs[0],funcArgs[1:],queues[iii]]) for iii,funcArgs in enumerate(listOf_FuncAndArgLists)]
for job in jobs: job.start() # Launch them all
import time
from math import sqrt
while any([jj.is_alive() for jj in jobs]): # debugging section shows progress updates
time.sleep(5+sqrt(n)) # Wait a while before next update. Slow down updates for really long runs.
print('n---------------------------------------------------n'+ 't'.join(['alive?','Job','exitcode','Func',])+ 'n---------------------------------------------------')
print('n'.join(['%s:t%s:t%s:t%s'%(job.is_alive()*'Yes',,job.exitcode,listOf_FuncAndArgLists[ii][0].func_name) for ii,job in enumerate(jobs)]))
# I never get to the following line when one of the "bad" functions is running.
for job in jobs: job.join() # Wait for them all to finish... Hm, Is this needed to get at the Queues?
# And now, collect all the outputs:
return([queue.get() for queue in queues])
1 réponses
D'accord, il semble que le tuyau utilisé pour remplir la file d'attente soit branché lorsque la sortie d'une fonction est trop grande (ma compréhension brute? C'est un non résolu/fermé bug? j'ai modifié le code dans ma question afin qu'il y ait une certaine mise en mémoire tampon (les files d'attente sont régulièrement vidées pendant que les processus sont en cours d'exécution), ce qui résout tous mes problèmes. Alors maintenant, cela prend une collection de tâches (fonctions et leurs arguments), les lance, et recueille les sorties. Je j'aimerais que ce soit plus simple /plus propre.
Edit (2014 Sep; update 2017 Nov: réécrit pour la lisibilité): je mets à jour le code avec les améliorations que j'ai apportées depuis. Le nouveau code (même fonction, mais de meilleures fonctionnalités) est ici:
La description de l'appelant est également ci-dessous.
def runFunctionsInParallel(*args, **kwargs):
""" This is the main/only interface to class cRunFunctionsInParallel. See its documentation for arguments.
return cRunFunctionsInParallel(*args, **kwargs).launch_jobs()
class cRunFunctionsInParallel():
"""Run any list of functions, each with any arguments and keyword-arguments, in parallel.
The functions/jobs should return (if anything) pickleable results. In order to avoid processes getting stuck due to the output queues overflowing, the queues are regularly collected and emptied.
You can now pass os.system or etc to this as the function, in order to parallelize at the OS level, with no need for a wrapper: I made use of hasattr(builtinfunction,'func_name') to check for a name.
listOf_FuncAndArgLists : a list of lists
List of up-to-three-element-lists, like [function, args, kwargs],
specifying the set of functions to be launched in parallel. If an
element is just a function, rather than a list, then it is assumed
to have no arguments or keyword arguments. Thus, possible formats
for elements of the outer list are:
[function, list]
[function, list, dict]
kwargs: dict
One can also supply the kwargs once, for all jobs (or for those
without their own non-empty kwargs specified in the list)
names: an optional list of names to identify the processes.
If omitted, the function name is used, so if all the functions are
the same (ie merely with different arguments), then they would be
named indistinguishably
offsetsSeconds: int or list of ints
delay some functions' start times
expectNonzeroExit: True/False
Normal behaviour is to not proceed if any function exits with a
failed exit code. This can be used to override this behaviour.
parallel: True/False
Whenever the list of functions is longer than one, functions will
be run in parallel unless this parameter is passed as False
maxAtOnce: int
If nonzero, this limits how many jobs will be allowed to run at
once. By default, this is set according to how many processors
the hardware has available.
showFinished : int
Specifies the maximum number of successfully finished jobs to show
in the text interface (before the last report, which should always
show them all).
Returns a tuple of (return codes, return values), each a list in order of the jobs provided.
Only tested on POSIX OSes.
See the testParallel() method in this module