comment obtenir la valeur de retour d'un thread en python?

Comment obtenir la valeur 'foo' qui est retournée à partir du fil?

from threading import Thread

def foo(bar):
    print 'hello {}'.format(bar)
    return 'foo'

thread = Thread(target=foo, args=('world!',))
thread.start()
ret = thread.join()
print ret

la seule façon évidente de le faire, indiquée ci-dessus, renvoie None .

202
demandé sur wim 2011-08-01 07:20:30

20 réponses

FWIW, le module multiprocessing a une interface agréable pour cela en utilisant la classe Pool . Et si vous voulez vous en tenir à des threads plutôt qu'à des processus, vous pouvez simplement utiliser la classe multiprocessing.pool.ThreadPool en remplacement direct.

def foo(bar, baz):
  print 'hello {0}'.format(bar)
  return 'foo' + baz

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo')) # tuple of args for foo

# do some other stuff in the main process

return_val = async_result.get()  # get the return value from your function.
200
répondu Jake Biesinger 2016-03-30 15:36:28

une façon que j'ai vu est de passer un objet mutable, comme une liste ou un dictionnaire, au constructeur du thread, avec un index ou un autre identificateur d'une sorte. Le fil peut alors stocker ses résultats dans son logement dédié à cet objet. Par exemple:

def foo(bar, result, index):
    print 'hello {0}'.format(bar)
    result[index] = "foo"

from threading import Thread

threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
    threads[i] = Thread(target=foo, args=('world!', results, i))
    threads[i].start()

# do some other stuff

for i in range(len(threads)):
    threads[i].join()

print " ".join(results)  # what sound does a metasyntactic locomotive make?

si vous voulez vraiment que join() renvoie la valeur de retour de la fonction appelée, Vous pouvez le faire avec une sous-classe Thread comme suit:

from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print twrv.join()   # prints foo

qui devient un peu nerveux en raison d'une certaine mutilation du nom, et il accède à des structures de données" privées "qui sont spécifiques à la mise en œuvre de Thread ... mais il fonctionne.

pour python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs)
        self._return = None
    def run(self):
        print(type(self._target))
        if self._target is not None:
            self._return = self._target(*self._args,
                                                **self._kwargs)
    def join(self, *args):
        Thread.join(self, *args)
        return self._return
140
répondu kindall 2018-08-22 16:11:51

la réponse de Jake est bonne, mais si vous ne voulez pas utiliser un threadpool (vous ne savez pas combien de threads vous aurez besoin, mais créez-les au besoin) alors une bonne façon de transmettre l'information entre les threads est la file d'attente intégrée .Queue classe,car il offre la sécurité de fil.

j'ai créé le décorateur suivant pour le faire agir d'une manière similaire à la threadpool:

def threaded(f, daemon=False):
    import Queue

    def wrapped_f(q, *args, **kwargs):
        '''this function calls the decorated function and puts the 
        result in a queue'''
        ret = f(*args, **kwargs)
        q.put(ret)

    def wrap(*args, **kwargs):
        '''this is the function returned from the decorator. It fires off
        wrapped_f in a new thread and returns the thread object with
        the result queue attached'''

        q = Queue.Queue()

        t = threading.Thread(target=wrapped_f, args=(q,)+args, kwargs=kwargs)
        t.daemon = daemon
        t.start()
        t.result_queue = q        
        return t

    return wrap

alors vous l'utilisez comme:

@threaded
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Thread object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result_queue.get()
print result

la fonction décorée crée un nouveau thread chaque fois qu'elle est appelée et renvoie un objet Thread qui contient la file qui recevra le résultat.

mise à JOUR

Cela fait longtemps que j'ai posté cette réponse, mais elle reçoit toujours des vues, donc j'ai pensé la mettre à jour pour refléter la façon dont je fais ceci dans les nouvelles versions de Python:

Python 3.2 ajouté dans le module concurrent.futures qui fournit une interface de haut niveau pour les tâches parallèles. Il fournit ThreadPoolExecutor et ProcessPoolExecutor , de sorte que vous pouvez utiliser un thread ou un pool de processus avec la même api.

l'un des avantages de cette api est que soumettre une tâche à un Executor retourne un Future objet, qui complète avec la valeur de retour du callable que vous soumettez.

