comment obtenir la valeur de retour d'un thread en python?
Comment obtenir la valeur 'foo'
qui est retournée à partir du fil?
from threading import Thread
def foo(bar):
print 'hello {}'.format(bar)
return 'foo'
thread = Thread(target=foo, args=('world!',))
thread.start()
ret = thread.join()
print ret
la seule façon évidente de le faire, indiquée ci-dessus, renvoie None
.
20 réponses
FWIW, le module multiprocessing
a une interface agréable pour cela en utilisant la classe Pool
. Et si vous voulez vous en tenir à des threads plutôt qu'à des processus, vous pouvez simplement utiliser la classe multiprocessing.pool.ThreadPool
en remplacement direct.
def foo(bar, baz):
print 'hello {0}'.format(bar)
return 'foo' + baz
from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)
async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo
# do some other stuff in the main process
return_val = async_result.get() # get the return value from your function.
une façon que j'ai vu est de passer un objet mutable, comme une liste ou un dictionnaire, au constructeur du thread, avec un index ou un autre identificateur d'une sorte. Le fil peut alors stocker ses résultats dans son logement dédié à cet objet. Par exemple:
def foo(bar, result, index):
print 'hello {0}'.format(bar)
result[index] = "foo"
from threading import Thread
threads = [None] * 10
results = [None] * 10
for i in range(len(threads)):
threads[i] = Thread(target=foo, args=('world!', results, i))
threads[i].start()
# do some other stuff
for i in range(len(threads)):
threads[i].join()
print " ".join(results) # what sound does a metasyntactic locomotive make?
si vous voulez vraiment que join()
renvoie la valeur de retour de la fonction appelée, Vous pouvez le faire avec une sous-classe Thread
comme suit:
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar)
return "foo"
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs, Verbose)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args,
**self._Thread__kwargs)
def join(self):
Thread.join(self)
return self._return
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
print twrv.join() # prints foo
qui devient un peu nerveux en raison d'une certaine mutilation du nom, et il accède à des structures de données" privées "qui sont spécifiques à la mise en œuvre de Thread
... mais il fonctionne.
pour python3
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs)
self._return = None
def run(self):
print(type(self._target))
if self._target is not None:
self._return = self._target(*self._args,
**self._kwargs)
def join(self, *args):
Thread.join(self, *args)
return self._return
la réponse de Jake est bonne, mais si vous ne voulez pas utiliser un threadpool (vous ne savez pas combien de threads vous aurez besoin, mais créez-les au besoin) alors une bonne façon de transmettre l'information entre les threads est la file d'attente intégrée .Queue classe,car il offre la sécurité de fil.
j'ai créé le décorateur suivant pour le faire agir d'une manière similaire à la threadpool:
def threaded(f, daemon=False):
import Queue
def wrapped_f(q, *args, **kwargs):
'''this function calls the decorated function and puts the
result in a queue'''
ret = f(*args, **kwargs)
q.put(ret)
def wrap(*args, **kwargs):
'''this is the function returned from the decorator. It fires off
wrapped_f in a new thread and returns the thread object with
the result queue attached'''
q = Queue.Queue()
t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
t.daemon = daemon
t.start()
t.result_queue = q
return t
return wrap
alors vous l'utilisez comme:
@threaded
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x
# does not block, returns Thread object
y = long_task(10)
print y
# this blocks, waiting for the result
result = y.result_queue.get()
print result
la fonction décorée crée un nouveau thread chaque fois qu'elle est appelée et renvoie un objet Thread qui contient la file qui recevra le résultat.
mise à JOUR
Cela fait longtemps que j'ai posté cette réponse, mais elle reçoit toujours des vues, donc j'ai pensé la mettre à jour pour refléter la façon dont je fais ceci dans les nouvelles versions de Python:
Python 3.2 ajouté dans le module concurrent.futures
qui fournit une interface de haut niveau pour les tâches parallèles. Il fournit ThreadPoolExecutor
et ProcessPoolExecutor
, de sorte que vous pouvez utiliser un thread ou un pool de processus avec la même api.
l'un des avantages de cette api est que soumettre une tâche à un Executor
retourne un Future
objet, qui complète avec la valeur de retour du callable que vous soumettez.
ce qui rend la fixation d'un queue
objet inutile, ce qui simplifie un peu le décorateur:
_DEFAULT_POOL = ThreadPoolExecutor()
def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)
return wrap
cela utilisera un exécuteur par défaut module threadpool executor si un exécuteur n'est pas passé.
l'usage est très similaire à avant:
@threadpool
def long_task(x):
import time
x = x + 5
time.sleep(5)
return x
# does not block, returns Future object
y = long_task(10)
print y
# this blocks, waiting for the result
result = y.result()
print result
si vous utilisez Python 3.4+, une caractéristique vraiment agréable de l'utilisation de cette méthode (et des objets futurs en général) est que le futur retourné peut être enveloppé pour le tourner dans un asyncio.Future
avec asyncio.wrap_future
. Cela lui permet de fonctionner facilement avec les coroutines:
result = await asyncio.wrap_future(long_task(10))
si vous n'avez pas besoin d'accéder à l'objet sous-jacent concurrent.Future
, vous pouvez inclure l'enveloppe dans le décorateur:
_DEFAULT_POOL = ThreadPoolExecutor()
def threadpool(f, executor=None):
@wraps(f)
def wrap(*args, **kwargs):
return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))
return wrap
ensuite, chaque fois que vous avez besoin de pousser CPU intensive ou le code de blocage du fil de boucle d'événement, vous pouvez le mettre dans une fonction décorée:
@threadpool
def some_long_calculation():
...
# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()
une autre solution qui ne nécessite pas de changer votre code existant:
import Queue
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar)
return 'foo'
que = Queue.Queue()
t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result
il peut aussi être facilement ajusté à un environnement multi-filetés:
import Queue
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar)
return 'foo'
que = Queue.Queue()
threads_list = list()
t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)
# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...
# Join all the threads
for t in threads_list:
t.join()
# Check thread's return value
while not que.empty():
result = que.get()
print result
Parris / kindall répondre join
/ return
réponse porté vers Python 3:
from threading import Thread
def foo(bar):
print('hello {0}'.format(bar))
return "foo"
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)
self._return = None
def run(self):
if self._target is not None:
self._return = self._target(*self._args, **self._kwargs)
def join(self):
Thread.join(self)
return self._return
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
print(twrv.join()) # prints foo
notez que la classe Thread
est implémentée différemment en Python 3.
j'ai volé la réponse de kindall et je l'ai nettoyée un petit peu.
la partie clé est d'ajouter * args et * * kwargs à Jointer () afin de gérer le délai d'attente
class threadWithReturn(Thread):
def __init__(self, *args, **kwargs):
super(threadWithReturn, self).__init__(*args, **kwargs)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
def join(self, *args, **kwargs):
super(threadWithReturn, self).join(*args, **kwargs)
return self._return
UPDATED ANSWER BELOW
C'est ma réponse la plus populaire, j'ai donc décidé de mettre à jour avec du code qui s'exécutera sur py2 et py3.
en outre, je vois beaucoup de réponses à cette question que montrez un manque de compréhension en ce qui concerne le fil.rejoindre.)( Certains ne parviennent pas à gérer le timeout
arg. Mais il y a aussi un corner-case que vous devriez être conscient concernant les instances quand vous avez (1) une fonction cible qui peut retourner None
et (2) vous passez aussi l'arg timeout
à join(). Veuillez consulter la section "TEST 4" pour comprendre ce cas particulier.
ThreadWithReturn class qui fonctionne avec py2 et py3:
import sys
from threading import Thread
from builtins import super # https://stackoverflow.com/a/30159479
if sys.version_info >= (3, 0):
_thread_target_key = '_target'
_thread_args_key = '_args'
_thread_kwargs_key = '_kwargs'
else:
_thread_target_key = '_Thread__target'
_thread_args_key = '_Thread__args'
_thread_kwargs_key = '_Thread__kwargs'
class ThreadWithReturn(Thread):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._return = None
def run(self):
target = getattr(self, _thread_target_key)
if not target is None:
self._return = target(*getattr(self, _thread_args_key), **getattr(self, _thread_kwargs_key))
def join(self, *args, **kwargs):
super().join(*args, **kwargs)
return self._return
quelques échantillons les essais sont indiqués ci-dessous:
import time, random
# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
if not seconds is None:
time.sleep(seconds)
return arg
# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')
# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)
# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished
# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))
pouvez-vous identifier la boîte de coin que nous pourrions rencontrer avec le TEST 4?
le problème est que nous nous attendons à ce que giveMe () ne renvoie aucun (voir TEST 2), mais nous nous attendons aussi à ce que join () ne renvoie aucun si cela se produit.
returned is None
signifie soit:
(1) c'est ce que giveMe() a retourné, ou
(2) join() timed out
ce exemple est trivial puisque nous savons que giveMe() retournera toujours Aucun. Mais dans le monde réel (où la cible peut légitimement ne rien rendre ou quelque chose d'autre), nous voudrions vérifier explicitement ce qui s'est passé.
ci-dessous est la façon de traiter ce coin-affaire:
# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))
if my_thread.isAlive():
# returned is None because join() timed out
# this also means that giveMe() is still running in the background
pass
# handle this based on your app's logic
else:
# join() is finished, and so is giveMe()
# BUT we could also be in a race condition, so we need to update returned, just in case
returned = my_thread.join()
ma solution au problème est d'envelopper la fonction et le fil dans une classe. Ne nécessite pas l'utilisation de pools,de files d'attente ou de passage de variables de type C. Il est également non bloquant. Tu vérifies le statut à la place. Voir l'exemple d'utilisation à la fin du code.
import threading
class ThreadWorker():
'''
The basic idea is given a function create an object.
The object can then run the function in a thread.
It provides a wrapper to start it,check its status,and get data out the function.
'''
def __init__(self,func):
self.thread = None
self.data = None
self.func = self.save_data(func)
def save_data(self,func):
'''modify function to save its returned data'''
def new_func(*args, **kwargs):
self.data=func(*args, **kwargs)
return new_func
def start(self,params):
self.data = None
if self.thread is not None:
if self.thread.isAlive():
return 'running' #could raise exception here
#unless thread exists and is alive start or restart it
self.thread = threading.Thread(target=self.func,args=params)
self.thread.start()
return 'started'
def status(self):
if self.thread is None:
return 'not_started'
else:
if self.thread.isAlive():
return 'running'
else:
return 'finished'
def get_results(self):
if self.thread is None:
return 'not_started' #could return exception
else:
if self.thread.isAlive():
return 'running'
else:
return self.data
def add(x,y):
return x +y
add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()
En Utilisant La File D'Attente :
import threading, queue
def calc_square(num, out_queue1):
l = []
for x in num:
l.append(x*x)
out_queue1.put(l)
arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())
vous pouvez définir un mutable au-dessus de la portée de la fonction filetée, et ajouter le résultat à cela. (J'ai aussi modifié le code pour qu'il soit compatible python3)
returns = {}
def foo(bar):
print('hello {0}'.format(bar))
returns[bar] = 'foo'
from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)
retourne {'world!': 'foo'}
si vous utilisez la fonction entrée comme clé pour vos résultats dict, chaque entrée unique est garantie de donner une entrée dans les résultats""
vous pouvez utiliser le Pool comme un pool de processus ouvriers comme ci-dessous:
from multiprocessing import Pool
def f1(x, y):
return x*y
if __name__ == '__main__':
with Pool(processes=10) as pool:
result = pool.apply(f1, (2, 3))
print(result)
Prenant en compte @iman commentaire sur @JakeBiesinger réponse, j'ai recomposé pour avoir un divers nombre de threads:
from multiprocessing.pool import ThreadPool
def foo(bar, baz):
print 'hello {0}'.format(bar)
return 'foo' + baz
numOfThreads = 3
results = []
pool = ThreadPool(numOfThreads)
for i in range(0, numOfThreads):
results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)
# do some other stuff in the main process
# ...
# ...
results = [r.get() for r in results]
print results
pool.close()
pool.join()
Cheers,
Guy.
join
toujours retourner None
, je pense que vous devriez sous-classe Thread
pour manipuler les codes de retour et ainsi.
j'utilise ce wrapper, qui tourne confortablement n'importe quelle fonction pour exécuter dans un Thread
- en prenant soin de sa valeur de retour ou d'exception. Il n'ajoute pas Queue
au-dessus.
def threading_func(f):
"""Decorator for running a function in a thread and handling its return
value or exception"""
def start(*args, **kw):
def run():
try:
th.ret = f(*args, **kw)
except:
th.exc = sys.exc_info()
def get(timeout=None):
th.join(timeout)
if th.exc:
raise th.exc[0], th.exc[1], th.exc[2] # py2
##raise th.exc[1] #py3
return th.ret
th = threading.Thread(None, run)
th.exc = None
th.get = get
th.start()
return th
return start
Exemples D'Utilisation
def f(x):
return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))
@threading_func
def th_mul(a, b):
return a * b
th = th_mul("text", 2.5)
try:
print(th.get())
except TypeError:
print("exception thrown ok.")
Notes sur threading
module
valeur de retour confortable et la manipulation d'exception d'une fonction filetée est un besoin "pythonique" fréquent et devrait en effet déjà être offert par le module threading
- peut-être directement dans la classe standard Thread
. ThreadPool
a beaucoup trop de frais généraux pour les tâches simples - 3 la gestion des fils, beaucoup de bureaucratie. Malheureusement, la mise en page de Thread
a été copiée de Java à l'origine - ce que vous voyez par exemple à partir du 1st (!) paramètre constructeur group
.
comme mentionné, le pool de multiprocesseurs est beaucoup plus lent que le threading de base. Utiliser les files d'attente comme proposé dans certaines réponses voici une alternative très efficace. Je l'ai utilisé avec des dictionnaires afin de pouvoir exécuter beaucoup de petits fils et récupérer des réponses multiples en les combinant avec des dictionnaires:
#!/usr/bin/env python3
import threading
# use Queue for python2
import queue
import random
LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]
NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
def randoms(k, q):
result = dict()
result['letter'] = random.choice(LETTERS)
result['number'] = random.choice(NUMBERS)
q.put({k: result})
threads = list()
q = queue.Queue()
results = dict()
for name in ('alpha', 'oscar', 'yankee',):
threads.append( threading.Thread(target=randoms, args=(name, q)) )
threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
results.update(q.get())
print(results)
Définissez votre cible à
1) Prendre un argument q
2) remplacer toute mention return foo
par q.put(foo); return
ainsi une fonction
def func(a):
ans = a * a
return ans
deviendrait
def func(a, q):
ans = a * a
q.put(ans)
return
et alors vous procéderiez comme tel
from Queue import Queue
from threading import Thread
ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]
threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]
et vous pouvez utiliser la fonction décorateurs / enveloppeurs pour le faire de sorte que vous pouvez utiliser vos fonctions existantes comme target
sans les modifier, mais suivre ce schéma de base.
Une solution habituelle est d'envelopper votre fonction foo
avec un décorateur, comme
result = queue.Queue()
def task_wrapper(*args):
result.put(target(*args))
alors le code entier peut ressembler à cela
result = queue.Queue()
def task_wrapper(*args):
result.put(target(*args))
threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]
for t in threads:
t.start()
while(True):
if(len(threading.enumerate()) < max_num):
break
for t in threads:
t.join()
return result
Note
une question importante est que les valeurs de retour peuvent être unorderred .
(En fait, le return value
n'est pas nécessairement sauvegardé dans le queue
, puisque vous pouvez choisir arbitrairement thread-safe données structure)
pourquoi ne pas simplement utiliser la variable globale?
import threading
class myThread(threading.Thread):
def __init__(self, ind, lock):
threading.Thread.__init__(self)
self.ind = ind
self.lock = lock
def run(self):
global results
with self.lock:
results.append(self.ind)
results = []
lock = threading.Lock()
threads = [myThread(x, lock) for x in range(1, 4)]
for t in threads:
t.start()
for t in threads:
t.join()
print(results)
façon très simple de faire cela pour des idiots comme moi:
import queue
import threading
# creating queue instance
q = queue.Queue()
# creating threading class
class AnyThread():
def __init__ (self):
threading.Thread.__init__(self)
def run(self):
# in this class and function we will put our test target function
test()
t = AnyThread()
# having our test target function
def test():
# do something in this function:
result = 3 + 2
# and put result to a queue instance
q.put(result)
for i in range(3): #calling our threading fucntion 3 times (just for example)
t.run()
output = q.get() # here we get output from queue instance
print(output)
>>> 5
>>> 5
>>> 5
ici, c'est le module queue
. Nous créons l'instance queue.Queue()
et l'incluons dans notre fonction. Nous l'alimentons avec notre résultat que nous dépassons plus tard.
s'il vous Plaît voir un exemple de plus avec des arguments transmis à notre test de la fonction:
import queue
import threading
# creating queue instance
q = queue.Queue()
# creating threading class
class AnyThread():
def __init__ (self):
threading.Thread.__init__(self)
def run(self, a, b):
# in this class and function we will put our execution test function
test(a, b)
t = AnyThread()
# having our test target function
def test(a, b):
# do something in this function:
result = a + b
# and put result to a queue instance
q.put(result)
for i in range(3): #calling our threading fucntion 3 times (just for example)
t.run(3+i, 2+i)
output = q.get() # here we get output from queue instance
print(output)
>>> 5
>>> 7
>>> 9
L'idée de GuySoft est grande, mais je pense que l'objet ne doit pas nécessairement hériter du Thread et start () pourrait être retiré de l'interface:
from threading import Thread
import queue
class ThreadWithReturnValue(object):
def __init__(self, target=None, args=(), **kwargs):
self._que = queue.Queue()
self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
args=(self._que, args, kwargs), )
self._t.start()
def join(self):
self._t.join()
return self._que.get()
def foo(bar):
print('hello {0}'.format(bar))
return "foo"
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
print(twrv.join()) # prints foo
Kindall la réponse de en Python3
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, *, daemon=None):
Thread.__init__(self, group, target, name, args, kwargs, daemon)
self._return = None
def run(self):
try:
if self._target:
self._return = self._target(*self._args, **self._kwargs)
finally:
del self._target, self._args, self._kwargs
def join(self,timeout=None):
Thread.join(self,timeout)
return self._return