Comment utiliser threading en Python?

j'essaie de comprendre le threading en Python. J'ai regardé la documentation et les exemples, mais franchement, beaucoup d'exemples sont trop sophistiqués et j'ai du mal à les comprendre.

comment montrez-vous clairement les tâches étant divisé pour multi-threading?

971
demandé sur Martin Tournoij 2010-05-17 08:24:00

17 réponses

depuis que cette question a été posée en 2010, il y a eu une réelle simplification dans la façon de faire simple multithreading avec python avec carte et pool .

le code ci-dessous provient d'un article/blog post que vous devez absolument vérifier (aucune affiliation) - parallélisme en une ligne: Un meilleur modèle pour les tâches quotidiennes . Je vais résumer ci - dessous-il se termine par être juste quelques lignes de code:

from multiprocessing.dummy import Pool as ThreadPool 
pool = ThreadPool(4) 
results = pool.map(my_function, my_array)

qui est la version multithread de:

results = []
for item in my_array:
    results.append(my_function(item))

Description

Map est une petite fonction cool, et la clé pour facilement injecter du parallélisme dans votre code Python. Pour ceux qui ne sont pas familiers, la carte est quelque chose soulevé de fonctionnel des langues comme le Lisp. C'est une fonction qui associe à une autre fonction au cours d'une séquence.

Carte gère l'itération sur la séquence pour nous, applique la fonction, et stocke tous les résultats dans une liste pratique à la fin.

enter image description here


mise en œuvre

versions parallèles de la carte fonction sont fournis par deux bibliothèques:multiprocessing, et aussi son peu connu, mais tout aussi fantastique enfant pas:multiprocessing.factice.

multiprocessing.dummy est exactement le même que le module multiprocessing, mais utilise des threads à la place ( une distinction importante - utiliser des processus multiples pour les tâches à forte intensité de CPU; threads pour (et pendant) IO ):

multiprocessing.dummy reproduit L'API de multiprocessing mais n'est qu'une enveloppe autour du module de filetage.

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool 

