ZeroMQ PUB/SUB Modèle Multi-Thread Poller Annulation

j'ai deux applications, un serveur C++ et une interface utilisateur C# WPF. Le code C++ prend les requêtes (de n'importe où/n'importe qui) via un service de messagerie ZeroMQ [PUB/SUB]. J'utilise mon code C# pour le back testing et pour créer des "back tests" et les exécuter. Ces derniers peuvent être constitués de nombreux "tests unitaires" et chacun d'eux envoyant/recevant des milliers de messages du serveur C++.

actuellement, les tests de dos individuels fonctionnent bien et peuvent envoyer N tests unitaires chacun avec des milliers de demandes et capturer. Mon problème est l'architecture; quand j'envoie un autre test arrière (suivant le premier) j'ai un problème avec l'abonnement d'événement étant fait une deuxième fois en raison du fil de sondage n'étant pas annulé et disposé. Il en résulte une sortie erronée. Cela peut sembler une situation banale (c'est peut-être pour certains d'entre vous), mais l'annulation de ce scrutin Tâche sous ma configuration actuelle s'avère gênant. Un peu de code...

ma classe de courtier de message est simple et regarde comme

public class MessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
    private Task pollingTask;
    private NetMQContext context;
    private PublisherSocket pubSocket;

    private CancellationTokenSource source;
    private CancellationToken token;
    private ManualResetEvent pollerCancelled;

    public MessageBroker()
    {
        this.source = new CancellationTokenSource();
        this.token = source.Token;

        StartPolling();
        context = NetMQContext.Create();
        pubSocket = context.CreatePublisherSocket();
        pubSocket.Connect(PublisherAddress);
    }

    public void Dispatch(Taurus.FeedMux message)
    {
        pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
    }

    private void StartPolling()
    {
        pollerCancelled = new ManualResetEvent(false);
        pollingTask = Task.Run(() =>
        {
            try
            {
                using (var context = NetMQContext.Create())
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    byte[] buffer = null;
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect(SubscriberAddress);
                    subSocket.Subscribe(String.Empty);
                    while (true)
                    {
                        buffer = subSocket.Receive();
                        MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
                        if (this.token.IsCancellationRequested)
                            this.token.ThrowIfCancellationRequested();
                    }
                }
            }
            catch (OperationCanceledException)
            {
                pollerCancelled.Set();
            }
        }, this.token);
    }

    private void CancelPolling()
    {
        source.Cancel();
        pollerCancelled.WaitOne();
        pollerCancelled.Close();
    }

    public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }
    public string PublisherAddress { get { return "tcp://127.X.X.X:6500"; } }
    public string SubscriberAddress { get { return "tcp://127.X.X.X:6501"; } }

    private bool disposed = false;

    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                if (this.pollingTask != null)
                {
                    CancelPolling();
                    if (this.pollingTask.Status == TaskStatus.RanToCompletion ||
                         this.pollingTask.Status == TaskStatus.Faulted ||
                         this.pollingTask.Status == TaskStatus.Canceled)
                    {
                        this.pollingTask.Dispose();
                        this.pollingTask = null;
                    }
                }
                if (this.context != null)
                {
                    this.context.Dispose();
                    this.context = null;
                }
                if (this.pubSocket != null)
                {
                    this.pubSocket.Dispose();
                    this.pubSocket = null;
                }
                if (this.source != null)
                {
                  this.source.Dispose();
                  this.source = null;
                }
            }
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    ~MessageBroker()
    {
        Dispose(false);
    }
}

l'utilisation du "moteur" d'essai arrière pour exécuter chaque essai arrière, construit d'abord un Dictionary contenant chaque Test (test unitaire) et les messages à envoyer à L'application C++ pour chaque test.

DispatchTests méthode, c'est ici