ce qui rend la fixation d'un queue objet inutile, ce qui simplifie un peu le décorateur:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return (executor or _DEFAULT_POOL).submit(f, *args, **kwargs)

    return wrap

cela utilisera un exécuteur par défaut module threadpool executor si un exécuteur n'est pas passé.

l'usage est très similaire à avant:

@threadpool
def long_task(x):
    import time
    x = x + 5
    time.sleep(5)
    return x

# does not block, returns Future object
y = long_task(10)
print y

# this blocks, waiting for the result
result = y.result()
print result

si vous utilisez Python 3.4+, une caractéristique vraiment agréable de l'utilisation de cette méthode (et des objets futurs en général) est que le futur retourné peut être enveloppé pour le tourner dans un asyncio.Future avec asyncio.wrap_future . Cela lui permet de fonctionner facilement avec les coroutines:

result = await asyncio.wrap_future(long_task(10))

si vous n'avez pas besoin d'accéder à l'objet sous-jacent concurrent.Future , vous pouvez inclure l'enveloppe dans le décorateur:

_DEFAULT_POOL = ThreadPoolExecutor()

def threadpool(f, executor=None):
    @wraps(f)
    def wrap(*args, **kwargs):
        return asyncio.wrap_future((executor or _DEFAULT_POOL).submit(f, *args, **kwargs))

    return wrap

ensuite, chaque fois que vous avez besoin de pousser CPU intensive ou le code de blocage du fil de boucle d'événement, vous pouvez le mettre dans une fonction décorée:

@threadpool
def some_long_calculation():
    ...

# this will suspend while the function is executed on a threadpool
result = await some_long_calculation()
56
répondu bj0 2018-04-20 16:32:06

une autre solution qui ne nécessite pas de changer votre code existant:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result

il peut aussi être facilement ajusté à un environnement multi-filetés:

import Queue
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)
    return 'foo'

que = Queue.Queue()
threads_list = list()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)

# Add more threads here
...
threads_list.append(t2)
...
threads_list.append(t3)
...

# Join all the threads
for t in threads_list:
    t.join()

# Check thread's return value
while not que.empty():
    result = que.get()
    print result
20
répondu Arik 2016-04-28 21:52:17

Parris / kindall répondre join / return réponse porté vers Python 3:

from threading import Thread

def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon=daemon)

        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args, **self._kwargs)

    def join(self):
        Thread.join(self)
        return self._return


twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print(twrv.join())   # prints foo

notez que la classe Thread est implémentée différemment en Python 3.

13
répondu GuySoft 2017-05-23 12:18:22

j'ai volé la réponse de kindall et je l'ai nettoyée un petit peu.

la partie clé est d'ajouter * args et * * kwargs à Jointer () afin de gérer le délai d'attente

class threadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super(threadWithReturn, self).__init__(*args, **kwargs)

        self._return = None

    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)

    def join(self, *args, **kwargs):
        super(threadWithReturn, self).join(*args, **kwargs)

        return self._return

UPDATED ANSWER BELOW

C'est ma réponse la plus populaire, j'ai donc décidé de mettre à jour avec du code qui s'exécutera sur py2 et py3.

