Quelle est la meilleure façon d'exécuter une fonction Toutes les x secondes en Python?
je veux exécuter plusieurs fois une fonction en Python toutes les 60 secondes pour toujours (tout comme un NSTimer dans L'objectif C). Ce code s'exécute comme un démon et est effectivement comme appeler le script python à chaque minute en utilisant un cron, mais sans exiger que ce soit mis en place par l'utilisateur.
Dans cette question sur un cron implémenté en Python , la solution semble effectivement juste sleep() pour les x secondes. Je n'ai pas besoin d'une telle fonctionnalité avancée donc peut-être quelque chose comme ça pourrait fonctionner
while True:
# Code executed here
time.sleep(60)
y a-t-il des problèmes prévisibles avec ce code?
15 réponses
utilisez le module sched , qui implémente un planificateur d'événements à usage général.
import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc):
print "Doing stuff..."
# do your stuff
s.enter(60, 1, do_something, (sc,))
s.enter(60, 1, do_something, (s,))
s.run()
il suffit de verrouiller votre boucle temporelle sur l'horloge du système. Facile.
import time
starttime=time.time()
while True:
print "tick"
time.sleep(60.0 - ((time.time() - starttime) % 60.0))
vous pourriez vouloir considérer Twisted qui est une bibliothèque de réseau python qui implémente le Pattern de réacteur .
from twisted.internet import task
from twisted.internet import reactor
timeout = 60.0 # Sixty seconds
def doWork():
#do work here
pass
l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds
reactor.run()
alors que "while True: sleep (60)" va probablement fonctionner Twisted met probablement déjà en œuvre de nombreuses fonctionnalités dont vous aurez éventuellement besoin (démonisation, journalisation ou manipulation d'exception comme souligné par bobince) et sera probablement une solution plus robuste""
si vous voulez une façon non-bloquante d'exécuter votre fonction périodiquement, au lieu d'une boucle infinie bloquante, j'utiliserais une minuterie filetée. De cette façon, votre code peut continuer à fonctionner et à effectuer d'autres tâches et encore avoir votre fonction appelée Toutes les n secondes. J'utilise beaucoup cette technique pour imprimer des informations de progrès sur des tâches longues, intensives en CPU/disque/réseau.
voici le code que j'ai posté dans une question similaire, avec contrôle start() et stop ():
from threading import Timer
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self._timer = Timer(self.interval, self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
Utilisation:
from time import sleep
def hello(name):
print "Hello %s!" % name
print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
sleep(5) # your long-running job goes here...
finally:
rt.stop() # better in a try/finally block to make sure the program ends!
Dispose:
- bibliothèque Standard SEULEMENT, PAS de dépendances externes
-
start()
etstop()
sont sûrs de faire appel à plusieurs fois, même si le compteur a déjà démarré/arrêté - fonction à appeler peut avoir des arguments de position et nommés
- vous pouvez changer
interval
n'importe quand, il sera effectif après la prochaine course. Même pourargs
,kwargs
et mêmefunction
!
la façon La plus facile, je crois:
import time
def executeSomething():
#code here
time.sleep(60)
while True:
executeSomething()
de cette façon votre code est exécuté, puis il attend 60 secondes puis il exécute à nouveau, attend, exécute, etc... Pas besoin de compliquer les choses: d
voici une mise à jour du code de Mestrlion qui évite de dériver avec le temps:
import threading
import time
class RepeatedTimer(object):
def __init__(self, interval, function, *args, **kwargs):
self._timer = None
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.is_running = False
self.next_call = time.time()
self.start()
def _run(self):
self.is_running = False
self.start()
self.function(*self.args, **self.kwargs)
def start(self):
if not self.is_running:
self.next_call += self.interval
self._timer = threading.Timer(self.next_call - time.time(), self._run)
self._timer.start()
self.is_running = True
def stop(self):
self._timer.cancel()
self.is_running = False
import time, traceback
def every(delay, task):
next_time = time.time() + delay
while True:
time.sleep(max(0, next_time - time.time()))
try:
task()
except Exception:
traceback.print_exc()
# in production code you might want to have this instead of course:
# logger.exception("Problem while executing repetitive task.")
# skip tasks if we are behind schedule:
next_time += (time.time() - next_time) // delay * delay + delay
def foo():
print("foo", time.time())
every(5, foo)
si vous voulez le faire sans bloquer votre code restant, vous pouvez l'utiliser pour le laisser fonctionner dans son propre thread:
import threading
threading.Thread(target=lambda: every(5, foo)).start()
cette solution combine plusieurs caractéristiques rarement trouvées combinées dans les autres solutions:
- traitement des exceptions: dans la mesure du possible, à ce niveau, les exceptions sont gérées correctement, I. E. se connecter à des fins de débogage sans interrompre notre programme.
- Pas de chaîne: La commune comme une chaîne de mise en œuvre (pour la planification de l'événement suivant) que vous trouverez dans beaucoup de réponses est fragile dans l'aspect que si quelque chose va mal dans le mécanisme de planification (
threading.Timer
ou quoi que ce soit), cela permettra de mettre fin à la chaîne. Aucune autre exécution n'aura lieu alors, même si la raison du problème est déjà résolue. Une simple boucle et l'attente avec un simplesleep()
est beaucoup plus robuste en comparaison. - Pas de dérive: Ma solution fait une exacte de la piste de l'époque, il est censé fonctionner. Il n'y a pas de dérive en fonction du temps d'exécution (comme dans beaucoup d'autres solutions).
- Sauter: Ma solution va sauter tâches si une exécution a eu trop de temps (à l'e. g. faire X toutes les cinq secondes, mais X a 6 secondes). C'est le comportement cron standard (et pour une bonne raison). Beaucoup d'autres solutions alors simplement exécuter la tâche plusieurs fois de suite sans aucun retard. Dans la plupart des cas (E. G. tâches de nettoyage) ceci n'est pas souhaité. Si elle est souhaité, il suffit d'utiliser
next_time += delay
à la place.
j'ai fait face à un problème similaire il y a quelque temps. Peut être http://cronus.readthedocs.org pourrait aider?
pour v0.2, l'extrait suivant travaille
import cronus.beat as beat
beat.set_rate(2) # 2 Hz
while beat.true():
# do some time consuming work here
beat.sleep() # total loop duration would be 0.5 sec
la principale différence entre ça et cron est qu'une exception tuera le démon pour de bon. Vous pourriez vouloir envelopper avec un receveur d'exception et logger.
j'utilise ceci pour causer 60 événements par heure avec la plupart des événements se produisant au même nombre de secondes après la minute entière:
import math
import time
import random
TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging
def set_timing():
now = time.time()
elapsed = now - info['begin']
minutes = math.floor(elapsed/TICK)
tick_elapsed = now - info['completion_time']
if (info['tick']+1) > minutes:
wait = max(0,(TICK_TIMING-(time.time() % TICK)))
print ('standard wait: %.2f' % wait)
time.sleep(wait)
elif tick_elapsed < TICK_MINIMUM:
wait = TICK_MINIMUM-tick_elapsed
print ('minimum wait: %.2f' % wait)
time.sleep(wait)
else:
print ('skip set_timing(); no wait')
drift = ((time.time() - info['begin']) - info['tick']*TICK -
TICK_TIMING + info['begin']%TICK)
print ('drift: %.6f' % drift)
info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK
while 1:
set_timing()
print('hello world')
#random real world event
time.sleep(random.random()*TICK_MINIMUM)
info['tick'] += 1
info['completion_time'] = time.time()
selon les conditions réelles, vous pourriez obtenir des tiques de longueur:
60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.
mais au bout de 60 minutes vous aurez 60 tics; et la plupart d'entre eux se produiront à l'offset correct à la minute que vous préférez.
sur mon système j'obtiens une dérive typique de < 1 / 20ème de seconde jusqu'à ce que nécessité pour la correction se pose.
l'avantage de cette méthode est la résolution de la dérive de l'horloge; qui peut causer des problèmes si vous faites des choses comme Ajouter un article par tique et vous vous attendez à 60 articles ajoutés par heure. Le fait de ne pas tenir compte de la dérive peut entraîner des indications secondaires, comme des moyennes mobiles, à considérer les données trop profondément dans le passé, ce qui entraîne une sortie défectueuse.
Une réponse possible:
import time
t=time.time()
while True:
if time.time()-t>10:
#run your task here
t=time.time()
p.ex., afficher l'heure locale courante
import datetime
import glib
import logger
def get_local_time():
current_time = datetime.datetime.now().strftime("%H:%M")
logger.info("get_local_time(): %s",current_time)
return str(current_time)
def display_local_time():
logger.info("Current time is: %s", get_local_time())
return True
# call every minute
glib.timeout_add(60*1000, display_local_time)
j'utilise la méthode Tkinter after (), qui ne "vole pas le jeu" (comme le module sched qui a été présenté plus tôt), c'est-à-dire qu'elle permet à d'autres choses de fonctionner en parallèle:
import Tkinter
def do_something1():
global n1
n1 += 1
if n1 == 6: # (Optional condition)
print "* do_something1() is done *"; return
# Do your stuff here
# ...
print "do_something1() "+str(n1)
tk.after(1000, do_something1)
def do_something2():
global n2
n2 += 1
if n2 == 6: # (Optional condition)
print "* do_something2() is done *"; return
# Do your stuff here
# ...
print "do_something2() "+str(n2)
tk.after(500, do_something2)
tk = Tkinter.Tk();
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()
do_something1()
et do_something2()
peuvent fonctionner en parallèle et à n'importe quelle vitesse d'intervalle. Ici, le 2e sera exécuté deux fois plus vite.Notez aussi que j'ai utilisé un compteur simple comme condition pour terminer l'une ou l'autre de ces fonctions. Vous pouvez utiliser n'importe quel autre contition vous aimez ou Aucun si vous ce que une fonction à exécuter jusqu'à ce que le programme se termine (par exemple une horloge).
''' tracking number of times it prints'''
import threading
global timeInterval
count=0
def printit():
threading.Timer(timeInterval, printit).start()
print( "Hello, World!")
global count
count=count+1
print(count)
printit
if __name__ == "__main__":
timeInterval= int(input('Enter Time in Seconds:'))
printit()
Voici une version adaptée du code de Mestrlion. En plus de la fonction originale, ce code:
1) Ajouter first_interval utilisé pour allumer le minuteur à un moment précis(l'appelant doit calculer first_interval et passer à)
2) résoudre une condition de race dans le code original. Dans le code d'origine, si le contrôle thread a échoué pour annuler le minuteur("l'Arrêt de la minuterie, et d'annuler l'exécution de la minuterie de l'action. Cela ne fonctionnera si le minuteur est toujours en attente."Cité de https://docs.python.org/2/library/threading.html ), la minuterie fonctionnera sans fin.
class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
self.timer = None
self.first_interval = first_interval
self.interval = interval
self.func = func
self.args = args
self.kwargs = kwargs
self.running = False
self.is_started = False
def first_start(self):
try:
# no race-condition here because only control thread will call this method
# if already started will not start again
if not self.is_started:
self.is_started = True
self.timer = Timer(self.first_interval, self.run)
self.running = True
self.timer.start()
except Exception as e:
log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
raise
def run(self):
# if not stopped start again
if self.running:
self.timer = Timer(self.interval, self.run)
self.timer.start()
self.func(*self.args, **self.kwargs)
def stop(self):
# cancel current timer in case failed it's still OK
# if already stopped doesn't matter to stop again
if self.timer:
self.timer.cancel()
self.running = False