private void DispatchTests(ConcurrentDictionary<Test, List<Taurus.FeedMux>> feedMuxCollection)
{
    broker = new MessageBroker();
    broker.MessageRecieved = new Progress<Taurus.FeedMux>(OnMessageRecieved);
    testCompleted = new ManualResetEvent(false);

    try
    {
        // Loop through the tests. 
        foreach (var kvp in feedMuxCollection)
        {
            testCompleted.Reset();
            Test t = kvp.Key;
            t.Bets = new List<Taurus.Bet>();
            foreach (Taurus.FeedMux mux in kvp.Value)
            {
                token.ThrowIfCancellationRequested();
                broker.Dispatch(mux);
            }
            broker.Dispatch(new Taurus.FeedMux()
            {
                type = Taurus.FeedMux.Type.PING,
                ping = new Taurus.Ping() { event_id = t.EventID }
            });
            testCompleted.WaitOne(); // Wait until all messages are received for this test. 
        }
        testCompleted.Close();
    }
    finally
    {
        broker.Dispose(); // Dispose the broker.
    }
}

PING message à la fin, c'est pour dire au c++ que nous sommes finis. Nous forçons alors une attente, de sorte que le prochain test [unit] ne soit pas envoyé avant que tous les retours ne soient reçus du c++ code - nous faisons cela en utilisant un ManualResetEvent.

quand le c++ reçoit le message de PING, il renvoie le message directement. Nous traitons les messages reçus via OnMessageRecieved et le PING nous dit de mettre le ManualResetEvent.Set() afin que nous puissions poursuivre les tests de l'unité; "suivant S'il vous plaît"...

private async void OnMessageRecieved(Taurus.FeedMux mux)
{
    string errorMsg = String.Empty;
    if (mux.type == Taurus.FeedMux.Type.MSG)
    {
        // Do stuff.
    }
    else if (mux.type == Taurus.FeedMux.Type.PING)
    {
        // Do stuff.

        // We are finished reciving messages for this "unit test"
        testCompleted.Set(); 
    }
}

Mon problème est que, broker.Dispose() dans le a finalement ci-dessus n'est jamais atteint. J'apprécie que finalement les blocs qui sont exécutés sur les threads d'arrière-plan ne sont pas garantis pour obtenir exécuté.

le texte rayé ci-dessus était dû au fait que je m'amusais avec le code; j'arrêtais un thread parent avant que l'enfant n'ait terminé. Cependant, il y a encore des problèmes...

broker.Dispose() s'appelle correctement, et broker.Dispose() est appelé, dans cette méthode je tente d'annuler le fil de poller et dispose de la Task correctement pour éviter tout Abonnement multiple.

pour annuler le thread j'utilise le CancelPolling() méthode

private void CancelPolling()
{
    source.Cancel();
    pollerCancelled.WaitOne(); <- Blocks here waiting for cancellation.
    pollerCancelled.Close();
}

mais dans le StartPolling() méthode

while (true)
{
    buffer = subSocket.Receive();
    MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
    if (this.token.IsCancellationRequested)
        this.token.ThrowIfCancellationRequested();
}

ThrowIfCancellationRequested() n'est jamais appelé et le thread n'est jamais annulé, donc jamais correctement disposé. Le fil de poller est bloqué par le subSocket.Receive() méthode.

maintenant, il ne me semble pas clair comment atteindre ce que je veux, je dois invoquer le broker.Dispose()/PollerCancel() sur un fil autre que celui utilisé pour Poller les messages et certains comment forcer l'annulation. Abandon de Thread n'est pas ce que je veux entrer dans à tout coût.

Essentiellement, je veux disposer correctement des broker avant d'exécuter le prochain test arrière, comment gérer correctement ceci, séparer le sondage et l'exécuter dans un domaine D'Application séparé?

j'ai essayé, se débarrasser à l'intérieur de la OnMessageRecived gestionnaire, mais ce n'est clairement exécuté sur le même thread que le poller et n'est pas la façon de le faire, sans invoquer d'autres threads, il bloque.