en outre, je vois beaucoup de réponses à cette question que montrez un manque de compréhension en ce qui concerne le fil.rejoindre.)( Certains ne parviennent pas à gérer le timeout arg. Mais il y a aussi un corner-case que vous devriez être conscient concernant les instances quand vous avez (1) une fonction cible qui peut retourner None et (2) vous passez aussi l'arg timeout à join(). Veuillez consulter la section "TEST 4" pour comprendre ce cas particulier.

ThreadWithReturn class qui fonctionne avec py2 et py3:

import sys
from threading import Thread
from builtins import super    # https://stackoverflow.com/a/30159479

if sys.version_info >= (3, 0):
    _thread_target_key = '_target'
    _thread_args_key = '_args'
    _thread_kwargs_key = '_kwargs'
else:
    _thread_target_key = '_Thread__target'
    _thread_args_key = '_Thread__args'
    _thread_kwargs_key = '_Thread__kwargs'

class ThreadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return = None

    def run(self):
        target = getattr(self, _thread_target_key)
        if not target is None:
            self._return = target(*getattr(self, _thread_args_key), **getattr(self, _thread_kwargs_key))

    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self._return

quelques échantillons les essais sont indiqués ci-dessous:

import time, random

# TEST TARGET FUNCTION
def giveMe(arg, seconds=None):
    if not seconds is None:
        time.sleep(seconds)
    return arg

# TEST 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')

# TEST 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)

# TEST 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # because join() timed out before giveMe() finished

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

pouvez-vous identifier la boîte de coin que nous pourrions rencontrer avec le TEST 4?

le problème est que nous nous attendons à ce que giveMe () ne renvoie aucun (voir TEST 2), mais nous nous attendons aussi à ce que join () ne renvoie aucun si cela se produit.

returned is None signifie soit:

(1) c'est ce que giveMe() a retourné, ou

(2) join() timed out

ce exemple est trivial puisque nous savons que giveMe() retournera toujours Aucun. Mais dans le monde réel (où la cible peut légitimement ne rien rendre ou quelque chose d'autre), nous voudrions vérifier explicitement ce qui s'est passé.

ci-dessous est la façon de traiter ce coin-affaire:

# TEST 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

if my_thread.isAlive():
    # returned is None because join() timed out
    # this also means that giveMe() is still running in the background
    pass
    # handle this based on your app's logic
else:
    # join() is finished, and so is giveMe()
    # BUT we could also be in a race condition, so we need to update returned, just in case
    returned = my_thread.join()
12
répondu user2426679 2018-07-21 03:47:13

ma solution au problème est d'envelopper la fonction et le fil dans une classe. Ne nécessite pas l'utilisation de pools,de files d'attente ou de passage de variables de type C. Il est également non bloquant. Tu vérifies le statut à la place. Voir l'exemple d'utilisation à la fin du code.

import threading

class ThreadWorker():
    '''
    The basic idea is given a function create an object.
    The object can then run the function in a thread.
    It provides a wrapper to start it,check its status,and get data out the function.
    '''
    def __init__(self,func):
        self.thread = None
        self.data = None
        self.func = self.save_data(func)

    def save_data(self,func):
        '''modify function to save its returned data'''
        def new_func(*args, **kwargs):
            self.data=func(*args, **kwargs)

        return new_func

    def start(self,params):
        self.data = None
        if self.thread is not None:
            if self.thread.isAlive():
                return 'running' #could raise exception here

        #unless thread exists and is alive start or restart it
        self.thread = threading.Thread(target=self.func,args=params)
        self.thread.start()
        return 'started'

    def status(self):
        if self.thread is None:
            return 'not_started'
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return 'finished'

    def get_results(self):
        if self.thread is None:
            return 'not_started' #could return exception
        else:
            if self.thread.isAlive():
                return 'running'
            else:
                return self.data

def add(x,y):
    return x +y

add_worker = ThreadWorker(add)
print add_worker.start((1,2,))
print add_worker.status()
print add_worker.get_results()
6
répondu Peter Lonjers 2013-04-30 22:35:03

En Utilisant La File D'Attente :

import threading, queue

def calc_square(num, out_queue1):
  l = []
  for x in num:
    l.append(x*x)
  out_queue1.put(l)


arr = [1,2,3,4,5,6,7,8,9,10]
out_queue1=queue.Queue()
t1=threading.Thread(target=calc_square, args=(arr,out_queue1))
t1.start()
t1.join()
print (out_queue1.get())
6
répondu user341143 2017-11-25 20:45:42

vous pouvez définir un mutable au-dessus de la portée de la fonction filetée, et ajouter le résultat à cela. (J'ai aussi modifié le code pour qu'il soit compatible python3)

returns = {}
def foo(bar):
    print('hello {0}'.format(bar))
    returns[bar] = 'foo'

from threading import Thread
t = Thread(target=foo, args=('world!',))
t.start()
t.join()
print(returns)

retourne {'world!': 'foo'}

si vous utilisez la fonction entrée comme clé pour vos résultats dict, chaque entrée unique est garantie de donner une entrée dans les résultats""

2
répondu Thijs D 2016-03-19 05:05:28

vous pouvez utiliser le Pool comme un pool de processus ouvriers comme ci-dessous:

from multiprocessing import Pool


def f1(x, y):
    return x*y


if __name__ == '__main__':
    with Pool(processes=10) as pool:
        result = pool.apply(f1, (2, 3))
        print(result)
2
répondu Tung Nguyen 2016-05-27 03:39:18

Prenant en compte @iman commentaire sur @JakeBiesinger réponse, j'ai recomposé pour avoir un divers nombre de threads:

from multiprocessing.pool import ThreadPool

def foo(bar, baz):
    print 'hello {0}'.format(bar)
    return 'foo' + baz

numOfThreads = 3 
results = []

pool = ThreadPool(numOfThreads)

for i in range(0, numOfThreads):
    results.append(pool.apply_async(foo, ('world', 'foo'))) # tuple of args for foo)

# do some other stuff in the main process
# ...
# ...

results = [r.get() for r in results]
print results

pool.close()
pool.join()

Cheers,

Guy.

2
répondu Guy Avraham 2017-11-18 08:53:07

join toujours retourner None , je pense que vous devriez sous-classe Thread pour manipuler les codes de retour et ainsi.

1
répondu BrainStorm 2011-08-01 03:26:22

j'utilise ce wrapper, qui tourne confortablement n'importe quelle fonction pour exécuter dans un Thread - en prenant soin de sa valeur de retour ou d'exception. Il n'ajoute pas Queue au-dessus.

def threading_func(f):
    """Decorator for running a function in a thread and handling its return
    value or exception"""
    def start(*args, **kw):
        def run():
            try:
                th.ret = f(*args, **kw)
            except:
                th.exc = sys.exc_info()
        def get(timeout=None):
            th.join(timeout)
            if th.exc:
                raise th.exc[0], th.exc[1], th.exc[2] # py2
                ##raise th.exc[1] #py3                
            return th.ret
        th = threading.Thread(None, run)
        th.exc = None
        th.get = get
        th.start()
        return th
    return start

Exemples D'Utilisation

def f(x):
    return 2.5 * x
th = threading_func(f)(4)
print("still running?:", th.is_alive())
print("result:", th.get(timeout=1.0))

@threading_func
def th_mul(a, b):
    return a * b
th = th_mul("text", 2.5)

try:
    print(th.get())
except TypeError:
    print("exception thrown ok.")

Notes sur threading module

valeur de retour confortable et la manipulation d'exception d'une fonction filetée est un besoin "pythonique" fréquent et devrait en effet déjà être offert par le module threading - peut-être directement dans la classe standard Thread . ThreadPool a beaucoup trop de frais généraux pour les tâches simples - 3 la gestion des fils, beaucoup de bureaucratie. Malheureusement, la mise en page de Thread a été copiée de Java à l'origine - ce que vous voyez par exemple à partir du 1st (!) paramètre constructeur group .

1
répondu kxr 2016-08-17 21:59:10

comme mentionné, le pool de multiprocesseurs est beaucoup plus lent que le threading de base. Utiliser les files d'attente comme proposé dans certaines réponses voici une alternative très efficace. Je l'ai utilisé avec des dictionnaires afin de pouvoir exécuter beaucoup de petits fils et récupérer des réponses multiples en les combinant avec des dictionnaires:

#!/usr/bin/env python3

import threading
# use Queue for python2
import queue
import random

LETTERS = 'abcdefghijklmnopqrstuvwxyz'
LETTERS = [ x for x in LETTERS ]

NUMBERS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

def randoms(k, q):
    result = dict()
    result['letter'] = random.choice(LETTERS)
    result['number'] = random.choice(NUMBERS)
    q.put({k: result})

threads = list()
q = queue.Queue()
results = dict()

for name in ('alpha', 'oscar', 'yankee',):
    threads.append( threading.Thread(target=randoms, args=(name, q)) )
    threads[-1].start()
_ = [ t.join() for t in threads ]
while not q.empty():
    results.update(q.get())

print(results)
1
répondu Yves Dorfsman 2018-01-08 15:08:22

Définissez votre cible à

1) Prendre un argument q

2) remplacer toute mention return foo par q.put(foo); return

