Comment créer une file d'attente retardée à RabbitMQ?
Quelle est la façon la plus simple de créer une file d'attente (ou de stationnement) avec Python, Pika et RabbitMQ? J'ai vu un similaire questions , mais aucun pour Python.
je trouve cette idée utile lors de la conception d'applications, car elle nous permet d'étouffer les messages qui doivent être re-mis en file d'attente.
Il ya toujours la possibilité que vous recevrez plus de messages que vous ne pouvez gérer, peut-être le serveur HTTP est lent, ou le la base de données est sous trop de stress.
j'ai également trouvé très utile lorsque quelque chose a mal tourné dans les scénarios où il ya une tolérance zéro à la perte de messages, et tout en re-queue messages qui ne pouvaient pas être traitées peut résoudre cela. Il peut aussi causer des problèmes où le message sera mis en file d'attente, encore et encore. Potentiellement causer des problèmes de performance, et log spam.
5 réponses
j'ai trouvé cela extrêmement utile lors du développement de mes applications. Comme il vous donne une alternative à simplement re-queue vos messages. Cela peut facilement réduire la complexité de votre code, et est l'une des nombreuses fonctionnalités cachées puissantes de RabbitMQ.
Étapes
nous devons d'abord mettre en place deux canaux de base, l'un pour la file d'attente principale, et l'autre pour la file d'attente de retard. Dans mon exemple à la fin, j'inclus un couple d'autres des indicateurs qui ne sont pas nécessaires, mais qui rendent le code plus fiable, tels que confirm delivery
, delivery_mode
et durable
. Vous pouvez trouver plus d'informations sur ceux-ci dans le RabbitMQ manuel .
après avoir configuré les canaux, nous ajoutons une liaison au canal principal que nous pouvons utiliser pour envoyer des messages du canal delay à notre file d'attente principale.
channel.queue_bind(exchange='amq.direct',
queue='hello')
ensuite, nous devons configurer notre canal de retard pour transmettre des messages à la file d'attente principale une fois qu'ils sont expirés.
delay_channel.queue_declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl' : 5000,
'x-dead-letter-exchange' : 'amq.direct',
'x-dead-letter-routing-key' : 'hello'
})
-
x-message-ttl (Message - time to Live)
normalement utilisé pour supprimer automatiquement les anciens messages la file d'attente après une durée spécifique, mais en ajoutant deux arguments optionnels nous peut changer ce comportement, et à la place avoir ce paramètre déterminer en millisecondes, combien de temps les messages resteront dans la file d'attente.
-
cette variable nous permet de transférer le message dans une file d'attente différente une fois qu'ils ont expiré, au lieu du comportement par défaut de supprimer complètement.
-
cette variable détermine l'échange utilisé pour transférer le message de hello_delay bonjour la file d'attente.
la Publication sur le retard de la file d'attente
lorsque nous avons terminé la configuration de tous les paramètres Pika de base, il vous suffit d'envoyer un message à la file d'attente en utilisant basic publish.
delay_channel.basic_publish(exchange='',
routing_key='hello_delay',
body="test",
properties=pika.BasicProperties(delivery_mode=2))
une fois que vous avez exécuté le script, vous devriez voir les files d'attente suivantes créées dans votre module de gestion RabbitMQ.
exemple.
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))
# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_delivery()
channel.queue_declare(queue='hello', durable=True)
# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue_bind(exchange='amq.direct',
queue='hello')
# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_delivery()
# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue_declare(queue='hello_delay', durable=True, arguments={
'x-message-ttl' : 5000, # Delay until the message is transferred in milliseconds.
'x-dead-letter-exchange' : 'amq.direct', # Exchange used to transfer the message from A to B.
'x-dead-letter-routing-key' : 'hello' # Name of the queue we want the message transferred to.
})
delay_channel.basic_publish(exchange='',
routing_key='hello_delay',
body="test",
properties=pika.BasicProperties(delivery_mode=2))
print " [x] Sent"
vous pouvez utiliser le plugin officiel de RabbitMQ: x-delayed-message .
tout D'abord, téléchargez et copiez le fichier ez dans Your_rabbitmq_root_path/plugins
deuxièmement, activez le plugin (il n'est pas nécessaire de redémarrer le serveur):
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
enfin, publiez votre message avec des en-têtes "X-delay" comme:
headers.put("x-delay", 5000);
Avis:
cela ne garantit pas la sécurité de votre message, car si votre message expire juste pendant le temps d'arrêt de votre serveur rabbitmq, malheureusement le message est perdu. Donc être prudent lorsque vous utilisez ce système.
en Profiter et plus d'infos dans rabbitmq-retard-message-échange
FYI, comment faire cela au printemps 3.2.x.
<rabbit:queue name="delayQueue" durable="true" queue-arguments="delayQueueArguments"/>
<rabbit:queue-arguments id="delayQueueArguments">
<entry key="x-message-ttl">
<value type="java.lang.Long">10000</value>
</entry>
<entry key="x-dead-letter-exchange" value="finalDestinationTopic"/>
<entry key="x-dead-letter-routing-key" value="finalDestinationQueue"/>
</rabbit:queue-arguments>
<rabbit:fanout-exchange name="finalDestinationTopic">
<rabbit:bindings>
<rabbit:binding queue="finalDestinationQueue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
NodeJS mise en œuvre.
Tout est assez clair dans le code. Espérons qu'il permettra de sauver quelqu'un.
var ch = channel;
ch.assertExchange("my_intermediate_exchange", 'fanout', {durable: false});
ch.assertExchange("my_final_delayed_exchange", 'fanout', {durable: false});
// setup intermediate queue which will never be listened.
// all messages are TTLed so when they are "dead", they come to another exchange
ch.assertQueue("my_intermediate_queue", {
deadLetterExchange: "my_final_delayed_exchange",
messageTtl: 5000, // 5sec
}, function (err, q) {
ch.bindQueue(q.queue, "my_intermediate_exchange", '');
});
ch.assertQueue("my_final_delayed_queue", {}, function (err, q) {
ch.bindQueue(q.queue, "my_final_delayed_exchange", '');
ch.consume(q.queue, function (msg) {
console.log("delayed - [x] %s", msg.content.toString());
}, {noAck: true});
});
Message dans la file D'attente lapin peut être retardé de deux façons - en utilisant la file TTL - utilisation du Message TTL Si tous les messages dans la file d'attente doivent être retardés pour une durée fixe, utilisez queue TTL. Si chaque message doit être retardé par un décalage horaire, utilisez le Message TTL. Je l'ai expliqué en utilisant python3 et le module pika. Pika basicproperties argument "expiration" en millisecondes doit être défini pour retarder le message dans la file d'attente. Après avoir défini le temps d'expiration, publiez le message dans une file d'attente delayed_queue ("not actual queue lorsque les consommateurs attendent de consommer"), une fois que le message dans delayed_queue expire, le message sera routé vers une file d'attente réelle en utilisant exchange 'amq.direct '
def delay_publish(self, messages, queue, headers=None, expiration=0):
"""
Connect to RabbitMQ and publish messages to the queue
Args:
queue (string): queue name
messages (list or single item): messages to publish to rabbit queue
expiration(int): TTL in milliseconds for message
"""
delay_queue = "".join([queue, "_delay"])
logging.info('Publishing To Queue: {queue}'.format(queue=delay_queue))
logging.info('Connecting to RabbitMQ: {host}'.format(
host=self.rabbit_host))
credentials = pika.PlainCredentials(
RABBIT_MQ_USER, RABBIT_MQ_PASS)
parameters = pika.ConnectionParameters(
rabbit_host, RABBIT_MQ_PORT,
RABBIT_MQ_VHOST, credentials, heartbeat_interval=0)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=queue, durable=True)
channel.queue_bind(exchange='amq.direct',
queue=queue)
delay_channel = connection.channel()
delay_channel.queue_declare(queue=delay_queue, durable=True,
arguments={
'x-dead-letter-exchange': 'amq.direct',
'x-dead-letter-routing-key': queue
})
properties = pika.BasicProperties(
delivery_mode=2, headers=headers, expiration=str(expiration))
if type(messages) not in (list, tuple):
messages = [messages]
try:
for message in messages:
try:
json_data = json.dumps(message)
except Exception as err:
logging.error(
'Error Jsonify Payload: {err}, {payload}'.format(
err=err, payload=repr(message)), exc_info=True
)
if (type(message) is dict) and ('data' in message):
message['data'] = {}
message['error'] = 'Payload Invalid For JSON'
json_data = json.dumps(message)
else:
raise
try:
delay_channel.basic_publish(
exchange='', routing_key=delay_queue,
body=json_data, properties=properties)
except Exception as err:
logging.error(
'Error Publishing Data: {err}, {payload}'.format(
err=err, payload=json_data), exc_info=True
)
raise
except Exception:
raise
finally:
logging.info(
'Done Publishing. Closing Connection to {queue}'.format(
queue=delay_queue
)
)
connection.close()