Comment puis-je exécuter périodiquement une fonction avec asyncio?

Je migre de tornado vers asyncio, et je ne trouve pas l'équivalent de asyncio de tornado PeriodicCallback. (A PeriodicCallback prend deux arguments: la fonction à exécuter et le nombre de millisecondes entre les appels.)

  • y a - t-il un tel équivalent dans asyncio?
  • sinon, quel serait le moyen le plus propre de mettre en œuvre cela sans courir le risque d'obtenir un RecursionError après un certain temps?
27
demandé sur 2Cubed 2016-05-29 19:17:38

4 réponses

Pour les versions Python inférieures à 3.5:

import asyncio

@asyncio.coroutine
def periodic():
    while True:
        print('periodic')
        yield from asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass

Pour Python 3.5 et supérieur:

import asyncio

async def periodic():
    while True:
        print('periodic')
        await asyncio.sleep(1)

def stop():
    task.cancel()

loop = asyncio.get_event_loop()
loop.call_later(5, stop)
task = loop.create_task(periodic())

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass
21
répondu A. Jesse Jiryu Davis 2018-08-13 11:02:59

Lorsque vous sentez que quelque chose devrait arriver "en arrière-plan" de votre programme asyncio, asyncio.Task pourrait être un bon moyen de le faire. Vous pouvez lire ce post pour voir comment travailler avec des tâches.

Voici une implémentation possible de la classe qui exécute une fonction périodiquement:

import asyncio
from contextlib import suppress


class Periodic:
    def __init__(self, func, time):
        self.func = func
        self.time = time
        self.is_started = False
        self._task = None

    async def start(self):
        if not self.is_started:
            self.is_started = True
            # Start task to call func periodically:
            self._task = asyncio.ensure_future(self._run())

    async def stop(self):
        if self.is_started:
            self.is_started = False
            # Stop task and await it stopped:
            self._task.cancel()
            with suppress(asyncio.CancelledError):
                await self._task

    async def _run(self):
        while True:
            await asyncio.sleep(self.time)
            self.func()

Testons-le:

async def main():
    p = Periodic(lambda: print('test'), 1)
    try:
        print('Start')
        await p.start()
        await asyncio.sleep(3.1)

        print('Stop')
        await p.stop()
        await asyncio.sleep(3.1)

        print('Start')
        await p.start()
        await asyncio.sleep(3.1)
    finally:
        await p.stop()  # we should stop task finally


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Sortie:

Start
test
test
test

Stop

Start
test
test
test

[Finished in 9.5s]

Comme vous le voyez sur start nous commençons juste la tâche qui appelle certaines fonctions et dort un peu de temps en boucle sans fin. Sur stop nous annulons juste cela tâche. Notez que cette tâche doit être arrêtée au moment où le programme est terminé.

Une chose plus importante que votre rappel ne devrait pas prendre beaucoup de temps à exécuter (ou il va geler votre boucle d'événement). Si vous prévoyez d'appeler un func de longue durée, vous auriez peut-être besoin de pour l'exécuter dans executor.

13
répondu Mikhail Gerasimov 2017-05-23 12:02:40

Il N'y a pas de support intégré pour les appels périodiques, Non.

Créez simplement votre propre boucle de planificateur qui dort et exécute toutes les tâches planifiées:

import math, time

async def scheduler():
    while True:
        # sleep until the next whole second
        now = time.time()
        await asyncio.sleep(math.ceil(now) - now)

        # execute any scheduled tasks
        await for task in scheduled_tasks(time.time()):
            await task()

L'itérateur scheduled_tasks() doit produire des tâches prêtes à être exécutées au moment donné. Notez que la production du calendrier et le lancement de toutes les tâches pourraient en théorie prendre plus de 1 seconde; l'idée ici est que le planificateur donne toutes les tâches qui auraient dû commencer depuis la dernière vérification.

8
répondu Martijn Pieters 2016-05-29 16:41:05

Basé sur la réponse de @ A. Jesse Jiryu Davis (avec @Torkel Bjørnson-Langen et @rewrite commentaires) c'est une amélioration qui évite la dérive.

import time
import asyncio

@asyncio.coroutine
def periodic(period):
    def g_tick():
        t = time.time()
        count = 0
        while True:
            count += 1
            yield max(t + count * period - time.time(), 0)
    g = g_tick()

    while True:
        print('periodic', time.time())
        yield from asyncio.sleep(next(g))

loop = asyncio.get_event_loop()
task = loop.create_task(periodic(1))
loop.call_later(5, task.cancel)

try:
    loop.run_until_complete(task)
except asyncio.CancelledError:
    pass
0
répondu Wojciech Migda 2018-01-11 09:53:00