Quelle est la meilleure façon d'atteindre ce que l' Je veux et est-il un modèle pour ce genre d'affaire que je peux suivre?

Merci pour votre temps.

17
demandé sur MoonKnight 2015-04-30 23:34:27

2 réponses

C'est comme ça que j'ai finalement contourné ça [bien que je sois ouvert à une meilleure solution!]

public class FeedMuxMessageBroker : IMessageBroker<Taurus.FeedMux>, IDisposable
{
    // Vars.
    private NetMQContext context;
    private PublisherSocket pubSocket;
    private Poller poller;

    private CancellationTokenSource source;
    private CancellationToken token;
    private ManualResetEvent pollerCancelled;

    /// <summary>
    /// Default ctor.
    /// </summary>
    public FeedMuxMessageBroker()
    {
        context = NetMQContext.Create();

        pubSocket = context.CreatePublisherSocket();
        pubSocket.Connect(PublisherAddress);

        pollerCancelled = new ManualResetEvent(false);
        source = new CancellationTokenSource();
        token = source.Token;
        StartPolling();
    }

    #region Methods.
    /// <summary>
    /// Send the mux message to listners.
    /// </summary>
    /// <param name="message">The message to dispatch.</param>
    public void Dispatch(Taurus.FeedMux message)
    {
        pubSocket.Send(message.ToByteArray<Taurus.FeedMux>());
    }

    /// <summary>
    /// Start polling for messages.
    /// </summary>
    private void StartPolling()
    {
        Task.Run(() =>
            {
                using (var subSocket = context.CreateSubscriberSocket())
                {
                    byte[] buffer = null;
                    subSocket.Options.ReceiveHighWatermark = 1000;
                    subSocket.Connect(SubscriberAddress);
                    subSocket.Subscribe(String.Empty);
                    subSocket.ReceiveReady += (s, a) =>
                    {
                        buffer = subSocket.Receive();
                        if (MessageRecieved != null)
                            MessageRecieved.Report(buffer.ToObject<Taurus.FeedMux>());
                    };

                    // Poll.
                    poller = new Poller();
                    poller.AddSocket(subSocket);
                    poller.PollTillCancelled();
                    token.ThrowIfCancellationRequested();
                }
            }, token).ContinueWith(ant => 
                {
                    pollerCancelled.Set();
                }, TaskContinuationOptions.OnlyOnCanceled);
    }

    /// <summary>
    /// Cancel polling to allow the broker to be disposed.
    /// </summary>
    private void CancelPolling()
    {
        source.Cancel();
        poller.Cancel();

        pollerCancelled.WaitOne();
        pollerCancelled.Close();
    }
    #endregion // Methods.

    #region Properties.
    /// <summary>
    /// Event that is raised when a message is recived. 
    /// </summary>
    public IProgress<Taurus.FeedMux> MessageRecieved { get; set; }

    /// <summary>
    /// The address to use for the publisher socket.
    /// </summary>
    public string PublisherAddress { get { return "tcp://127.0.0.1:6500"; } }

    /// <summary>
    /// The address to use for the subscriber socket.
    /// </summary>
    public string SubscriberAddress { get { return "tcp://127.0.0.1:6501"; } }
    #endregion // Properties.

    #region IDisposable Members.
    private bool disposed = false;

    /// <summary>
    /// Dispose managed resources.
    /// </summary>
    /// <param name="disposing">Is desposing.</param>
    protected virtual void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                CancelPolling();
                if (pubSocket != null)
                {
                    pubSocket.Disconnect(PublisherAddress);
                    pubSocket.Dispose();
                    pubSocket = null;
                }
                if (poller != null)
                {
                    poller.Dispose();
                    poller = null;
                }
                if (context != null)
                {
                    context.Terminate();
                    context.Dispose();
                    context = null;
                }
                if (source != null)
                {
                    source.Dispose();
                    source = null;
                }
            }

            // Shared cleanup logic.
            disposed = true;
        }
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Finalizer.
    /// </summary>
    ~FeedMuxMessageBroker()
    {
        Dispose(false);
    }
    #endregion // IDisposable Members.
}