ainsi une fonction

def func(a):
    ans = a * a
    return ans

deviendrait

def func(a, q):
    ans = a * a
    q.put(ans)
    return

et alors vous procéderiez comme tel

from Queue import Queue
from threading import Thread

ans_q = Queue()
arg_tups = [(i, ans_q) for i in xrange(10)]

threads = [Thread(target=func, args=arg_tup) for arg_tup in arg_tups]
_ = [t.start() for t in threads]
_ = [t.join() for t in threads]
results = [q.get() for _ in xrange(len(threads))]

et vous pouvez utiliser la fonction décorateurs / enveloppeurs pour le faire de sorte que vous pouvez utiliser vos fonctions existantes comme target sans les modifier, mais suivre ce schéma de base.

0
répondu tscizzle 2015-12-18 16:05:53

Une solution habituelle est d'envelopper votre fonction foo avec un décorateur, comme

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

alors le code entier peut ressembler à cela

result = queue.Queue()

def task_wrapper(*args):
    result.put(target(*args))

threads = [threading.Thread(target=task_wrapper, args=args) for args in args_list]

for t in threads:
    t.start()
    while(True):
        if(len(threading.enumerate()) < max_num):
            break
for t in threads:
    t.join()
return result

Note

une question importante est que les valeurs de retour peuvent être unorderred . (En fait, le return value n'est pas nécessairement sauvegardé dans le queue , puisque vous pouvez choisir arbitrairement thread-safe données structure)

0
répondu Response777 2017-02-10 06:55:19

pourquoi ne pas simplement utiliser la variable globale?

import threading


class myThread(threading.Thread):
    def __init__(self, ind, lock):
        threading.Thread.__init__(self)
        self.ind = ind
        self.lock = lock

    def run(self):
        global results
        with self.lock:
            results.append(self.ind)



results = []
lock = threading.Lock()
threads = [myThread(x, lock) for x in range(1, 4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(results)
0
répondu Alexey 2017-10-14 18:54:06

façon très simple de faire cela pour des idiots comme moi:

import queue
import threading

# creating queue instance
q = queue.Queue()

# creating threading class
class AnyThread():
    def __init__ (self):
        threading.Thread.__init__(self)

    def run(self):
        # in this class and function we will put our test target function
        test()

t = AnyThread()

# having our test target function
def test():
    # do something in this function:
    result = 3 + 2
    # and put result to a queue instance
    q.put(result)

for i in range(3): #calling our threading fucntion 3 times (just for example)
    t.run()
    output = q.get() # here we get output from queue instance
    print(output)

>>> 5
>>> 5
>>> 5

ici, c'est le module queue . Nous créons l'instance queue.Queue() et l'incluons dans notre fonction. Nous l'alimentons avec notre résultat que nous dépassons plus tard.

s'il vous Plaît voir un exemple de plus avec des arguments transmis à notre test de la fonction:

import queue
import threading

# creating queue instance
q = queue.Queue()

# creating threading class
class AnyThread():
    def __init__ (self):
        threading.Thread.__init__(self)

    def run(self, a, b):
        # in this class and function we will put our execution test function
        test(a, b)

t = AnyThread()

# having our test target function
def test(a, b):
    # do something in this function:
    result = a + b
    # and put result to a queue instance
    q.put(result)

for i in range(3): #calling our threading fucntion 3 times (just for example)
    t.run(3+i, 2+i)
    output = q.get() # here we get output from queue instance
    print(output)

>>> 5
>>> 7
>>> 9
0
répondu Vyachez 2017-11-14 21:56:16

L'idée de GuySoft est grande, mais je pense que l'objet ne doit pas nécessairement hériter du Thread et start () pourrait être retiré de l'interface:

from threading import Thread
import queue
class ThreadWithReturnValue(object):
    def __init__(self, target=None, args=(), **kwargs):
        self._que = queue.Queue()
        self._t = Thread(target=lambda q,arg1,kwargs1: q.put(target(*arg1, **kwargs1)) ,
                args=(self._que, args, kwargs), )
        self._t.start()

    def join(self):
        self._t.join()
        return self._que.get()


def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

print(twrv.join())   # prints foo
0
répondu pandy.song 2018-04-17 13:48:59

Kindall la réponse de en Python3

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, *, daemon=None):
        Thread.__init__(self, group, target, name, args, kwargs, daemon)
        self._return = None 

    def run(self):
        try:
            if self._target:
                self._return = self._target(*self._args, **self._kwargs)
        finally:
            del self._target, self._args, self._kwargs 

    def join(self,timeout=None):
        Thread.join(self,timeout)
        return self._return
0
répondu SmartManoj 2018-05-26 14:55:01