urls = [
  'http://www.python.org', 
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# make the Pool of workers
pool = ThreadPool(4) 

# open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# close the pool and wait for the work to finish 
pool.close() 
pool.join() 

Et le calendrier résultats:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

passer plusieurs arguments (fonctionne comme ceci seulement en Python 3.3 et plus tard ):

pour passer plusieurs tableaux:

results = pool.starmap(function, zip(list_a, list_b))

ou pour passer une constante et un tableau:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

si vous utilisez une version antérieure de Python, vous pouvez passer plusieurs arguments via cette solution .

(merci à user136036 pour le commentaire utile)

1068
répondu philshem 2018-05-13 18:28:12

voici un exemple simple: vous devez essayer quelques URLs alternatives et retourner le contenu du premier à répondre.

import Queue
import threading
import urllib2

# called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

c'est un cas où le filetage est utilisé comme une simple optimisation: chaque sous-fil est en attente d'une URL à résoudre et répondre, afin de mettre son contenu sur la file; chaque fil est un démon (ne va pas garder le processus si le fil principal se termine -- c'est plus commun que pas); le fil principal démarre tous les sous-fil, fait un get sur la file d'attente pour attendre jusqu'à ce que l'un d'eux ait fait un put , puis émet les résultats et se termine (ce qui élimine tous les sous-réseaux qui pourraient encore être en cours d'exécution, car ils sont des fils de démon).

L'utilisation correcte des threads en Python est invariablement liée aux opérations d'E/S (puisque CPython n'utilise pas plusieurs cœurs pour exécuter des tâches liées au CPU de toute façon, la seule raison pour threading n'est pas de bloquer le processus alors qu'il y a une attente pour certaines e/s). Les files d'attente sont presque invariablement la meilleure façon de cultiver le travail à des fils et/ou de recueillir les résultats du travail, soit dit en passant, et ils sont intrinsèquement threadsafe de sorte qu'ils vous évitent de vous soucier des serrures, des conditions, des événements, des sémaphores, et d'autres concepts de coordination/communication inter-fil.

675
répondu Alex Martelli 2015-02-28 08:47:57

NOTE : pour la parallélisation réelle en Python, vous devez utiliser le module multiprocessing pour bifurquer les processus multiples qui s'exécutent en parallèle (en raison du verrouillage global de l'interpréteur, les threads Python fournissent des interférences mais sont en fait exécutés en série, pas en parallèle, et ne sont utiles que lors des opérations d'entrelacement).

cependant, si vous êtes simplement à la recherche d'interférences (ou si vous effectuez des opérations d'E/S qui le module threading est le point de départ. Comme un exemple vraiment simple, considérons le problème de la sommation d'une large gamme en sommant des sous-séries en parallèle:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()  
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

notez que ce qui précède est un exemple très stupide, car il ne fait absolument pas d'E/S et sera exécuté en série bien qu'il soit entrelavé (avec la surcharge ajoutée de la commutation de contexte) dans CPython en raison de la interprète de verrouillage.

231
répondu Michael Aaron Safyan 2014-06-09 19:40:20

comme d'autres mentionnés, CPython ne peut utiliser des threads que pour I\O waits due to gil. Si vous souhaitez bénéficier de plusieurs cœurs pour les tâches liées au CPU, utilisez multiprocessing :

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
90
répondu Kai 2013-06-11 14:04:09

juste une note, la file d'attente n'est pas nécessaire pour le threading.

C'est l'exemple le plus simple que je puisse imaginer qui montre 10 processus fonctionnant simultanément.

import threading
from random import randint
from time import sleep


def print_number(number):
    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):
    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"
87
répondu Douglas Adams 2015-03-05 20:11:54

la réponse D'Alex Martelli m'a aidé, cependant voici une version modifiée que je pensais plus utile (au moins pour moi).

import Queue
import threading
import urllib2

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

#load up a queue with your data, this will handle locking
q = Queue.Queue()
for url in worker_data:
    q.put(url)

#define a worker function
def worker(queue):
    queue_full = True
    while queue_full:
        try:
            #get your data off the queue, and do some work
            url= queue.get(False)
            data = urllib2.urlopen(url).read()
            print len(data)

        except Queue.Empty:
            queue_full = False

#create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()
40
répondu JimJty 2013-10-01 15:50:44

j'ai trouvé cela très utile: créer autant de threads que de noyaux et les laisser exécuter un (grand) nombre de tâches (dans ce cas, appeler un programme shell):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): #put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        #execute a task: call a shell program and wait until it completes
        subprocess.call("echo "+str(item), shell=True) 
        q.task_done()

cpus=multiprocessing.cpu_count() #detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() #block until all tasks are done
19
répondu dolphin 2014-06-07 11:18:06

pour moi, l'exemple parfait pour fileter est la surveillance D'événements asynchrones. Regardez ce code.

# thread_test.py
import threading
import time 

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

vous pouvez jouer avec ce code en ouvrant une session IPython et en faisant quelque chose comme:

>>>from thread_test import Monitor
>>>a = [0]
>>>mon = Monitor(a)
>>>mon.start()
>>>a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

Attendez quelques minutes

>>>a[0] = 2
Mon = 2
16
répondu dvreed77 2013-04-14 04:18:42

ayant une fonction, f , le filez comme ceci:

import threading
threading.Thread(target=f).start()

pour passer des arguments à f

threading.Thread(target=f, args=(a,b,c)).start()
16
répondu starfry 2017-03-16 16:07:46

Python 3 a la possibilité de lancer des tâches parallèles . Cela rend notre travail plus facile.

il a pour thread pooling et Process pooling .

La suite donne un aperçu:

ThreadPoolExecutor Exemple

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
15
répondu Jeril 2017-07-20 11:17:23

utilisant le nouveau concurrent.les contrats à terme module

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

l'approche de l'exécuteur peut sembler familière à tous ceux qui se sont déjà sali les mains avec Java auparavant.

aussi sur une note de côté: pour garder l'univers sain, n'oubliez pas de fermer vos piscines/exécuteurs si vous n'utilisez pas with contexte (qui est si génial qu'il le fait pour vous)

