Python multitraitement de la piscine.carte pour les arguments multiples

dans la bibliothèque multiprocessing de Python, il y a une variante de pool.map qui supporte plusieurs arguments?

text = "test"
def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text,case),case, 1)
    pool.close()
    pool.join()
302
demandé sur ATOzTOA 2011-03-26 17:23:10

15 réponses

la réponse à cette question dépend de la version et de la situation. La réponse la plus générale pour les versions récentes de Python (depuis 3.3) a d'abord été décrite ci-dessous par J. F. Sebastian . 1 il utilise la méthode Pool.starmap , qui accepte une séquence de tuples d'arguments. Il déballe alors automatiquement les arguments de chaque tuple et les passe à la fonction donnée:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

pour les versions de Python, vous aurez besoin d'écrire une fonction d'aide pour déballer les arguments explicitement. Si vous voulez utiliser with , vous devrez aussi écrire un wrapper pour transformer Pool en Gestionnaire de contexte. (Merci à muon pour l'avoir souligné.)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

dans des cas plus simples, avec un second argument fixe, vous pouvez aussi utiliser partial , mais seulement en python 2.7+.

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1. Beaucoup de ce a été inspiré par sa réponse, qui aurait probablement dû être acceptée à la place. Mais puisque celui-ci est coincé au sommet, il a semblé préférable de l'améliorer pour les futurs lecteurs.

179
répondu senderle 2017-10-10 16:11:15

je pense que le ci-dessous sera mieux

def multi_run_wrapper(args):
   return add(*args)
def add(x,y):
    return x+y
if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print results

sortie

[3, 5, 7]
107
répondu imotai 2014-01-15 06:01:53

utilisant Python 3.3+ avec pool.starmap():

from multiprocessing.dummy import Pool as ThreadPool 

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"] 

pool = ThreadPool(2)
pool.starmap(write, zip(a,b)) 
pool.close() 
pool.join()

résultat:

1 --- 4
2 --- 5
3 --- 6

vous pouvez également zip () plus d'arguments si vous aimez: zip(a,b,c,d,e)

dans le cas où vous voulez avoir une valeur constante passée comme argument, vous devez utiliser import itertools et ensuite zip(itertools.repeat(constant), a) par exemple.

39
répondu user136036 2015-03-12 17:09:47

ayant appris les itertools dans J. F. Sebastian réponse j'ai décidé d'aller plus loin et d'écrire un paquet parmap qui s'occupe de la parallélisation, offrant des fonctions map et starmap sur python-2.7 et Python-3.2 (et plus tard aussi) qui peut prendre n'importe quel nombre d'arguments de position.

Installation

pip install parmap

Comment paralléliser:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

j'ai téléchargé parmap sur PyPI et sur un dépôt github .

à titre d'exemple, on peut répondre à la question comme suit:

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)
22
répondu zeehio 2017-05-23 12:26:29

