RabbitMQ / AMQP: file d'attente unique, plusieurs consommateurs pour le même message?

Je commence tout juste à utiliser RabbitMQ et AMQP en général.

  • j'ai une file d'attente de messages
  • j'ai plusieurs consommateurs, qui je voudrais faire des choses différentes avec la même message.

La Plupart de la documentation RabbitMQ semble être axée sur le Round-robin, c'est-à-dire où un seul message est consommé par un seul consommateur, la charge étant répartie entre chaque consommateur. C'est bien le comportement dont je suis témoin.

Un exemple: le producteur a une seule file d'attente et envoie des messages toutes les 2 secondes:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

Et voici un consommateur:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

Si je démarre le consommateur deux fois, je peux voir que chaque consommateur consomme des messages alternatifs dans un comportement Round-robin. Par exemple, je vais voir les messages 1, 3, 5 dans un terminal, 2, 4, 6 dans l'autre.

Ma question Est:

  • Puis-je demander à chaque consommateur de recevoir les mêmes messages? Ie, les deux consommateurs reçoivent un message 1, 2, 3, 4, 5, 6? Comment cela s'appelle-t-il dans AMQP/RabbitMQ parler? Comment est-il configuré?

  • Est-ce souvent fait? Devrais-je juste que l'échange achemine le message dans deux files d'attente distinctes, avec un seul consommateur, à la place?

92
demandé sur mikemaccana 2012-05-16 18:43:08

10 réponses

Puis-je demander à chaque consommateur de recevoir les mêmes messages? Ie, les deux consommateurs reçoivent un message 1, 2, 3, 4, 5, 6? Qu'est-ce que cela s'appelle dans AMQP / RabbitMQ speak? Comment est-il configuré?

Non, pas si les consommateurs sont sur la même file d'attente. À partir du Guide AMQP Concepts de RabbitMQ:

Il est important de comprendre que, dans AMQP 0-9-1, les messages sont équilibrés entre les consommateurs.

Cela semble impliquer que comportement Round-robin dans une file d'attente est un donné, et non configurable. C'est-à-dire que des files d'attente séparées sont nécessaires pour que le même ID de message soit géré par plusieurs consommateurs.

Est-ce souvent fait? Devrais-je juste que l'échange achemine le message dans deux files d'attente distinctes, avec un seul consommateur, à la place?

Non ce n'est pas le cas, une file d'attente unique/plusieurs consommateurs avec chaque consommateur gérant le même ID de message n'est pas possible. Avoir la route d'échange sur laquelle le message est placé dans deux files d'attente distinctes est en effet mieux.

Comme je ne nécessite pas de routage trop complexe, un échange de fanout gérera bien cela. Je ne me suis pas trop concentré sur les échanges plus tôt car node-amqp a le concept d'un "échange par défaut" vous permettant de publier des messages directement sur une connexion, mais la plupart des messages AMQP sont publiés sur un échange spécifique.

Voici mon échange de fanout, à la fois l'envoi et la réception:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   

    var sendMessage = function(exchange, payload) {
      console.log('about to publish')
      var encoded_payload = JSON.stringify(payload);
      exchange.publish('', encoded_payload, {})
    }

    // Recieve messages
    connection.queue("my_queue_name", function(queue){
      console.log('Created queue')
      queue.bind(exchange, ''); 
      queue.subscribe(function (message) {
        console.log('subscribed to queue')
        var encoded_payload = unescape(message.data)
        var payload = JSON.parse(encoded_payload)
        console.log('Recieved a message:')
        console.log(payload)
      })
    })

    setInterval( function() {    
      var test_message = 'TEST '+count
      sendMessage(exchange, test_message)  
      count += 1;
    }, 2000) 
 })
})
78
répondu mikemaccana 2015-11-01 01:51:17

Il suffit de lire le tutorielrabbitmq . Vous publiez le message dans exchange, pas dans la file d'attente; il est ensuite acheminé vers les files d'attente appropriées. Dans votre cas, vous devez lier une file d'attente séparée pour chaque consommateur. De cette façon, ils peuvent consommer des messages complètement indépendamment.

