Arrêt de C++ 11 STD::threads en attente d'une variable STD:: condition

J'essaie de comprendre les mécanismes multithreading de base dans la nouvelle norme C++ 11. L'exemple le plus fondamental auquel je peux penser est le suivant:

  • un producteur et un consommateur sont implémentés dans des threads séparés
  • le producteur place une certaine quantité d'éléments dans une file d'attente
  • le consommateur prend des articles de la file d'attente s'il y en a

Cet exemple est également utilisé dans de nombreux livres scolaires sur le multithreading et tout sur le processus de communication fonctionne très bien. Cependant, j'ai un problème quand il s'agit d'arrêter le fil de consommation.

Je veux que le consommateur s'exécute jusqu'à ce qu'il obtienne un signal d'arrêt explicite (dans la plupart des cas, cela signifie que j'attends la fin du producteur pour pouvoir arrêter le consommateur avant la fin du programme). Malheureusement, les threads C++ 11 manquent d'un mécanisme d'interruption (que je connais du multithreading en Java par exemple). Donc, je dois utiliser des drapeaux comme isRunning pour signaler que je veux un thread arrêter.

Le problème principal est maintenant: après avoir arrêté le thread producteur, la file d'attente est vide et le consommateur attend un condition_variable pour obtenir un signal lorsque la file d'attente est remplie à nouveau. J'ai donc besoin de réveiller le thread en appelant notify_all() sur la variable avant de quitter.

J'ai trouvé une solution de travail, mais cela semble en quelque sorte désordonné. L'exemple de code est répertorié ci-dessous (je suis désolé mais je ne pouvais pas réduire la taille du code pour un exemple minimal" minimal"):

Le Classe de file d'attente:

class Queue{
public:
    Queue() : m_isProgramStopped{ false } { }

    void push(int i){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_q.push(i);
        m_cond.notify_one();
    }

    int pop(){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cond.wait(lock, [&](){ return !m_q.empty() || m_isProgramStopped; });

        if (m_isProgramStopped){
            throw std::exception("Program stopped!");
        }

        int x = m_q.front();
        m_q.pop();

        std::cout << "Thread " << std::this_thread::get_id() << " popped " << x << "." << std::endl;
        return x;
    }

    void stop(){
        m_isProgramStopped = true;
        m_cond.notify_all();
    }

private:
    std::queue<int> m_q;
    std::mutex m_mtx;
    std::condition_variable m_cond;
    bool m_isProgramStopped;
};

Le Producteur:

class Producer{
public:
    Producer(Queue & q) : m_q{ q }, m_counter{ 1 } { }

    void produce(){
        for (int i = 0; i < 5; i++){
            m_q.push(m_counter++);
            std::this_thread::sleep_for(std::chrono::milliseconds{ 500 });
        }
    }

    void execute(){
        m_t = std::thread(&Producer::produce, this);
    }

    void join(){
        m_t.join();
    }

private:
    Queue & m_q;
    std::thread m_t;

    unsigned int m_counter;
};

Le Consommateur:

class Consumer{
public:
    Consumer(Queue & q) : m_q{ q }, m_takeCounter{ 0 }, m_isRunning{ true }
    { }

    ~Consumer(){
        std::cout << "KILL CONSUMER! - TOOK: " << m_takeCounter << "." << std::endl;
    }

    void consume(){
        while (m_isRunning){
            try{
                m_q.pop();
                m_takeCounter++;
            }
            catch (std::exception e){
                std::cout << "Program was stopped while waiting." << std::endl;
            }
        }
    }

    void execute(){
        m_t = std::thread(&Consumer::consume, this);
    }

    void join(){
        m_t.join();
    }

    void stop(){
        m_isRunning = false;
    }

private:
    Queue & m_q;
    std::thread m_t;

    unsigned int m_takeCounter;
    bool m_isRunning;
};

Et enfin le main():

int main(void){
    Queue q;

    Consumer cons{ q };
    Producer prod{ q };

    cons.execute();
    prod.execute();

    prod.join();

    cons.stop();
    q.stop();

    cons.join();

    std::cout << "END" << std::endl;

    return EXIT_SUCCESS;
}

Est-ce le moyen juste de terminer un thread qui attend une variable de condition A ou Existe-t-il de meilleures méthodes? Actuellement, la file d'attente doit savoir si le programme s'est arrêté (ce qui à mon avis détruit le couplage lâche des composants) et j'ai besoin d'appeler stop() sur la file d'attente explicitement ce qui ne semble pas correct.

En outre, le la variable condition qui devrait juste être utilisée comme singal si la file d'attente est vide représente maintenant une autre condition-si le programme est terminé. Si Je ne me trompe pas, chaque fois qu'un thread attend sur une variable de condition qu'un événement se produise, il devrait également vérifier si le thread doit être arrêté avant de continuer son exécution (ce qui semble également faux).

Ai-je ces problèmes parce que toute ma conception est défectueuse ou est-ce qu'il me manque des mécanismes qui peuvent être utilisés pour quitter les threads dans un propre chemin?

25
demandé sur Devon Cornwall 2014-02-13 18:33:40

2 réponses

Non, il n'y a rien de mal à votre conception, et c'est l'approche normale adoptée pour ce genre de problème.

Il est parfaitement valide pour vous d'avoir plusieurs conditions (par exemple, tout ce qui est en file d'attente ou en arrêt de programme) attaché à une variable de condition. L'essentiel est que les bits de la condition soient vérifiés lorsque wait retourne.

Au lieu d'avoir un drapeau dans Queue pour indiquer que le programme s'arrête, vous devriez penser au drapeau comme "Puis-je accepter". C'est mieux paradigme global et fonctionne mieux dans un environnement multi-thread.

En outre, au lieu d'avoir pop Lancer une exception si quelqu'un l'appelle et que stop a été appelé, vous pouvez remplacer la méthode par bool try_pop(int &value) qui retournera true si une valeur a été renvoyée, sinon false. De cette façon, l'appelant peut vérifier si la file d'attente a été arrêtée (ajoutez une méthode bool is_stopped() const). Bien que la gestion des exceptions fonctionne ici, elle est un peu lourde et n'est pas vraiment un cas exceptionnel dans un multi-thread programme.

5
répondu Sean 2014-02-13 14:58:43

wait peut être appelé avec un délai d'attente. Le contrôle est renvoyé au thread et stop peut être vérifié. En fonction de cette valeur, il peut wait sur plus d'éléments à consommer ou terminer l'exécution. Une bonne introduction au multithreading avec c++ est C++11 Concurrency .

1
répondu knivil 2014-02-13 15:51:25