Effectuer des tâches de longue durée à pika / RabbitMQ
nous essayons de mettre en place un système de base de file d'attente dirigée où un producteur générera plusieurs tâches et un ou plusieurs consommateurs saisiront une tâche à la fois, la traiteront et reconnaîtront le message.
le problème est que le traitement peut prendre de 10 à 20 minutes, et nous ne répondons pas aux messages à ce moment-là, ce qui provoque la déconnexion du serveur.
voici un pseudo code pour notre consommateur:
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
une fois la première tâche terminée, une exception est jeté quelque part à l'intérieur de la connexion de blocage, se plaindre que la prise a été réinitialisée. De plus, les registres de RabbitMQ montrent que le consommateur a été déconnecté parce qu'il n'a pas répondu à temps (pourquoi il réinitialise la connexion plutôt que d'envoyer un aileron est étrange, mais nous ne nous en soucierons pas).
nous avons beaucoup cherché parce que nous pensions que C'était le cas normal D'utilisation de RabbitMQ (ayant beaucoup de tâches de longue durée qui devraient être réparties entre de nombreux consommateurs), mais il semble que personne d'autre n'avait vraiment ce problème. Enfin, nous sommes tombés sur un fil où il était recommandé d'utiliser les battements de coeur et de frayer le long_running_task()
dans un thread séparé.
alors le code est devenu:
#!/usr/bin/env python
import pika
import time
import threading
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost',
heartbeat_interval=20))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def thread_func(ch, method, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
def callback(ch, method, properties, body):
threading.Thread(target=thread_func, args=(ch, method, body)).start()
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
Et cela semble fonctionner, mais c'est très salissant. Sommes-nous sûrs que l' ch
l'objet est-il sûr? En outre, imaginez que long_running_task()
utilise ce paramètre de connexion pour ajouter une tâche à une nouvelle file d'attente (i.e. la première partie de ce long processus est terminée, envoyons la tâche sur pour la deuxième partie). Donc, le thread utilise le connection
objet. C'est que "thread-safe"?
plus précisément, quelle est la meilleure façon de procéder? J'ai l'impression que c'est très bordélique et peut-être pas sûr du fil, donc peut-être qu'on ne le fait pas bien. Merci!
4 réponses
pour l'instant, votre meilleur pari est d'éteindre les battements de cœur, ce qui empêchera RabbitMQ de fermer la connexion si vous bloquez trop longtemps. Je suis en train d'expérimenter avec la gestion de la connexion principale de pika et la boucle IO tournant dans un fil d'arrière-plan mais il n'est pas assez stable pour libérer.
je rencontre le même problème que vous.
Ma solution est la suivante:
- ture au large de la pulsation sur le côté serveur
- Evaluer le temps maximum que la tâche peut prendre
- réglez le temps d'arrêt du rythme cardiaque du client à l'heure obtenue à partir de l'étape 2
Pourquoi cela?
Comme je l'ai tester avec les cas suivants:
cas d'un- serveur de battement de coeur de tour, des années 1800
- client unset
j'obtiens toujours l'erreur lors de la tâche en cours d'exécution pendant un temps très long -- >1800
de cas les deux- désactiver le serveur de battement de coeur
- arrêt du rythme cardiaque du client
il n'y a pas d'erreur côté client, sauf un problème--lorsque le client tombe en panne(mon os redémarre sur certains défauts), la connexion tcp peut encore être vue sur le plugin de gestion Rabbitmq. Et il est source de confusion.
cas de trois- désactiver le serveur battement de coeur
- activez le rythme cardiaque du client, réglez-le à la durée d'exécution maximale prévue
dans ce cas, je peux dynamiser chaque coup de chaleur sur chaque client. En fait, j'ai mis des battements de coeur sur les machines qui se sont écrasées fréquemment.De plus, je peux voir la machine hors ligne à travers le plugin de gestion Rabbitmq.
Environnement
OS: centos x86_64
pika: 0.9.13
rabbitmq: 3.3.1
ne pas désactiver les battements de cœur.
La meilleure solution consiste à exécuter la tâche dans un thread séparé et , définissez prefetch_count
1
de sorte que le consommateur ne reçoit qu'un seul message non reconnu
en utilisant quelque chose comme ceci channel.basic_qos(prefetch_count=1)
- Vous pouvez appel périodique
connection.process_data_events()
dans votrelong_running_task(connection)
, cette fonction envoie heartbeat au serveur quand il est appelé, et garde le client pika loin de la fermeture. - définissez la valeur heartbeat supérieure à call
connection.process_data_events()
période dans votre pikaBlockingConnection
.