Suppression de toutes les tâches en attente dans celery / rabbitmq

Comment puis-je supprimer toutes les tâches en attente sans connaître le task_id pour chaque tâche?

144
demandé sur nabizan 2011-08-22 18:35:22

7 réponses

À partir des documents :

$ celery -A proj purge

Ou

from proj.celery import app
app.control.purge()

(EDIT: mis à jour avec la méthode actuelle.)

208
répondu Philip Southam 2017-01-27 15:20:36

Pour céleri 3.0+:

$ celery purge

Pour purger une file d'attente spécifique:

$ celery -Q queue_name purge
107
répondu ToonAlfrink 2017-11-01 15:52:49

Pour Le Céleri 2.x et 3.x:

Lors de l'utilisation de worker avec le paramètre-Q pour définir des files d'attente, par exemple

celery worker -Q queue1,queue2,queue3

Alors celery purge ne fonctionnera pas, car vous ne pouvez pas lui transmettre les paramètres de file d'attente. Il ne supprimera que la file d'attente par défaut. La solution consiste à démarrer vos travailleurs avec le paramètre --purge comme ceci:

celery worker -Q queue1,queue2,queue3 --purge

Cela exécutera cependant le travailleur.

Une autre option consiste à utiliser la sous-commande AMQP de celery

celery amqp queue.delete queue1
celery amqp queue.delete queue2
celery amqp queue.delete queue3
20
répondu smido 2017-07-11 17:47:06

Dans Le Céleri 3+:

CLI:

$ celery -A proj purge

Par programme:

>>> from proj.celery import app
>>> app.control.purge()

Http://docs.celeryproject.org/en/latest/faq.html#how-do-i-purge-all-waiting-tasks

9
répondu ksindi 2016-04-03 17:28:35

J'ai trouvé que celery purge ne fonctionne pas pour ma configuration de céleri plus complexe. J'utilise plusieurs files d'attente nommées à des fins différentes:

$ sudo rabbitmqctl list_queues -p celery name messages consumers
Listing queues ...  # Output sorted, whitespaced for readability
celery                                          0   2
celery@web01.celery.pidbox                      0   1
celery@web02.celery.pidbox                      0   1
apns                                            0   1
apns@web01.celery.pidbox                        0   1
analytics                                       1   1
analytics@web01.celery.pidbox                   0   1
bcast.361093f1-de68-46c5-adff-d49ea8f164c0      0   1
bcast.a53632b0-c8b8-46d9-bd59-364afe9998c1      0   1
celeryev.c27b070d-b07e-4e37-9dca-dbb45d03fd54   0   1
celeryev.c66a9bed-84bd-40b0-8fe7-4e4d0c002866   0   1
celeryev.b490f71a-be1a-4cd8-ae17-06a713cc2a99   0   1
celeryev.9d023165-ab4a-42cb-86f8-90294b80bd1e   0   1

La première colonne est le nom de file d'attente, le deuxième est le nombre de messages en attente dans la file d'attente, et le troisième est le nombre d'auditeurs de cette file d'attente. Les files d'attente sont:

  • celery-file d'attente pour les tâches de céleri standard, idempotentes
  • APNS-file d'attente pour les tâches de service de Notification Push Apple, pas tout à fait comme idempotent
  • analytics - File d'attente pour les analyses nocturnes de longue durée
  • *.Pidbox-file d'attente pour les commandes de travail, telles que shutdown et reset, une par travailleur (2 travailleurs de céleri, un travailleur apns, un travailleur d'analyse)
  • jetés.* - Files d'attente de diffusion, pour envoyer des messages à tous les travailleurs écoutant une file d'attente (plutôt que juste le premier à l'attraper)
  • celeryev.* - Files d'attente d'événements céleri, pour l'analyse des tâches de reporting

La tâche d'analyse est une tâche de force brute qui a très bien fonctionné sur de petits ensembles de données, mais prend maintenant plus de 24 heures pour traiter. De temps en temps, quelque chose va mal tourner et il sera bloqué en attente sur la base de données. Il doit être réécrit, mais jusque-là, quand il est bloqué, je tue la tâche, vide la file d'attente et réessaie. Je détecte le "stuckness" en regardant le nombre de messages pour la file d'attente analytics, qui devrait être 0 (analyse terminée) ou 1 (en attente de la fin de l'analyse de la nuit dernière). 2 ou plus mauvais, et je reçois un mail.

celery purge offre pour effacer les tâches de une des files d'attente de diffusion, et je ne vois pas une option pour choisir une file d'attente nommée différente.

Voici mon processus:

$ sudo /etc/init.d/celeryd stop  # Wait for analytics task to be last one, Ctrl-C
$ ps -ef | grep analytics  # Get the PID of the worker, not the root PID reported by celery
$ sudo kill <PID>
$ sudo /etc/init.d/celeryd stop  # Confim dead
$ python manage.py celery amqp queue.purge analytics
$ sudo rabbitmqctl list_queues -p celery name messages consumers  # Confirm messages is 0
$ sudo /etc/init.d/celeryd start
8
répondu jwhitlock 2014-10-03 20:00:19

Dans Le Céleri 3 +

Http://docs.celeryproject.org/en/3.1/faq.html#how-do-i-purge-all-waiting-tasks

CLI

Purger la file d'attente nommée:

 celery -A proj amqp queue.purge <queue name>

Purger la file d'attente configurée

celery -A proj purge

J'ai purgé les messages, mais il reste encore des messages dans la file d'attente? Réponse: les tâches sont reconnues (supprimées de la file d'attente) dès qu'elles sont réellement exécutées. Une fois que le travailleur a reçu une tâche, il faudra un certain temps jusqu'à ce qu'elle soit réellement exécuté, surtout s'il y a beaucoup de tâches en attente d'exécution. Les Messages qui ne sont pas reconnus sont conservés par le worker jusqu'à ce qu'il ferme la connexion au broker (serveur AMQP). Lorsque cette connexion est fermée (par exemple parce que le travailleur a été arrêté), les tâches seront renvoyées par le courtier au prochain travailleur disponible (ou au même travailleur lorsqu'il a été redémarré), donc pour purger correctement la file d'attente des tâches, vous devez arrêter tous les travailleurs, puis purger les tâches à l'aide de céleri.contrôle.purge().

Donc, pour purger toute la file d'attente, les travailleurs doivent être arrêtés.

5
répondu oneklc 2017-01-20 00:12:30

1. Pour purger correctement la file d'attente des tâches vous devez arrêter tous les travailleurs (http://celery.readthedocs.io/en/latest/faq.html#i-ve-purged-messages-but-there-are-still-messages-left-in-the-queue):

$ sudo rabbitmqctl stop

Ou (dans le cas où RabbitMQ/Message broker est géré par le superviseur):

$ sudo supervisorctl stop all

2. ...et puis purgez les tâches d'une file d'attente spécifique:

$ cd <source_dir>
$ celery amqp queue.purge <queue name>

3. Démarrer RabbitMQ:

$ sudo rabbitmqctl start

Ou (dans le cas où RabbitMQ est géré par le superviseur):

$ sudo supervisorctl start all
1
répondu Ukr 2017-03-27 15:48:07