Comment ajouter un timeout à une fonction en Python
de nombreuses tentatives ont été faites dans le passé pour ajouter une fonctionnalité de timeout en Python telle que lorsqu'une limite de temps spécifiée expirait, le code d'attente pouvait passer à autre chose. Malheureusement, les recettes précédentes ont soit permis à la fonction d'exécution de continuer à exécuter et à consommer des ressources, soit tué la fonction en utilisant une méthode de terminaison de thread spécifique à la plate-forme. Le but de ce wiki est de développer une réponse multiplateforme à ce problème que de nombreux programmeurs ont dû aborder pour divers projets de programmation.
#! /usr/bin/env python
"""Provide way to add timeout specifications to arbitrary functions.
There are many ways to add a timeout to a function, but no solution
is both cross-platform and capable of terminating the procedure. This
module use the multiprocessing module to solve both of those problems."""
################################################################################
__author__ = 'Stephen "Zero" Chappell <Noctis.Skytower@gmail.com>'
__date__ = '11 February 2010'
__version__ = '$Revision: 3 $'
################################################################################
import inspect
import sys
import time
import multiprocessing
################################################################################
def add_timeout(function, limit=60):
"""Add a timeout parameter to a function and return it.
It is illegal to pass anything other than a function as the first
parameter. If the limit is not given, it gets a default value equal
to one minute. The function is wrapped and returned to the caller."""
assert inspect.isfunction(function)
if limit <= 0:
raise ValueError()
return _Timeout(function, limit)
class NotReadyError(Exception): pass
################################################################################
def _target(queue, function, *args, **kwargs):
"""Run a function with arguments and return output via a queue.
This is a helper function for the Process created in _Timeout. It runs
the function with positional arguments and keyword arguments and then
returns the function's output by way of a queue. If an exception gets
raised, it is returned to _Timeout to be raised by the value property."""
try:
queue.put((True, function(*args, **kwargs)))
except:
queue.put((False, sys.exc_info()[1]))
class _Timeout:
"""Wrap a function and add a timeout (limit) attribute to it.
Instances of this class are automatically generated by the add_timeout
function defined above. Wrapping a function allows asynchronous calls
to be made and termination of execution after a timeout has passed."""
def __init__(self, function, limit):
"""Initialize instance in preparation for being called."""
self.__limit = limit
self.__function = function
self.__timeout = time.clock()
self.__process = multiprocessing.Process()
self.__queue = multiprocessing.Queue()
def __call__(self, *args, **kwargs):
"""Execute the embedded function object asynchronously.
The function given to the constructor is transparently called and
requires that "ready" be intermittently polled. If and when it is
True, the "value" property may then be checked for returned data."""
self.cancel()
self.__queue = multiprocessing.Queue(1)
args = (self.__queue, self.__function) + args
self.__process = multiprocessing.Process(target=_target,
args=args,
kwargs=kwargs)
self.__process.daemon = True
self.__process.start()
self.__timeout = self.__limit + time.clock()
def cancel(self):
"""Terminate any possible execution of the embedded function."""
if self.__process.is_alive():
self.__process.terminate()
@property
def ready(self):
"""Read-only property indicating status of "value" property."""
if self.__queue.full():
return True
elif not self.__queue.empty():
return True
elif self.__timeout < time.clock():
self.cancel()
else:
return False
@property
def value(self):
"""Read-only property containing data returned from function."""
if self.ready is True:
flag, load = self.__queue.get()
if flag:
return load
raise load
raise NotReadyError()
def __get_limit(self):
return self.__limit
def __set_limit(self, value):
if value <= 0:
raise ValueError()
self.__limit = value
limit = property(__get_limit, __set_limit,
doc="Property for controlling the value of the timeout.")
Edit: ce code a été écrit pour Python 3.x et n'a pas été conçu pour les méthodes de classe comme une décoration. multiprocessing
module n'a pas été conçu pour modifier les instances de classe au-delà des limites du processus.
3 réponses
le principal problème avec votre code est l'utilisation excessive de la double underscore namespace conflict prevention dans une classe qui n'est pas prévue du tout.
En général, self.__foo
est une odeur de code qui doit être accompagnée d'un commentaire du genre # This is a mixin and we don't want arbitrary subclasses to have a namespace conflict
.
en outre, l'API client de cette méthode ressemblerait à ceci:
def mymethod(): pass
mymethod = add_timeout(mymethod, 15)
# start the processing
timeout_obj = mymethod()
try:
# access the property, which is really a function call
ret = timeout_obj.value
except TimeoutError:
# handle a timeout here
ret = None
Ce n'est pas très pythonic à tous et un meilleur client api serait:
@timeout(15)
def mymethod(): pass
try:
my_method()
except TimeoutError:
pass
vous utilisez @property dans votre classe pour quelque chose qui est un accessor Mutant d'état, ce n'est pas une bonne idée. Par exemple, qu'arriverait-il quand .la valeur est accessible deux fois? On dirait que ça échouerait parce que la file d'attente.get () retournerait trash car la file d'attente est déjà vide.
Supprimer @propriété entièrement. Ne pas l'utiliser dans ce contexte, il n'est pas adapté à votre cas d'utilisation. Faire appel bloquer lors de l'appel et retourner la valeur ou augmenter l'exception elle-même. Si vous avez vraiment doit avoir la valeur consultée plus tard, en faire une méthode comme .get() ou .valeur.)(
Ce code pour le _target doit être réécrit un peu:
def _target(queue, function, *args, **kwargs):
try:
queue.put((True, function(*args, **kwargs)))
except:
queue.put((False, exc_info())) # get *all* the exec info, don't do exc_info[1]
# then later:
raise exc_info[0], exc_info[1], exc_info[2]
de cette façon la trace de la pile sera conservée correctement et visible par le programmeur.
je pense que vous avez fait une première tentative raisonnable à écrire une bibliothèque utile, j'aime l'utilisation du module de traitement pour atteindre les objectifs.
Voici comment obtenir la syntaxe de décorateur mentionnée par Jerub
def timeout(limit=None):
if limit is None:
limit = DEFAULT_TIMEOUT
if limit <= 0:
raise TimeoutError() # why not ValueError here?
def wrap(function):
return _Timeout(function,limit)
return wrap
@timeout(15)
def mymethod(): pass
Caillou la bibliothèque a été conçue pour offrir une implémentation multi-plateforme capable de traiter une logique problématique qui pourrait crash, d'erreur ou d'exécuter indéfiniment.
from pebble import concurrent
@concurrent.process(timeout=10)
def function(foo, bar=0):
return foo + bar
future = function(1, bar=2)
try:
result = future.result() # blocks until results are ready
except Exception as error:
print("Function raised %s" % error)
print(error.traceback) # traceback of the function
except TimeoutError as error:
print("Function took longer than %d seconds" % error.args[1])
le décorateur fonctionne aussi bien avec les méthodes statiques et de classe. Je ne recommande pas de décorer les méthodes néanmoins, car il est une pratique tout à fait sujette à erreur.