12
répondu driushkin 2012-05-16 15:11:01

Les deux dernières réponses sont presque correctes - j'ai des tonnes d'applications qui génèrent des messages qui doivent se retrouver avec différents consommateurs, donc le processus est très simple.

Si vous souhaitez que plusieurs consommateurs reçoivent le même message, procédez comme suit.

Créez plusieurs files d'attente, une pour chaque application qui doit recevoir le message, dans chaque propriété de file d'attente, "lier" une balise de routage avec l'amq.l'échange direct. Changez votre application de publication pour l'Envoyer à amq.diriger et utiliser la balise de routage (pas une file d'attente). AMQP copiera ensuite le message dans chaque file d'attente avec la même liaison. Fonctionne comme un charme :)

Exemple: disons que j'ai une chaîne JSON que je génère, je la publie dans " amq.direct "exchange en utilisant la balise de routage "new-sales-order", j'ai une file d'attente pour mon application order_printer qui imprime la commande, j'ai une file d'attente pour mon système de facturation qui enverra une copie de la commande et facturera le client et j'ai un système d'archive web où j'archive les commandes pour des raisons historiques/de conformité et j'ai une interface Web client où les commandes sont suivies à mesure que d'autres informations arrivent sur une commande.

Donc mes Files d'attente sont: order_printer, order_billing, order_archive et order_tracking Tous ont la balise de liaison "new-sales-order" liée à eux, tous les 4 obtiendront les données JSON.

C'est un moyen idéal d'envoyer des données sans que l'application de publication connaisse ou se soucie des applications de réception.

10
répondu z900collector 2015-09-14 01:17:02

Le modèle d'envoi est une relation un-à-un. Si vous voulez "envoyer" à plus d'un récepteur, vous devriez utiliser le modèle pub/sub. Voir http://www.rabbitmq.com/tutorials/tutorial-three-python.html pour plus de détails.

5
répondu Peter Ritchie 2012-12-27 21:50:16

Oui, chaque consommateur peut recevoir les mêmes messages. jetez un oeil à http://www.rabbitmq.com/tutorials/tutorial-three-python.html http://www.rabbitmq.com/tutorials/tutorial-four-python.html http://www.rabbitmq.com/tutorials/tutorial-five-python.html

Pour différentes façons d'acheminer les messages. Je sais qu'ils sont pour Python et java, mais il est bon de comprendre les principes, de décider ce que vous faites et de trouver comment le faire en JS. Ses sons comme vous voulez faire un simple sortance (tutoriel 3), qui envoie les messages à toutes les files d'attente reliée à l'échange.

La différence avec ce que vous faites et ce que vous voulez faire est essentiellement que vous allez configurer et échanger ou taper fanout. Fanout excahnges envoie tous les messages à toutes les files d'attente connectées. Chaque file d'attente auront un consommateur qui aura accès à tous les messages séparément.

Oui c'est couramment fait, c'est L'une des caractéristiques de L'AMPQ.

4
répondu robthewolf 2012-05-16 15:08:40

RabbitMQ / AMQP: file d'attente unique, plusieurs consommateurs pour le même message et actualisation de la page.