il y a une bifurcation de multiprocessing appelée pathos ( note: utilisez la version sur github ) qui n'a pas besoin de starmap -- les fonctions de la carte reflètent l'API pour la carte de python, donc la carte peut prendre plusieurs arguments. Avec pathos , vous pouvez aussi généralement faire du multiprocessing dans l'interpréteur, au lieu d'être bloqué dans le bloc __main__ . Pathos est prévu pour une sortie, après une légère mise à jour -- principalement la conversion en python 3.x.

  Python 2.7.5 (default, Sep 30 2013, 20:15:49) 
  [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
  Type "help", "copyright", "credits" or "license" for more information.
  >>> def func(a,b):
  ...     print a,b
  ...
  >>>
  >>> from pathos.multiprocessing import ProcessingPool    
  >>> pool = ProcessingPool(nodes=4)
  >>> pool.map(func, [1,2,3], [1,1,1])
  1 1
  2 1
  3 1
  [None, None, None]
  >>>
  >>> # also can pickle stuff like lambdas 
  >>> result = pool.map(lambda x: x**2, range(10))
  >>> result
  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  >>>
  >>> # also does asynchronous map
  >>> result = pool.amap(pow, [1,2,3], [4,5,6])
  >>> result.get()
  [1, 32, 729]
  >>>
  >>> # or can return a map iterator
  >>> result = pool.imap(pow, [1,2,3], [4,5,6])
  >>> result
  <processing.pool.IMapIterator object at 0x110c2ffd0>
  >>> list(result)
  [1, 32, 729]
8
répondu Mike McKerns 2014-01-20 20:37:27

Vous pouvez utiliser les deux fonctions suivantes, afin d'éviter d'écrire un wrapper pour chaque nouvelle fonction:

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

utilisez la fonction function avec les listes d'arguments arg_0 , arg_1 et arg_2 comme suit:

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()
7
répondu Alfred M. 2014-06-27 07:42:41

Une meilleure façon est d'utiliser décorateur au lieu d'écrire fonction wrapper à la main. Surtout quand vous avez beaucoup de fonctions à mapper, decorator va sauver votre temps en évitant d'écrire wrapper pour chaque fonction. Habituellement une fonction décorée n'est pas picklable, cependant nous pouvons utiliser functools pour se déplacer. Plus de disscusions peuvent être trouvées ici .

Voici l'exemple

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

alors vous pouvez le mapper avec des arguments zippés

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

bien sûr, vous pouvez toujours utiliser Pool.starmap en Python 3 (>=3.3) comme mentionné dans d'autres réponses.

6
répondu Syrtis Major 2016-05-29 01:24:15

une autre alternative simple est d'envelopper vos paramètres de fonction dans un tuple et ensuite envelopper les paramètres qui doivent être passés dans tuples ainsi. Ce n'est peut-être pas idéal lorsqu'il s'agit de données volumineuses. Je crois que ça ferait des copies pour chaque tuple.

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

donne la sortie dans un ordre aléatoire:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]
5
répondu Alex Klibisz 2016-11-21 07:16:48

une meilleure solution pour python2:

from multiprocessing import Pool
def func((i, (a, b))):
    print i, a, b
    return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])

2 3 4

1 2 3

0 1 2

[]:

[3, 5, 7]

4
répondu xmduhan 2017-05-23 10:11:56

une autre façon est de passer une liste de listes à une routine à un seul argument:

import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()

pool.map(task, [
        [1,2],
        [3,4],
        [5,6],
        [7,8]
    ])

on peut que construire une liste de listes d'arguments avec sa méthode préférée.

3
répondu Adobe 2014-03-13 21:55:38

de python 3.4.4, vous pouvez utiliser le multiprocessing.get_context () pour obtenir un objet de contexte pour utiliser plusieurs méthodes de démarrage:

import multiprocessing as mp

def foo(q, h, w):
    q.put(h + ' ' + w)
    print(h + ' ' + w)

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,'hello', 'world'))
    p.start()
    print(q.get())
    p.join()

ou vous remplacez simplement

pool.map(harvester(text,case),case, 1)

par:

pool.apply_async(harvester(text,case),case, 1)
2
répondu Tung Nguyen 2016-05-27 11:05:41

# "Comment prendre de multiples arguments".

def f1(args):
    a, b, c = args[0] , args[1] , args[2]
    return a+b+c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4) 

    result1 = pool.map(f1, [ [1,2,3] ])
    print(result1)
2
répondu Dane Lee 2017-12-25 11:44:06

dans la documentation officielle indique qu'il ne supporte qu'un seul argument itérable. J'aime utiliser apply_async dans de tels cas. Dans votre cas, je ferais:

from multiprocessing import Process, Pool, Manager

text = "test"
def harvester(text, case, q = None):
 X = case[0]
 res = text+ str(X)
 if q:
  q.put(res)
 return res


def block_until(q, results_queue, until_counter=0):
 i = 0
 while i < until_counter:
  results_queue.put(q.get())
  i+=1

if __name__ == '__main__':
 pool = multiprocessing.Pool(processes=6)
 case = RAW_DATASET
 m = Manager()
 q = m.Queue()
 results_queue = m.Queue() # when it completes results will reside in this queue
 blocking_process = Process(block_until, (q, results_queue, len(case)))
 blocking_process.start()
 for c in case:
  try:
   res = pool.apply_async(harvester, (text, case, q = None))
   res.get(timeout=0.1)
  except:
   pass
 blocking_process.join()
0
répondu L M Rojas Aguilera 2017-04-02 06:34:05

pour python2, vous pouvez utiliser ce truc

def fun(a,b):
    return a+b

pool = multiprocessing.Pool(processes=6)
b=233
pool.map(lambda x:fun(x,b),range(1000))
0
répondu Hz Shang 2018-05-18 04:06:59