Comment faire pour multi-thread une opération dans une boucle en Python

dites que j'ai une très grande liste et que j'effectue une opération comme ça:

for item in items:
    try:
        api.my_operation(item)
    except:
        print 'error with item'

mon numéro est double:

  • il y a beaucoup d'articles
  • de l'api.my_operation prend toujours un "retour aux 151970920"

j'aimerais utiliser le multi-threading pour lancer un tas d'api.my_operations à la fois donc je peux traiter peut-être 5 ou 10 ou même 100 articles à la fois.

si my_operation () retourne une exception (parce que peut - être j'ai déjà traité cet élément) - C'est OK. Il ne va pas casser quoi que ce soit. La boucle peut continuer à l'élément suivant.

Note : C'est pour Python 2.7.3

40
demandé sur doremi 2013-02-28 23:17:13

3 réponses

tout d'abord, en Python, Si votre code est lié au CPU, le multithreading ne vous aidera pas, car un seul thread peut maintenir le verrouillage de L'interpréteur Global, et donc exécuter du code Python, à la fois. Donc, vous devez utiliser des processus, pas des threads.

ce n'est pas vrai si votre opération" prend une éternité pour revenir " parce qu'elle est liée à IO-c'est-à-dire qu'elle attend sur le réseau ou des copies de disque ou autres. Je reviendrai plus tard.


Ensuite, la façon de traiter 5 ou 10 ou 100 articles à la fois est de créer un bassin de 5 ou 10 ou 100 travailleurs, et mettre les articles dans une file d'attente que les travailleurs servent. Heureusement, les bibliothèques multiprocessing et concurrent.futures de stdlib couvrent la plupart des détails pour vous.

le premier est plus puissant et flexible pour la programmation traditionnelle; le second est plus simple si vous avez besoin de composer le futur-attente; pour trivial les affaires, peu importe ce que vous choisissez. (Dans ce cas , l'implémentation la plus évidente avec chaque prend 3 lignes avec futures , 4 lignes avec multiprocessing .)

si vous utilisez 2.6-2.7 ou 3.0-3.1, futures n'est pas intégré, mais vous pouvez l'installer à partir de PyPI ( pip install futures ).


enfin, il est généralement beaucoup plus simple de paralléliser les choses si vous pouvez tourner la boucle entière itération dans un appel de fonction (quelque chose que vous pourriez, par exemple, passer à map ), alors faisons d'abord cela:

def try_my_operation(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

Mettre tous ensemble:

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)

si vous avez beaucoup d'emplois relativement petits, les frais généraux de la multi-transformation pourrait inonder les gains. La façon de résoudre cela est de grouper le travail dans de plus grands emplois. Par exemple (en utilisant grouper du itertools recipes , que vous pouvez copier et coller dans votre code, ou obtenir à partir du more-itertools projet sur PyPI):

def try_multiple_operations(items):
    for item in items:
        try:
            api.my_operation(item)
        except:
            print('error with item')

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_multiple_operations, group) 
           for group in grouper(5, items)]
concurrent.futures.wait(futures)

enfin, et si votre code est io bound? Alors les threads sont tout aussi bons que les processus, et avec moins de dépassement (et moins de limitations, mais ces limitations ne vous affecteront généralement pas dans des cas comme celui-ci). Parfois que "moins au-dessus" est suffisant pour signifier que vous n'avez pas besoin de batching avec des fils, mais vous faire avec le processus, qui est une belle victoire.

alors, comment utilisez-vous les threads au lieu des processus? Il suffit de remplacer ProcessPoolExecutor par ThreadPoolExecutor .

si vous n'êtes pas sûr si votre code est relié par CPU ou IO, essayez les deux.


puis-je faire cela pour plusieurs fonctions dans mon script python? Par exemple, si j'avais un autre For loop ailleurs dans le code que je voulais parallélisation. Est-il possible de faire deux multithread fonctions dans le même script?

Oui. En fait, il existe deux façons différentes de le faire.

tout d'abord, vous pouvez partager le même exécuteur (thread ou process) et l'utiliser à partir de plusieurs endroits sans problème. Le but des tâches et des futurs est qu'ils sont autonomes; vous ne vous souciez pas où ils courent, juste que vous les mettez en file d'attente et éventuellement obtenir la réponse de retour.

Alternativement, vous pouvez avoir deux exécuteurs dans le même programme sans problème. Ceci a un coût de performance-si vous utilisez les deux exécuteurs en même temps, vous finirez par essayer d'exécuter (par exemple) 16 threads occupés sur 8 noyaux, ce qui signifie qu'il va y avoir une commutation de contexte. Mais parfois cela vaut la peine de le faire parce que, disons, les deux exécuteurs sont rarement occupés en même temps, et cela rend votre code beaucoup plus simple. Ou peut-être qu'un exécuteur exécute de très grandes tâches qui peuvent prendre tandis que pour terminer, et l'autre exécute des tâches très petites qui ont besoin de terminer aussi rapidement que possible, parce que la réactivité est plus importante que le débit pour une partie de votre programme.

Si vous ne savez pas qui est approprié pour votre programme, habituellement c'est la première.

73
répondu abarnert 2014-10-16 17:45:07

Edit 2018-02-06 : révision basée sur ce commentaire

Edit : oublié de mentionner que cela fonctionne sur Python 2.7.x

il y a multiprocessing.

from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool

pool_size = 5  # your "parallelness"

# define worker function before a Pool is instantiated
def worker(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

pool = Pool(pool_size)

for item in items:
    pool.apply_async(worker, (item,))

pool.close()
pool.join()

maintenant si vous identifiez vraiment que votre processus est lié au CPU comme @abarnert l'a mentionné, changez ThreadPool pour le processus mise en œuvre du pool (commenté dans ThreadPool import). Vous pouvez trouver plus de détails ici: http://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers

19
répondu woozyking 2018-02-06 21:29:01

vous pouvez diviser le traitement en un nombre spécifié de threads en utilisant une approche comme celle-ci:

import threading                                                                

def process(items, start, end):                                                 
    for item in items[start:end]:                                               
        try:                                                                    
            api.my_operation(item)                                              
        except Exception:                                                       
            print('error with item')                                            


def split_processing(items, num_splits=4):                                      
    split_size = len(items) // num_splits                                       
    threads = []                                                                
    for i in range(num_splits):                                                 
        # determine the indices of the list this thread will handle             
        start = i * split_size                                                  
        # special case on the last chunk to account for uneven splits           
        end = None if i+1 == num_splits else (i+1) * split_size                 
        # create the thread                                                     
        threads.append(                                                         
            threading.Thread(target=process, args=(items, start, end)))         
        threads[-1].start() # start the thread we just created                  

    # wait for all threads to finish                                            
    for t in threads:                                                           
        t.join()                                                                



split_processing(items)
8
répondu Ryan Haining 2013-02-28 19:33:22