rabbit.on('ready', function () {    });
    sockjs_chat.on('connection', function (conn) {

        conn.on('data', function (message) {
            try {
                var obj = JSON.parse(message.replace(/\r/g, '').replace(/\n/g, ''));

                if (obj.header == "register") {

                    // Connect to RabbitMQ
                    try {
                        conn.exchange = rabbit.exchange(exchange, { type: 'topic',
                            autoDelete: false,
                            durable: false,
                            exclusive: false,
                            confirm: true
                        });

                        conn.q = rabbit.queue('my-queue-'+obj.agentID, {
                            durable: false,
                            autoDelete: false,
                            exclusive: false
                        }, function () {
                            conn.channel = 'my-queue-'+obj.agentID;
                            conn.q.bind(conn.exchange, conn.channel);

                            conn.q.subscribe(function (message) {
                                console.log("[MSG] ---> " + JSON.stringify(message));
                                conn.write(JSON.stringify(message) + "\n");
                            }).addCallback(function(ok) {
                                ctag[conn.channel] = ok.consumerTag; });
                        });
                    } catch (err) {
                        console.log("Could not create connection to RabbitMQ. \nStack trace -->" + err.stack);
                    }

                } else if (obj.header == "typing") {

                    var reply = {
                        type: 'chatMsg',
                        msg: utils.escp(obj.msga),
                        visitorNick: obj.channel,
                        customField1: '',
                        time: utils.getDateTime(),
                        channel: obj.channel
                    };

                    conn.exchange.publish('my-queue-'+obj.agentID, reply);
                }

            } catch (err) {
                console.log("ERROR ----> " + err.stack);
            }
        });

        // When the visitor closes or reloads a page we need to unbind from RabbitMQ?
        conn.on('close', function () {
            try {

                // Close the socket
                conn.close();

                // Close RabbitMQ           
               conn.q.unsubscribe(ctag[conn.channel]);

            } catch (er) {
                console.log(":::::::: EXCEPTION SOCKJS (ON-CLOSE) ::::::::>>>>>>> " + er.stack);
            }
        });
    });
3
répondu durai 2014-06-02 06:16:52

Pour obtenir le comportement que vous voulez, demandez simplement à chaque consommateur de consommer à partir de sa propre file d'attente. Vous devrez utiliser un type d'échange Non direct (sujet, en-tête, fanout) afin d'obtenir le message à toutes les files d'attente à la fois.

1
répondu Skylos 2012-11-01 20:11:57

Comme j'évalue votre cas est:

  • J'ai une file d'attente de messages (Votre source pour recevoir des messages, permet de le nommer q111)

  • J'ai plusieurs consommateurs, dont je voudrais faire des choses différentes avec le même message.

Votre problème ici est que pendant que 3 messages sont reçus par cette file d'attente, le message 1 est consommé par un consommateur A, les autres consommateurs B et C consomment les messages 2 et 3. Où comme vous avez besoin d'une configuration où rabbitmq passe sur le même copies de tous ces trois messages(1,2,3) aux trois consommateurs connectés (A,B,C) simultanément.

Alors que de nombreuses configurations peuvent être faites pour y parvenir, un moyen simple est d'utiliser le concept en deux étapes suivant:

  • Utilisez un rabbitmq-shovel dynamique pour récupérer les messages de la file d'attente souhaitée(q111) et les publier dans un échange de fanout (échange exclusivement créé et dédié à cet effet).
  • reconfigurez maintenant vos consommateurs A, B & C (qui écoutaient file d'attente(Q111)) pour écouter cet échange de Fanout directement en utilisant une file d'attente exclusive et anonyme pour chaque consommateur.

Remarque: lors de l'utilisation de ce concept, ne consommez pas directement à partir de la file d'attente source(q111), car les messages déjà consommés ne seront pas envoyés à votre échange de Fanout.

Si vous pensez que cela ne satisfait pas votre exigence exacte... n'hésitez pas à poster vos suggestions :-)

1
répondu Anshuman Banerjee 2014-08-12 06:59:08

Si vous utilisez la bibliothèque amqplib comme je suis, ils ont un exemple pratique d'une implémentation du tutoriel Publish/Subscribe RabbitMQ que vous pourriez trouver pratique.

0
répondu brettjonesdev 2015-03-02 21:46:55

Je pense que vous devriez vérifier l'envoi de vos messages en utilisant le fan-out échangeur. De cette façon, vous recevrez le même message pour différents consommateurs, sous la table RabbitMQ crée différentes files d'attente pour chacun de ces nouveaux consommateurs/abonnés.

C'est le lien pour voir l'exemple de tutoriel en javascript https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html

0
répondu Manuel Alejandro Diaz Serret 2016-11-04 18:46:19