Quelle est la meilleure façon d'exécuter une fonction Toutes les x secondes en Python?

je veux exécuter plusieurs fois une fonction en Python toutes les 60 secondes pour toujours (tout comme un NSTimer dans L'objectif C). Ce code s'exécute comme un démon et est effectivement comme appeler le script python à chaque minute en utilisant un cron, mais sans exiger que ce soit mis en place par l'utilisateur.

Dans cette question sur un cron implémenté en Python , la solution semble effectivement juste sleep() pour les x secondes. Je n'ai pas besoin d'une telle fonctionnalité avancée donc peut-être quelque chose comme ça pourrait fonctionner

while True:
    # Code executed here
    time.sleep(60)

y a-t-il des problèmes prévisibles avec ce code?

173
demandé sur Community 2009-01-24 00:07:05

15 réponses

utilisez le module sched , qui implémente un planificateur d'événements à usage général.

import sched, time
s = sched.scheduler(time.time, time.sleep)
def do_something(sc): 
    print "Doing stuff..."
    # do your stuff
    s.enter(60, 1, do_something, (sc,))

s.enter(60, 1, do_something, (s,))
s.run()
160
répondu nosklo 2016-08-31 15:31:13

il suffit de verrouiller votre boucle temporelle sur l'horloge du système. Facile.

import time
starttime=time.time()
while True:
  print "tick"
  time.sleep(60.0 - ((time.time() - starttime) % 60.0))
102
répondu Dave Rove 2014-08-11 20:25:25

vous pourriez vouloir considérer Twisted qui est une bibliothèque de réseau python qui implémente le Pattern de réacteur .

from twisted.internet import task
from twisted.internet import reactor

timeout = 60.0 # Sixty seconds

def doWork():
    #do work here
    pass

l = task.LoopingCall(doWork)
l.start(timeout) # call every sixty seconds

reactor.run()

alors que "while True: sleep (60)" va probablement fonctionner Twisted met probablement déjà en œuvre de nombreuses fonctionnalités dont vous aurez éventuellement besoin (démonisation, journalisation ou manipulation d'exception comme souligné par bobince) et sera probablement une solution plus robuste""

53
répondu Aaron Maenpaa 2009-01-23 21:28:49

si vous voulez une façon non-bloquante d'exécuter votre fonction périodiquement, au lieu d'une boucle infinie bloquante, j'utiliserais une minuterie filetée. De cette façon, votre code peut continuer à fonctionner et à effectuer d'autres tâches et encore avoir votre fonction appelée Toutes les n secondes. J'utilise beaucoup cette technique pour imprimer des informations de progrès sur des tâches longues, intensives en CPU/disque/réseau.

voici le code que j'ai posté dans une question similaire, avec contrôle start() et stop ():

from threading import Timer

class RepeatedTimer(object):
    def __init__(self, interval, function, *args, **kwargs):
        self._timer     = None
        self.interval   = interval
        self.function   = function
        self.args       = args
        self.kwargs     = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

Utilisation:

from time import sleep

def hello(name):
    print "Hello %s!" % name

print "starting..."
rt = RepeatedTimer(1, hello, "World") # it auto-starts, no need of rt.start()
try:
    sleep(5) # your long-running job goes here...
finally:
    rt.stop() # better in a try/finally block to make sure the program ends!

Dispose:

  • bibliothèque Standard SEULEMENT, PAS de dépendances externes
  • start() et stop() sont sûrs de faire appel à plusieurs fois, même si le compteur a déjà démarré/arrêté
  • fonction à appeler peut avoir des arguments de position et nommés
  • vous pouvez changer interval n'importe quand, il sera effectif après la prochaine course. Même pour args , kwargs et même function !
36
répondu MestreLion 2016-07-11 22:15:34

la façon La plus facile, je crois:

import time

def executeSomething():
    #code here
    time.sleep(60)

while True:
    executeSomething()

de cette façon votre code est exécuté, puis il attend 60 secondes puis il exécute à nouveau, attend, exécute, etc... Pas besoin de compliquer les choses: d

33
répondu Itxaka 2013-09-13 02:54:45

voici une mise à jour du code de Mestrlion qui évite de dériver avec le temps:

import threading 
import time

class RepeatedTimer(object):
  def __init__(self, interval, function, *args, **kwargs):
    self._timer = None
    self.interval = interval
    self.function = function
    self.args = args
    self.kwargs = kwargs
    self.is_running = False
    self.next_call = time.time()
    self.start()

  def _run(self):
    self.is_running = False
    self.start()
    self.function(*self.args, **self.kwargs)

  def start(self):
    if not self.is_running:
      self.next_call += self.interval
      self._timer = threading.Timer(self.next_call - time.time(), self._run)
      self._timer.start()
      self.is_running = True

  def stop(self):
    self._timer.cancel()
    self.is_running = False
12
répondu eraoul 2017-12-06 06:13:31
import time, traceback

def every(delay, task):
  next_time = time.time() + delay
  while True:
    time.sleep(max(0, next_time - time.time()))
    try:
      task()
    except Exception:
      traceback.print_exc()
      # in production code you might want to have this instead of course:
      # logger.exception("Problem while executing repetitive task.")
    # skip tasks if we are behind schedule:
    next_time += (time.time() - next_time) // delay * delay + delay

def foo():
  print("foo", time.time())

every(5, foo)

si vous voulez le faire sans bloquer votre code restant, vous pouvez l'utiliser pour le laisser fonctionner dans son propre thread:

import threading
threading.Thread(target=lambda: every(5, foo)).start()

cette solution combine plusieurs caractéristiques rarement trouvées combinées dans les autres solutions:

  • traitement des exceptions: dans la mesure du possible, à ce niveau, les exceptions sont gérées correctement, I. E. se connecter à des fins de débogage sans interrompre notre programme.
  • Pas de chaîne: La commune comme une chaîne de mise en œuvre (pour la planification de l'événement suivant) que vous trouverez dans beaucoup de réponses est fragile dans l'aspect que si quelque chose va mal dans le mécanisme de planification ( threading.Timer ou quoi que ce soit), cela permettra de mettre fin à la chaîne. Aucune autre exécution n'aura lieu alors, même si la raison du problème est déjà résolue. Une simple boucle et l'attente avec un simple sleep() est beaucoup plus robuste en comparaison.
  • Pas de dérive: Ma solution fait une exacte de la piste de l'époque, il est censé fonctionner. Il n'y a pas de dérive en fonction du temps d'exécution (comme dans beaucoup d'autres solutions).
  • Sauter: Ma solution va sauter tâches si une exécution a eu trop de temps (à l'e. g. faire X toutes les cinq secondes, mais X a 6 secondes). C'est le comportement cron standard (et pour une bonne raison). Beaucoup d'autres solutions alors simplement exécuter la tâche plusieurs fois de suite sans aucun retard. Dans la plupart des cas (E. G. tâches de nettoyage) ceci n'est pas souhaité. Si elle est souhaité, il suffit d'utiliser next_time += delay à la place.
6
répondu Alfe 2018-10-01 08:35:06

j'ai fait face à un problème similaire il y a quelque temps. Peut être http://cronus.readthedocs.org pourrait aider?

pour v0.2, l'extrait suivant travaille

import cronus.beat as beat

beat.set_rate(2) # 2 Hz
while beat.true():
    # do some time consuming work here
    beat.sleep() # total loop duration would be 0.5 sec
5
répondu Anay 2014-06-10 12:16:51

la principale différence entre ça et cron est qu'une exception tuera le démon pour de bon. Vous pourriez vouloir envelopper avec un receveur d'exception et logger.

4
répondu bobince 2009-01-23 21:12:17

j'utilise ceci pour causer 60 événements par heure avec la plupart des événements se produisant au même nombre de secondes après la minute entière:

import math
import time
import random

TICK = 60 # one minute tick size
TICK_TIMING = 59 # execute on 59th second of the tick
TICK_MINIMUM = 30 # minimum catch up tick size when lagging

def set_timing():

    now = time.time()
    elapsed = now - info['begin']
    minutes = math.floor(elapsed/TICK)
    tick_elapsed = now - info['completion_time']
    if (info['tick']+1) > minutes:
        wait = max(0,(TICK_TIMING-(time.time() % TICK)))
        print ('standard wait: %.2f' % wait)
        time.sleep(wait)
    elif tick_elapsed < TICK_MINIMUM:
        wait = TICK_MINIMUM-tick_elapsed
        print ('minimum wait: %.2f' % wait)
        time.sleep(wait)
    else:
        print ('skip set_timing(); no wait')
    drift = ((time.time() - info['begin']) - info['tick']*TICK -
        TICK_TIMING + info['begin']%TICK)
    print ('drift: %.6f' % drift)

info['tick'] = 0
info['begin'] = time.time()
info['completion_time'] = info['begin'] - TICK

while 1:

    set_timing()

    print('hello world')

    #random real world event
    time.sleep(random.random()*TICK_MINIMUM)

    info['tick'] += 1
    info['completion_time'] = time.time()

selon les conditions réelles, vous pourriez obtenir des tiques de longueur:

60,60,62,58,60,60,120,30,30,60,60,60,60,60...etc.

mais au bout de 60 minutes vous aurez 60 tics; et la plupart d'entre eux se produiront à l'offset correct à la minute que vous préférez.

sur mon système j'obtiens une dérive typique de < 1 / 20ème de seconde jusqu'à ce que nécessité pour la correction se pose.

l'avantage de cette méthode est la résolution de la dérive de l'horloge; qui peut causer des problèmes si vous faites des choses comme Ajouter un article par tique et vous vous attendez à 60 articles ajoutés par heure. Le fait de ne pas tenir compte de la dérive peut entraîner des indications secondaires, comme des moyennes mobiles, à considérer les données trop profondément dans le passé, ce qui entraîne une sortie défectueuse.

0
répondu litepresence 2017-02-21 17:33:12

Une réponse possible:

import time
t=time.time()

while True:
    if time.time()-t>10:
        #run your task here
        t=time.time()
0
répondu sks 2017-03-24 06:56:37

p.ex., afficher l'heure locale courante

import datetime
import glib
import logger

def get_local_time():
    current_time = datetime.datetime.now().strftime("%H:%M")
    logger.info("get_local_time(): %s",current_time)
    return str(current_time)

def display_local_time():
    logger.info("Current time is: %s", get_local_time())
    return True

# call every minute
glib.timeout_add(60*1000, display_local_time)
0
répondu rise.riyo 2018-02-08 19:55:21

j'utilise la méthode Tkinter after (), qui ne "vole pas le jeu" (comme le module sched qui a été présenté plus tôt), c'est-à-dire qu'elle permet à d'autres choses de fonctionner en parallèle:

import Tkinter

def do_something1():
  global n1
  n1 += 1
  if n1 == 6: # (Optional condition)
    print "* do_something1() is done *"; return
  # Do your stuff here
  # ...
  print "do_something1() "+str(n1)
  tk.after(1000, do_something1)

def do_something2(): 
  global n2
  n2 += 1
  if n2 == 6: # (Optional condition)
    print "* do_something2() is done *"; return
  # Do your stuff here
  # ...
  print "do_something2() "+str(n2)
  tk.after(500, do_something2)

tk = Tkinter.Tk(); 
n1 = 0; n2 = 0
do_something1()
do_something2()
tk.mainloop()

do_something1() et do_something2() peuvent fonctionner en parallèle et à n'importe quelle vitesse d'intervalle. Ici, le 2e sera exécuté deux fois plus vite.Notez aussi que j'ai utilisé un compteur simple comme condition pour terminer l'une ou l'autre de ces fonctions. Vous pouvez utiliser n'importe quel autre contition vous aimez ou Aucun si vous ce que une fonction à exécuter jusqu'à ce que le programme se termine (par exemple une horloge).

0
répondu Apostolos 2018-03-14 08:27:20
    ''' tracking number of times it prints'''
import threading

global timeInterval
count=0
def printit():
  threading.Timer(timeInterval, printit).start()
  print( "Hello, World!")
  global count
  count=count+1
  print(count)
printit

if __name__ == "__main__":
    timeInterval= int(input('Enter Time in Seconds:'))
    printit()
0
répondu raviGupta 2018-08-28 11:08:05

Voici une version adaptée du code de Mestrlion. En plus de la fonction originale, ce code:

1) Ajouter first_interval utilisé pour allumer le minuteur à un moment précis(l'appelant doit calculer first_interval et passer à)

2) résoudre une condition de race dans le code original. Dans le code d'origine, si le contrôle thread a échoué pour annuler le minuteur("l'Arrêt de la minuterie, et d'annuler l'exécution de la minuterie de l'action. Cela ne fonctionnera si le minuteur est toujours en attente."Cité de https://docs.python.org/2/library/threading.html ), la minuterie fonctionnera sans fin.

class RepeatedTimer(object):
def __init__(self, first_interval, interval, func, *args, **kwargs):
    self.timer      = None
    self.first_interval = first_interval
    self.interval   = interval
    self.func   = func
    self.args       = args
    self.kwargs     = kwargs
    self.running = False
    self.is_started = False

def first_start(self):
    try:
        # no race-condition here because only control thread will call this method
        # if already started will not start again
        if not self.is_started:
            self.is_started = True
            self.timer = Timer(self.first_interval, self.run)
            self.running = True
            self.timer.start()
    except Exception as e:
        log_print(syslog.LOG_ERR, "timer first_start failed %s %s"%(e.message, traceback.format_exc()))
        raise

def run(self):
    # if not stopped start again
    if self.running:
        self.timer = Timer(self.interval, self.run)
        self.timer.start()
    self.func(*self.args, **self.kwargs)

def stop(self):
    # cancel current timer in case failed it's still OK
    # if already stopped doesn't matter to stop again
    if self.timer:
        self.timer.cancel()
    self.running = False
0
répondu dproc 2018-09-10 09:55:43