donc nous sondons de la même façon, mais en utilisant le Poller classe de NetMQ. Dans la suite de la tâche nous avons mis ainsi nous sommes sûrs que les deux Poller et Task sont annulés. Nous sommes alors sûr d'en disposer...

2
répondu MoonKnight 2015-05-05 11:39:15

Un niveau supérieur de vue sur le sujet

votre attention et vos efforts, consacrés à la création d'un cadre de test, indiquent que votre volonté vise à développer une approche rigoureuse et professionnelle, ce qui m'a fait d'abord lever mon chapeau dans un salut d'admiration à une telle entreprise courageuse.

Bien que le test soit une activité importante pour fournir une preuve quantitative raisonnable qu'un système en cours de Test répond à des attentes définies, le succès de cette activité dépend de la façon dont: fermer l'environnement d'essai répond aux conditions du déploiement réel.

on peut convenir, que tester sur une autre base, différente, ne prouve pas que le déploiement réel se déroulera comme prévu dans un environnement, qui est principalement différent de celui(s) testé (s).


contrôle par élément ou simplement contrôle par état, voilà la question.

vos efforts ( au moins au moment de L'opération a été posté ) se concentrent sur l'architecture de code, qui tente de gardez les instances en place et essayez de modifier l'état interne d'une instance de Poller avant qu'une batterie de test ne démarre.

à mon avis, les tests ont quelques principes à suivre, si vous vous efforcez de passer des tests professionnels:

  • principe de la répétabilité de L'essai ( tests de "re-essais doit servir les mêmes résultats, évitant ainsi un quasi-test qui fournit juste une"loterie" )

  • Principe de Essais Sans Intervention (les reprises des essais ne doivent pas faire l'objet d'interférences "externes", non contrôlées par le scénario d'essai )

plutôt reculer d'un pas pour avoir le contrôle sur le cycle de vie complet des éléments

Caci Simulations, Inc., (une des entreprises de Harry Markowitz ) développé au début des années 90 leur logiciel phare Comet III-un moteur de simulation exceptionnellement puissant pour la conception-prototypage et la performance-des simulations de processus opérés dans le calcul à grande échelle / réseaux / Réseaux de télécommunications.

la plus grande impression de COMET III a été sa capacité à générer des scénarios de test incluant une pré-charge "warm-up" configurable avant le test, qui ont fait que les éléments testés se trouvent dans un état similaire à ce qui par "fatigue", on entend dans les essais de torture mécanique ou ce que la fragilité par diffusion de l'hydrogène signifie pour les métallurgistes des centrales nucléaires.

Oui, Une fois que vous entrez dans les détails de bas niveau sur la façon dont les algorithmes, les tampons de nœuds, les allocations de mémoire, les sélections d'architectures de traitement par canalisation / équilibrées par la charge / quadrillées, les frais généraux de résilience aux pannes, les politiques de collecte des ordures et les algorithmes limités de partage des ressources fonctionnent et ont un impact ( sous des modèles de charge de travail d'utilisation réelle "pression") de bout en bout performance / latences, cette fonctionnalité est tout simplement indispensable.

Comment cela s'applique à l'OP problème?

  • au lieu de simplement contrôler l'état
  • Créer un multi-couche-de l'architecture de contrôle / -plan(s) / séparé de signalisation

Un ZeroMQ moyen de soutenir cet objectif

  • Créer un super-structures non-trivial modèles
  • utiliser les contrôles du cycle de vie complet des instances utilisées dans les essais les scénarios
  • Conserver ZeroMQ maximes: Zéro-partage, Zéro-blocage, l' ...
  • Bénéficier de Multi-Contexte()
1
répondu user3666197 2017-06-24 22:23:03