RabbitMQ C # pilote cesse de recevoir des messages

Avez-vous des conseils pour déterminer quand un problème d'abonnement s'est produit afin que je puisse me reconnecter?

Mon service utilise RabbitMQ.Client.MessagePatterns.Abonnement pour son abonnement. Après un certain temps, mon client cesse silencieusement de recevoir des messages. Je soupçonne des problèmes de réseau car notre connexion VPN n'est pas la plus fiable.

J'ai lu les documents pendant un certain temps à la recherche d'une clé pour savoir quand cet abonnement pourrait être rompu en raison d'un problème de réseau sans grand-chose chance. J'ai essayé de vérifier que la connexion et le canal sont toujours ouverts, mais il semble toujours signaler qu'il est toujours ouvert.

Les messages qu'il traite fonctionnent assez bien et sont reconnus dans la file d'attente, donc je ne pense pas que ce soit un problème avec le "ack".

Je suis sûr que je dois juste manquer quelque chose de simple, mais je ne l'ai pas encore trouvé.

public void Run(string brokerUri, Action<byte[]> handler)
{
    log.Debug("Connecting to broker: {0}".Fill(brokerUri));
    ConnectionFactory factory = new ConnectionFactory { Uri = brokerUri };

    using (IConnection connection = factory.CreateConnection())
    {
        using (IModel channel = connection.CreateModel())
        {
            channel.QueueDeclare(queueName, true, false, false, null);

            using (Subscription subscription = new Subscription(channel, queueName, false))
            {
                while (!Cancelled)
                {
                    BasicDeliverEventArgs args;

                    if (!channel.IsOpen)
                    {
                        log.Error("The channel is no longer open, but we are still trying to process messages.");
                        throw new InvalidOperationException("Channel is closed.");
                    }
                    else if (!connection.IsOpen)
                    {
                        log.Error("The connection is no longer open, but we are still trying to process message.");
                        throw new InvalidOperationException("Connection is closed.");
                    }

                    bool gotMessage = subscription.Next(250, out args);

                    if (gotMessage)
                    {
                        log.Debug("Received message");
                        try
                        {
                            handler(args.Body);
                        }
                        catch (Exception e)
                        {
                            log.Debug("Exception caught while processing message. Will be bubbled up.", e);
                            throw;
                        }

                        log.Debug("Acknowledging message completion");
                        subscription.Ack(args);
                    }
                }
            }
        }
    }
}

Mise à jour:

J'ai simulé une panne de réseau en exécutant le serveur dans une machine virtuelle et je fais obtenir un exception (RabbitMQ.Client.Exception.OperationInterruptedException: L'opération AMQP a été interrompue) lorsque je casse la connexion assez longtemps, ce n'est peut-être pas un problème de réseau. Maintenant, je ne sais pas ce que ce serait, mais il échoue après seulement quelques heures de course.

28
demandé sur Aaron Milner 2012-09-19 20:42:49

1 réponses

EDIT: puisque je suis en train de faire des upvotes à ce sujet, je devrais souligner que le client. Net RabbitMQ a maintenant cette fonctionnalité intégrée: https://www.rabbitmq.com/dotnet-api-guide.html#connection-recovery

Idéalement, vous devriez pouvoir l'utiliser et éviter d'implémenter manuellement la logique de reconnexion.


J'ai récemment dû mettre en œuvre presque la même chose. D'après ce que je peux dire, la plupart des informations disponibles sur RabbitMQ supposent que votre réseau est très fiable ou que vous exécutez un courtier RabbitMQ sur la même machine que tout client envoyant ou recevant des messages, permettant à Rabbit de traiter tous les problèmes de connexion.

Il n'est vraiment pas si difficile de configurer le client Rabbit pour qu'il soit robuste contre les connexions supprimées, mais il y a quelques idiosyncrasies que vous devez gérer.

La première chose que vous devez faire activer le rythme cardiaque:

ConnectionFactory factory = new ConnectionFactory() 
{
  Uri = brokerUri,
  RequestedHeartbeat = 30,
}; 

Définir le "RequestedHeartbeat" à 30 fera vérifier le client tous les 30 secondes si la connexion est toujours active. Sans cela allumé, l'abonné de message restera assis là attendant heureusement qu'un autre message arrive sans la moindre idée que sa connexion a mal tourné.

