Python multitraitement de la piscine.carte pour les arguments multiples
dans la bibliothèque multiprocessing de Python, il y a une variante de pool.map qui supporte plusieurs arguments?
text = "test"
def harvester(text, case):
X = case[0]
text+ str(X)
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
pool.map(harvester(text,case),case, 1)
pool.close()
pool.join()
15 réponses
la réponse à cette question dépend de la version et de la situation. La réponse la plus générale pour les versions récentes de Python (depuis 3.3) a d'abord été décrite ci-dessous par J. F. Sebastian . 1 il utilise la méthode Pool.starmap
, qui accepte une séquence de tuples d'arguments. Il déballe alors automatiquement les arguments de chaque tuple et les passe à la fonction donnée:
import multiprocessing
from itertools import product
def merge_names(a, b):
return '{} & {}'.format(a, b)
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with multiprocessing.Pool(processes=3) as pool:
results = pool.starmap(merge_names, product(names, repeat=2))
print(results)
# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
pour les versions de Python, vous aurez besoin d'écrire une fonction d'aide pour déballer les arguments explicitement. Si vous voulez utiliser with
, vous devrez aussi écrire un wrapper pour transformer Pool
en Gestionnaire de contexte. (Merci à muon pour l'avoir souligné.)
import multiprocessing
from itertools import product
from contextlib import contextmanager
def merge_names(a, b):
return '{} & {}'.format(a, b)
def merge_names_unpack(args):
return merge_names(*args)
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with poolcontext(processes=3) as pool:
results = pool.map(merge_names_unpack, product(names, repeat=2))
print(results)
# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...
dans des cas plus simples, avec un second argument fixe, vous pouvez aussi utiliser partial
, mais seulement en python 2.7+.
import multiprocessing
from functools import partial
from contextlib import contextmanager
@contextmanager
def poolcontext(*args, **kwargs):
pool = multiprocessing.Pool(*args, **kwargs)
yield pool
pool.terminate()
def merge_names(a, b):
return '{} & {}'.format(a, b)
if __name__ == '__main__':
names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
with poolcontext(processes=3) as pool:
results = pool.map(partial(merge_names, b='Sons'), names)
print(results)
# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...
1. Beaucoup de ce a été inspiré par sa réponse, qui aurait probablement dû être acceptée à la place. Mais puisque celui-ci est coincé au sommet, il a semblé préférable de l'améliorer pour les futurs lecteurs.
est-il une variante de piscine.map qui supporte plusieurs arguments?
Python includes pool.starmap()
method "151915 3.30920":
#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support
def func(a, b):
return a + b
def main():
a_args = [1,2,3]
second_arg = 1
with Pool() as pool:
L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
M = pool.starmap(func, zip(a_args, repeat(second_arg)))
N = pool.map(partial(func, b=second_arg), a_args)
assert L == M == N
if __name__=="__main__":
freeze_support()
main()
pour les versions plus anciennes:
#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support
def func(a, b):
print a, b
def func_star(a_b):
"""Convert `f([1,2])` to `f(1,2)` call."""
return func(*a_b)
def main():
pool = Pool()
a_args = [1,2,3]
second_arg = 1
pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))
if __name__=="__main__":
freeze_support()
main()
Sortie
1 1
2 1
3 1
notez comment itertools.izip()
et itertools.repeat()
sont utilisés ici.
dû à le bug mentionné par @unutbu vous ne pouvez pas utiliser functools.partial()
ou des capacités similaires sur Python 2.6, de sorte que la simple fonction d'enrubannage func_star()
doit être définie explicitement. Voir aussi la solution suggérée par uptimebox
.
je pense que le ci-dessous sera mieux
def multi_run_wrapper(args):
return add(*args)
def add(x,y):
return x+y
if __name__ == "__main__":
from multiprocessing import Pool
pool = Pool(4)
results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
print results
sortie
[3, 5, 7]
utilisant Python 3.3+ avec pool.starmap():
from multiprocessing.dummy import Pool as ThreadPool
def write(i, x):
print(i, "---", x)
a = ["1","2","3"]
b = ["4","5","6"]
pool = ThreadPool(2)
pool.starmap(write, zip(a,b))
pool.close()
pool.join()
résultat:
1 --- 4
2 --- 5
3 --- 6
vous pouvez également zip () plus d'arguments si vous aimez: zip(a,b,c,d,e)
dans le cas où vous voulez avoir une valeur constante passée comme argument, vous devez utiliser import itertools
et ensuite zip(itertools.repeat(constant), a)
par exemple.
ayant appris les itertools dans J. F. Sebastian réponse j'ai décidé d'aller plus loin et d'écrire un paquet parmap
qui s'occupe de la parallélisation, offrant des fonctions map
et starmap
sur python-2.7 et Python-3.2 (et plus tard aussi) qui peut prendre n'importe quel nombre d'arguments de position.
Installation
pip install parmap
Comment paralléliser:
import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)
# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)
# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)
j'ai téléchargé parmap sur PyPI et sur un dépôt github .
à titre d'exemple, on peut répondre à la question comme suit:
import parmap
def harvester(case, text):
X = case[0]
text+ str(X)
if __name__ == "__main__":
case = RAW_DATASET # assuming this is an iterable
parmap.map(harvester, case, "test", chunksize=1)
il y a une bifurcation de multiprocessing
appelée pathos ( note: utilisez la version sur github ) qui n'a pas besoin de starmap
-- les fonctions de la carte reflètent l'API pour la carte de python, donc la carte peut prendre plusieurs arguments. Avec pathos
, vous pouvez aussi généralement faire du multiprocessing dans l'interpréteur, au lieu d'être bloqué dans le bloc __main__
. Pathos est prévu pour une sortie, après une légère mise à jour -- principalement la conversion en python 3.x.
Python 2.7.5 (default, Sep 30 2013, 20:15:49)
[GCC 4.2.1 (Apple Inc. build 5566)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> def func(a,b):
... print a,b
...
>>>
>>> from pathos.multiprocessing import ProcessingPool
>>> pool = ProcessingPool(nodes=4)
>>> pool.map(func, [1,2,3], [1,1,1])
1 1
2 1
3 1
[None, None, None]
>>>
>>> # also can pickle stuff like lambdas
>>> result = pool.map(lambda x: x**2, range(10))
>>> result
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
>>> # also does asynchronous map
>>> result = pool.amap(pow, [1,2,3], [4,5,6])
>>> result.get()
[1, 32, 729]
>>>
>>> # or can return a map iterator
>>> result = pool.imap(pow, [1,2,3], [4,5,6])
>>> result
<processing.pool.IMapIterator object at 0x110c2ffd0>
>>> list(result)
[1, 32, 729]
Vous pouvez utiliser les deux fonctions suivantes, afin d'éviter d'écrire un wrapper pour chaque nouvelle fonction:
import itertools
from multiprocessing import Pool
def universal_worker(input_pair):
function, args = input_pair
return function(*args)
def pool_args(function, *args):
return zip(itertools.repeat(function), zip(*args))
utilisez la fonction function
avec les listes d'arguments arg_0
, arg_1
et arg_2
comme suit:
pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()
Une meilleure façon est d'utiliser décorateur au lieu d'écrire fonction wrapper à la main. Surtout quand vous avez beaucoup de fonctions à mapper, decorator va sauver votre temps en évitant d'écrire wrapper pour chaque fonction. Habituellement une fonction décorée n'est pas picklable, cependant nous pouvons utiliser functools
pour se déplacer. Plus de disscusions peuvent être trouvées ici .
Voici l'exemple
def unpack_args(func):
from functools import wraps
@wraps(func)
def wrapper(args):
if isinstance(args, dict):
return func(**args)
else:
return func(*args)
return wrapper
@unpack_args
def func(x, y):
return x + y
alors vous pouvez le mapper avec des arguments zippés
np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()
bien sûr, vous pouvez toujours utiliser Pool.starmap
en Python 3 (>=3.3) comme mentionné dans d'autres réponses.
une autre alternative simple est d'envelopper vos paramètres de fonction dans un tuple et ensuite envelopper les paramètres qui doivent être passés dans tuples ainsi. Ce n'est peut-être pas idéal lorsqu'il s'agit de données volumineuses. Je crois que ça ferait des copies pour chaque tuple.
from multiprocessing import Pool
def f((a,b,c,d)):
print a,b,c,d
return a + b + c +d
if __name__ == '__main__':
p = Pool(10)
data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
print(p.map(f, data))
p.close()
p.join()
donne la sortie dans un ordre aléatoire:
0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
une meilleure solution pour python2:
from multiprocessing import Pool
def func((i, (a, b))):
print i, a, b
return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])
2 3 4
1 2 3
0 1 2
[]:
[3, 5, 7]
une autre façon est de passer une liste de listes à une routine à un seul argument:
import os
from multiprocessing import Pool
def task(args):
print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]
pool = Pool()
pool.map(task, [
[1,2],
[3,4],
[5,6],
[7,8]
])
on peut que construire une liste de listes d'arguments avec sa méthode préférée.
de python 3.4.4, vous pouvez utiliser le multiprocessing.get_context () pour obtenir un objet de contexte pour utiliser plusieurs méthodes de démarrage:
import multiprocessing as mp
def foo(q, h, w):
q.put(h + ' ' + w)
print(h + ' ' + w)
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,'hello', 'world'))
p.start()
print(q.get())
p.join()
ou vous remplacez simplement
pool.map(harvester(text,case),case, 1)
par:
pool.apply_async(harvester(text,case),case, 1)
# "Comment prendre de multiples arguments".
def f1(args):
a, b, c = args[0] , args[1] , args[2]
return a+b+c
if __name__ == "__main__":
import multiprocessing
pool = multiprocessing.Pool(4)
result1 = pool.map(f1, [ [1,2,3] ])
print(result1)
dans la documentation officielle indique qu'il ne supporte qu'un seul argument itérable. J'aime utiliser apply_async dans de tels cas. Dans votre cas, je ferais:
from multiprocessing import Process, Pool, Manager
text = "test"
def harvester(text, case, q = None):
X = case[0]
res = text+ str(X)
if q:
q.put(res)
return res
def block_until(q, results_queue, until_counter=0):
i = 0
while i < until_counter:
results_queue.put(q.get())
i+=1
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=6)
case = RAW_DATASET
m = Manager()
q = m.Queue()
results_queue = m.Queue() # when it completes results will reside in this queue
blocking_process = Process(block_until, (q, results_queue, len(case)))
blocking_process.start()
for c in case:
try:
res = pool.apply_async(harvester, (text, case, q = None))
res.get(timeout=0.1)
except:
pass
blocking_process.join()
pour python2, vous pouvez utiliser ce truc
def fun(a,b):
return a+b
pool = multiprocessing.Pool(processes=6)
b=233
pool.map(lambda x:fun(x,b),range(1000))