Suppression de toutes les tâches en attente dans celery / rabbitmq
Comment puis-je supprimer toutes les tâches en attente sans connaître le task_id
pour chaque tâche?
7 réponses
$ celery -A proj purge
Ou
from proj.celery import app
app.control.purge()
(EDIT: mis à jour avec la méthode actuelle.)
Pour céleri 3.0+:
$ celery purge
Pour purger une file d'attente spécifique:
$ celery -Q queue_name purge
Pour Le Céleri 2.x et 3.x:
Lors de l'utilisation de worker avec le paramètre-Q pour définir des files d'attente, par exemple
celery worker -Q queue1,queue2,queue3
Alors celery purge
ne fonctionnera pas, car vous ne pouvez pas lui transmettre les paramètres de file d'attente. Il ne supprimera que la file d'attente par défaut.
La solution consiste à démarrer vos travailleurs avec le paramètre --purge
comme ceci:
celery worker -Q queue1,queue2,queue3 --purge
Cela exécutera cependant le travailleur.
Une autre option consiste à utiliser la sous-commande AMQP de celery
celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3
Dans Le Céleri 3+:
CLI:
$ celery -A proj purge
Par programme:
>>> from proj.celery import app
>>> app.control.purge()
Http://docs.celeryproject.org/en/latest/faq.html#how-do-i-purge-all-waiting-tasks
J'ai trouvé que celery purge
ne fonctionne pas pour ma configuration de céleri plus complexe. J'utilise plusieurs files d'attente nommées à des fins différentes:
$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ... # Output sorted, whitespaced for readability
celery 0 2
celery@web01.celery.pidbox 0 1
celery@web02.celery.pidbox 0 1
apns 0 1
apns@web01.celery.pidbox 0 1
analytics 1 1
analytics@web01.celery.pidbox 0 1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0 0 1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1 0 1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54 0 1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866 0 1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99 0 1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e 0 1
La première colonne est le nom de file d'attente, le deuxième est le nombre de messages en attente dans la file d'attente, et le troisième est le nombre d'auditeurs de cette file d'attente. Les files d'attente sont:
- celery-file d'attente pour les tâches de céleri standard, idempotentes
- APNS-file d'attente pour les tâches de service de Notification Push Apple, pas tout à fait comme idempotent
- analytics - File d'attente pour les analyses nocturnes de longue durée
- *.Pidbox-file d'attente pour les commandes de travail, telles que shutdown et reset, une par travailleur (2 travailleurs de céleri, un travailleur apns, un travailleur d'analyse)
- jetés.* - Files d'attente de diffusion, pour envoyer des messages à tous les travailleurs écoutant une file d'attente (plutôt que juste le premier à l'attraper)
- celeryev.* - Files d'attente d'événements céleri, pour l'analyse des tâches de reporting
La tâche d'analyse est une tâche de force brute qui a très bien fonctionné sur de petits ensembles de données, mais prend maintenant plus de 24 heures pour traiter. De temps en temps, quelque chose va mal tourner et il sera bloqué en attente sur la base de données. Il doit être réécrit, mais jusque-là, quand il est bloqué, je tue la tâche, vide la file d'attente et réessaie. Je détecte le "stuckness" en regardant le nombre de messages pour la file d'attente analytics, qui devrait être 0 (analyse terminée) ou 1 (en attente de la fin de l'analyse de la nuit dernière). 2 ou plus mauvais, et je reçois un mail.
celery purge
offre pour effacer les tâches de une des files d'attente de diffusion, et je ne vois pas une option pour choisir une file d'attente nommée différente.
Voici mon processus:
$ sudo /etc/init.d/celeryd stop # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop # Confim dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers # Confirm messages is 0
$ sudo /etc/init.d/celeryd start
Dans Le Céleri 3 +
Http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks
CLI
Purger la file d'attente nommée:
celery -A proj amqp queue.purge <queue name>
Purger la file d'attente configurée
celery -A proj purge
J'ai purgé les messages, mais il reste encore des messages dans la file d'attente? Réponse: les tâches sont reconnues (supprimées de la file d'attente) dès qu'elles sont réellement exécutées. Une fois que le travailleur a reçu une tâche, il faudra un certain temps jusqu'à ce qu'elle soit réellement exécuté, surtout s'il y a beaucoup de tâches en attente d'exécution. Les Messages qui ne sont pas reconnus sont conservés par le worker jusqu'à ce qu'il ferme la connexion au broker (serveur AMQP). Lorsque cette connexion est fermée (par exemple parce que le travailleur a été arrêté), les tâches seront renvoyées par le courtier au prochain travailleur disponible (ou au même travailleur lorsqu'il a été redémarré), donc pour purger correctement la file d'attente des tâches, vous devez arrêter tous les travailleurs, puis purger les tâches à l'aide de céleri.contrôle.purge().
Donc, pour purger toute la file d'attente, les travailleurs doivent être arrêtés.
1. Pour purger correctement la file d'attente des tâches vous devez arrêter tous les travailleurs (http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are-still-messages-left-in-the-queue):
$ sudo rabbitmqctl stop
Ou (dans le cas où RabbitMQ/Message broker est géré par le superviseur):
$ sudo supervisorctl stop all
2. ...et puis purgez les tâches d'une file d'attente spécifique:
$ cd <source_dir>
$ celery amqp queue.purge <queue name>
3. Démarrer RabbitMQ:
$ sudo rabbitmqctl start
Ou (dans le cas où RabbitMQ est géré par le superviseur):
$ sudo supervisorctl start all