Activer le rythme cardiaque permet également au serveur de vérifier si la connexion est toujours en place, ce qui peut être très important. Si une connexion tourne mal après qu'un message a été ramassé par l'abonné mais avant qu'il ait été reconnu, le serveur suppose simplement que le client prend un longtemps, et le message est "coincé" sur la connexion morte jusqu'à ce qu'il soit fermé. Avec le rythme cardiaque activé, le serveur reconnaîtra quand la connexion va mal et la fermera, remettant le message dans la file d'attente afin qu'un autre abonné puisse le gérer. Sans le rythme cardiaque, j'ai dû entrer manuellement et fermer la connexion dans L'interface utilisateur de Rabbit management afin que le message bloqué puisse être transmis à un abonné.

Deuxièmement, vous devrez gérer OperationInterruptedException. Comme vous avez remarqué, c'est l'exception du Lapin client jeter quand il détecte la connexion a été interrompue. Si IModel.QueueDeclare() est appelé lorsque la connexion a été interrompue, c'est l'exception que vous obtiendrez. Gérez cette exception en éliminant votre abonnement, votre canal et votre connexion et en créant de nouveaux.

Enfin, vous devrez gérer ce que fait votre consommateur lorsque vous essayez de consommer des messages provenant d'une connexion fermée. Malheureusement, chaque façon différente de consommer des messages d'un la file D'attente dans le client Rabbit semble réagir différemment. QueueingBasicConsumer jette EndOfStreamException si vous appelez QueueingBasicConsumer.Queue.Dequeue sur une connexion fermée. EventingBasicConsumer ne fait rien, car il attend juste un message. D'après ce que je peux dire en l'essayant, la classe Subscription que vous utilisez semble renvoyer true d'un appel à Subscription.Next, mais la valeur de args est null. Encore une fois, gérez-le en éliminant votre connexion, votre canal et votre abonnement et en les recréant.

La valeur de connection.IsOpen sera mise à jour à False lorsque la connexion échoue avec le rythme cardiaque activé, vous pouvez donc le vérifier si vous le souhaitez. Cependant, puisque le rythme cardiaque s'exécute sur un thread séparé, vous devrez toujours gérer le cas où la connexion est ouverte lorsque vous la Vérifiez, mais se ferme avant que subscription.Next() ne soit appelé.

Une dernière chose à surveiller est IConnection.Dispose(). Cet appel lancera un EndOfStreamException Si vous appelez dispose après la fermeture de la connexion. Cela me semble être un bug, et je n'aime pas ne pas appeler dispose sur un objet IDisposable, donc je appelez-le et avalez l'exception.

Mettre tout cela ensemble dans un exemple rapide et sale:

public bool Cancelled { get; set; }

IConnection _connection = null;
IModel _channel = null;
Subscription _subscription = null;

public void Run(string brokerUri, string queueName, Action<byte[]> handler)
{
    ConnectionFactory factory = new ConnectionFactory() 
    {
        Uri = brokerUri,
        RequestedHeartbeat = 30,
    };

    while (!Cancelled)
    {               
        try
        {
            if(_subscription == null)
            {
                try
                {
                    _connection = factory.CreateConnection();
                }
                catch(BrokerUnreachableException)
                {
                    //You probably want to log the error and cancel after N tries, 
                    //otherwise start the loop over to try to connect again after a second or so.
                    continue;
                }

                _channel = _connection.CreateModel();
                _channel.QueueDeclare(queueName, true, false, false, null);
                _subscription = new Subscription(_channel, queueName, false);
            }

            BasicDeliverEventArgs args;
            bool gotMessage = _subscription.Next(250, out args);
            if (gotMessage)
            {
                if(args == null)
                {
                    //This means the connection is closed.
                    DisposeAllConnectionObjects();
                    continue;
                }

                handler(args.Body);
                _subscription.Ack(args);
            }
        }
        catch(OperationInterruptedException ex)
        {
            DisposeAllConnectionObjects();
        }
    }
    DisposeAllConnectionObjects();
}

private void DisposeAllConnectionObjects()
{
    if(_subscription != null)
    {
        //IDisposable is implemented explicitly for some reason.
        ((IDisposable)_subscription).Dispose();
        _subscription = null;
    }

    if(_channel != null)
    {
        _channel.Dispose();
        _channel = null;
    }

    if(_connection != null)
    {
        try
        {
            _connection.Dispose();
        }
        catch(EndOfStreamException) 
        {
        }
        _connection = null;
    }
}
50
répondu Brian Zell 2016-12-05 15:18:29