13
répondu Shubham Chaudhary 2016-10-29 22:03:21

la plupart des documentations et des tutoriels utilisent les modules Threading et Queue de Python.

peut-être considérer le module concurrent.futures.ThreadPoolExecutor de python 3. Combiné avec la clause with et la compréhension de liste, il pourrait être un vrai charme.

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_url(url):
    # Your actual program here. Using threading.Lock() if necessary
    return ""

# List of urls to fetch
urls = ["url1", "url2"]

with ThreadPoolExecutor(max_workers = 5) as executor:

    # Create threads 
    futures = {executor.submit(get_url, url) for url in urls}

    # as_completed() gives you the threads once finished
    for f in as_completed(futures):
        # Get the results 
        rs = f.result()
9
répondu Yibo 2018-04-08 01:19:16

Voici l'exemple très simple de L'importation CSV en utilisant le filetage. [Bibliothèque de l'inclusion peut varier en fonction de la fin ]

Fonctions D'Aide:

from threading import Thread
from project import app 
import csv


def import_handler(csv_file_name):
    thr = Thread(target=dump_async_csv_data, args=[csv_file_name])
    thr.start()

def dump_async_csv_data(csv_file_name):
    with app.app_context():
        with open(csv_file_name) as File:
            reader = csv.DictReader(File)
            for row in reader:
                #DB operation/query

Pilote De La Fonction:

import_handler(csv_file_name) 
8
répondu Chirag Vora 2018-05-09 05:31:49

multi threading avec exemple simple qui sera utile. Vous pouvez l'exécuter et comprendre facilement comment fonctionne multi thread en python. J'ai utilisé la serrure pour empêcher d'accéder à d'autres thread jusqu'à ce que les threads précédents aient terminé leur travail. Par l'utilisation de

tLock = threading.BoundedSemaphore (valeur=4)

cette ligne de code, vous pouvez autoriser des nombres de processus à la fois et garder la main sur le reste du thread qui sera exécuté plus tard ou après avoir terminé les processus précédents.

import threading
import time

#tLock = threading.Lock()
tLock = threading.BoundedSemaphore(value=4)
def timer(name, delay, repeat):
    print  "\r\nTimer: ", name, " Started"
    tLock.acquire()
    print "\r\n", name, " has the acquired the lock"
    while repeat > 0:
        time.sleep(delay)
        print "\r\n", name, ": ", str(time.ctime(time.time()))
        repeat -= 1

    print "\r\n", name, " is releaseing the lock"
    tLock.release()
    print "\r\nTimer: ", name, " Completed"

def Main():
    t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
    t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
    t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
    t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
    t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))

    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()

    print "\r\nMain Complete"

if __name__ == "__main__":
    Main()
6
répondu cSharma 2017-06-13 12:15:19

j'ai vu beaucoup d'exemples ici où aucun travail réel n'était effectué + ils étaient principalement reliés au CPU. Voici un exemple D'une tâche liée CPU qui calcule tous les nombres premiers entre 10 millions et 10,05 millions. J'ai utilisé les 4 Méthodes ici

import math
import timeit
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def time_stuff(fn):
    """
    Measure time of execution of a function
    """
    def wrapper(*args, **kwargs):
        t0 = timeit.default_timer()
        fn(*args, **kwargs)
        t1 = timeit.default_timer()
        print("{} seconds".format(t1 - t0))
    return wrapper

