Récupérer la liste des tâches dans une file D'attente en céleri

Comment puis-je récupérer une liste de tâches dans une file d'attente qui doivent encore être traitées?

100
demandé sur bradley.ayers 2011-04-05 01:35:36

9 réponses

EDIT: voir les autres réponses pour obtenir une liste des tâches dans la file d'attente.

Vous devriez regarder ici: Guide Céleri-Inspection Des Travailleurs

essentiellement ceci:

>>> from celery.task.control import inspect

# Inspect all nodes.
>>> i = inspect()

# Show the items that have an ETA or are scheduled for later processing
>>> i.scheduled()

# Show tasks that are currently active.
>>> i.active()

# Show tasks that have been claimed by workers
>>> i.reserved()

selon ce que vous voulez

140
répondu semarj 2017-04-14 21:10:53

si vous utilisez rabbitMQ, utilisez-le dans le terminal:

sudo rabbitmqctl list_queues

il affichera la liste des files d'attente avec le nombre de tâches en attente. par exemple:

Listing queues ...
0b27d8c59fba4974893ec22d478a7093    0
0e0a2da9828a48bc86fe993b210d984f    0
10@torob2.celery.pidbox 0
11926b79e30a4f0a9d95df61b6f402f7    0
15c036ad25884b82839495fb29bd6395    1
celerey_mail_worker@torob2.celery.pidbox    0
celery  166
celeryev.795ec5bb-a919-46a8-80c6-5d91d2fcf2aa   0
celeryev.faa4da32-a225-4f6c-be3b-d8814856d1b6   0

le nombre dans la colonne de droite est le nombre de tâches dans la file d'attente. au-dessus, la file d'attente de céleri compte 166 tâches en attente.

34
répondu Ali 2015-04-09 11:55:20

pour récupérer les tâches du backend, utilisez ce

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672 ", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()
name, jobs, consumers = chan.queue_declare(queue="queue_name", passive=True)
10
répondu ashish 2017-04-14 21:18:38

si vous n'utilisez pas les tâches priorisées, c'est en fait assez simple si vous utilisez Redis. Pour obtenir la tâche compte:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER llen QUEUE_NAME

mais, tâches priorisées utilisez une clé différente dans redis , de sorte que le tableau complet est légèrement plus compliqué. L'image complète est que vous devez interroger redis pour chaque priorité de tâche. En python (et du projet Flower), cela ressemble à:

PRIORITY_SEP = '\x06\x16'
DEFAULT_PRIORITY_STEPS = [0, 3, 6, 9]


def make_queue_name_for_pri(queue, pri):
    """Make a queue name for redis

    Celery uses PRIORITY_SEP to separate different priorities of tasks into
    different queues in Redis. Each queue-priority combination becomes a key in
    redis with names like:

     - batch1\x06\x163 <-- P3 queue named batch1

    There's more information about this in Github, but it doesn't look like it 
    will change any time soon:

      - https://github.com/celery/kombu/issues/422

    In that ticket the code below, from the Flower project, is referenced:

      - https://github.com/mher/flower/blob/master/flower/utils/broker.py#L135

    :param queue: The name of the queue to make a name for.
    :param pri: The priority to make a name with.
    :return: A name for the queue-priority pair.
    """
    if pri not in DEFAULT_PRIORITY_STEPS:
        raise ValueError('Priority not in priority steps')
    return '{0}{1}{2}'.format(*((queue, PRIORITY_SEP, pri) if pri else
                                (queue, '', '')))


def get_queue_length(queue_name='celery'):
    """Get the number of tasks in a celery queue.

    :param queue_name: The name of the queue you want to inspect.
    :return: the number of items in the queue.
    """
    priority_names = [make_queue_name_for_pri(queue_name, pri) for pri in
                      DEFAULT_PRIORITY_STEPS]
    r = redis.StrictRedis(
        host=settings.REDIS_HOST,
        port=settings.REDIS_PORT,
        db=settings.REDIS_DATABASES['CELERY'],
    )
    return sum([r.llen(x) for x in priority_names])

si vous voulez obtenir une tâche réelle, vous pouvez utiliser quelque chose comme:

redis-cli -h HOST -p PORT -n DATABASE_NUMBER lrange QUEUE_NAME 0 -1

de là, vous devrez désérialiser la liste retournée. Dans mon cas, j'ai pu accomplir ceci avec quelque chose comme:

r = redis.StrictRedis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=settings.REDIS_DATABASES['CELERY'],
)
l = r.lrange('celery', 0, -1)
pickle.loads(base64.decodestring(json.loads(l[0])['body']))

juste être averti que la desérialisation peut prendre un moment, et vous aurez besoin d'ajuster les commandes ci-dessus pour travailler avec diverses priorités.

10
répondu mlissner 2017-05-11 18:14:34

le module celery inspect semble ne connaître les tâches que du point de vue des travailleurs. Si vous voulez voir les messages qui sont dans la file d'attente (encore à être tiré par les travailleurs) je suggère d'utiliser pyrabbit , qui peut interfacer avec l'api http rabbitmq pour récupérer toutes sortes d'informations à partir de la file d'attente.

Un exemple peut être trouvé ici: Récupérer longueur de file d'attente avec le Céleri (RabbitMQ, Django)

4
répondu Paul in 't Hout 2017-05-23 10:31:30

je pense que la seule façon d'obtenir les tâches en attente est de garder une liste de tâches que vous avez commencé et laisser la tâche supprimer de la liste quand il est démarré.

avec rabbitmqctl et list_queues vous pouvez avoir un aperçu du nombre de tâches qui attendent, mais pas les tâches elles-mêmes: http://www.rabbitmq.com/man/rabbitmqctl.1.man.html

si ce que vous voulez inclut la tâche en cours de traitement, mais n'est pas encore terminé, vous pouvez garder une liste des tâches et vérifier leurs états:

from tasks import add
result = add.delay(4, 4)

result.ready() # True if finished

ou vous laissez Celery stocker les résultats avec CELERY_RESULT_BACKEND et vérifiez quelles tâches ne sont pas là.

3
répondu Sebastian Blask 2011-04-13 10:36:21

Un copier-coller de la solution pour le Redis avec la sérialisation json:

def get_celery_queue_items(queue_name):
    import base64
    import json  

    # Get a configured instance of a celery app:
    from yourproject.celery import app as celery_app

    with celery_app.pool.acquire(block=True) as conn:
        tasks = conn.default_channel.client.lrange(queue_name, 0, -1)
        decoded_tasks = []

    for task in tasks:
        j = json.loads(task)
        body = json.loads(base64.b64decode(j['body']))
        decoded_tasks.append(body)

    return decoded_tasks

ça marche avec Django. N'oubliez pas de changer yourproject.celery .

3
répondu Max Malysh 2018-06-08 14:22:02

pour autant que je sache, Celery ne fournit pas D'API pour examiner les tâches qui attendent dans la file d'attente. C'est courtier spécifique. Si vous utilisez Redis comme courtier pour un exemple, alors examiner les tâches qui attendent dans la file celery (par défaut) est aussi simple que:

  1. se connecter à la base de données service broker
  2. les éléments de la liste dans le celery liste (LRANGE de commande pour un exemple)

Gardez à l'esprit que ces sont des tâches en attente d'être choisis par les travailleurs disponibles. Votre cluster peut avoir certaines tâches en cours d'exécution - celles-ci ne seront pas dans cette liste car elles ont déjà été sélectionnées.

2
répondu DejanLekic 2018-05-04 08:48:28

j'en suis arrivé à la conclusion que la meilleure façon d'obtenir le nombre d'emplois dans une file d'attente est d'utiliser rabbitmqctl comme cela a été suggéré plusieurs fois ici. Pour permettre à n'importe quel utilisateur choisi d'exécuter la commande avec sudo j'ai suivi les instructions ici (Je n'ai pas édité la partie profil car je ne me dérange pas dactylographier dans sudo avant la commande.)

j'ai aussi saisi l'extrait de jamesc grep et cut et je l'ai enveloppé dans des appels de sous-processus.

from subprocess import Popen, PIPE
p1 = Popen(["sudo", "rabbitmqctl", "list_queues", "-p", "[name of your virtula host"], stdout=PIPE)
p2 = Popen(["grep", "-e", "^celery\s"], stdin=p1.stdout, stdout=PIPE)
p3 = Popen(["cut", "-f2"], stdin=p2.stdout, stdout=PIPE)
p1.stdout.close()
p2.stdout.close()
print("number of jobs on queue: %i" % int(p3.communicate()[0]))
1
répondu Peter Shannon 2017-11-16 06:11:29