def find_primes_in(nmin, nmax):
    """
    Compute a list of prime numbers between the given minimum and maximum arguments
    """
    primes = []

    #Loop from minimum to maximum
    for current in range(nmin, nmax + 1):

        #Take the square root of the current number
        sqrt_n = int(math.sqrt(current))
        found = False

        #Check if the any number from 2 to the square root + 1 divides the current numnber under consideration
        for number in range(2, sqrt_n + 1):

            #If divisible we have found a factor, hence this is not a prime number, lets move to the next one
            if current % number == 0:
                found = True
                break

        #If not divisible, add this number to the list of primes that we have found so far
        if not found:
            primes.append(current)

    #I am merely printing the length of the array containing all the primes but feel free to do what you want
    print(len(primes))

@time_stuff
def sequential_prime_finder(nmin, nmax):
    """
    Use the main process and main thread to compute everything in this case
    """
    find_primes_in(nmin, nmax)

@time_stuff
def threading_prime_finder(nmin, nmax):
    """
    If the minimum is 1000 and the maximum is 2000 and we have 4 workers
    1000 - 1250 to worker 1
    1250 - 1500 to worker 2
    1500 - 1750 to worker 3
    1750 - 2000 to worker 4
    so lets split the min and max values according to the number of workers
    """
    nrange = nmax - nmin
    threads = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)

        #Start the thrread with the min and max split up to compute
        #Parallel computation will not work here due to GIL since this is a CPU bound task
        t = threading.Thread(target = find_primes_in, args = (start, end))
        threads.append(t)
        t.start()

    #Dont forget to wait for the threads to finish
    for t in threads:
        t.join()

@time_stuff
def processing_prime_finder(nmin, nmax):
    """
    Split the min, max interval similar to the threading method above but use processes this time
    """
    nrange = nmax - nmin
    processes = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)
        p = multiprocessing.Process(target = find_primes_in, args = (start, end))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

@time_stuff
def thread_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method but use thread pool executor this time
    This method is slightly faster than using pure threading as the pools manage threads more efficiently
    This method is still slow due to the GIL limitations since we are doing a CPU bound task
    """
    nrange = nmax - nmin
    with ThreadPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

@time_stuff
def process_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method but use the process pool executor
    This is the fastest method recorded so far as it manages process efficiently + overcomes GIL limitations
    RECOMMENDED METHOD FOR CPU BOUND TASKS
    """
    nrange = nmax - nmin
    with ProcessPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

def main():
    nmin = int(1e7)
    nmax = int(1.05e7)
    print("Sequential Prime Finder Starting")
    sequential_prime_finder(nmin, nmax)
    print("Threading Prime Finder Starting")
    threading_prime_finder(nmin, nmax)
    print("Processing Prime Finder Starting")
    processing_prime_finder(nmin, nmax)
    print("Thread Executor Prime Finder Starting")
    thread_executor_prime_finder(nmin, nmax)
    print("Process Executor Finder Starting")
    process_executor_prime_finder(nmin, nmax)

main()

Voici les résultats sur ma machine Mac OSX 4 core

Sequential Prime Finder Starting
9.708213827005238 seconds
Threading Prime Finder Starting
9.81836523200036 seconds
Processing Prime Finder Starting
3.2467174359990167 seconds
Thread Executor Prime Finder Starting
10.228896902000997 seconds
Process Executor Finder Starting
2.656402041000547 seconds
4
répondu PirateApp 2018-04-19 14:11:26

aucune des solutions ci-dessus n'a réellement utilisé plusieurs cœurs sur mon serveur GNU/Linux (où je n'ai pas de droits d'administrateur). Ils n'ont couru que sur un seul noyau. J'ai utilisé l'interface de niveau inférieur os.fork pour générer plusieurs processus. C'est le code qui a fonctionné pour moi:

from os import fork

values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()
    if p == 0:
        my_function(values[i])
        break
3
répondu David Nathan 2018-02-13 12:54:41
import threading
import requests

def send():

  r = requests.get('https://www.stackoverlow.com')

thread = []
t = threading.Thread(target=send())
thread.append(t)
t.start()
1
répondu Skiller Dz 2018-05-07 18:06:31