Le pool de fils utilisant boost asio

j'essaie de créer une classe de pool de threads limitée en utilisant boost::asio. Mais je suis coincé à un moment peut quelqu'un pour m'aider.

le seul problème est l'endroit où je devrais diminuer le compteur?

Le code

ne fonctionne pas comme prévu.

le problème est que je ne sais pas quand mon thread va finir l'exécution et comment je vais apprendre à savoir qu'il a Retour à la piscine

#include <boost/asio.hpp>
#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>
#include <stack>

using namespace std;
using namespace boost;

class ThreadPool
{
    static int count;
    int NoOfThread;
    thread_group grp;
    mutex mutex_;
    asio::io_service io_service;
    int counter;
    stack<thread*> thStk ;

public:
    ThreadPool(int num)
    {   
        NoOfThread = num;
        counter = 0;
        mutex::scoped_lock lock(mutex_);

        if(count == 0)
            count++;
        else
            return;

        for(int i=0 ; i<num ; ++i)
        {
            thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service)));
        }
    }
    ~ThreadPool()
    {
        io_service.stop();
        grp.join_all();
    }

    thread* getThread()
    {
        if(counter > NoOfThread)
        {
            cout<<"run out of threads n";
            return NULL;
        }

        counter++;
        thread* ptr = thStk.top();
        thStk.pop();
        return ptr;
    }
};
int ThreadPool::count = 0;


struct callable
{
    void operator()()
    {
        cout<<"some task for thread n";
    }
};

int main( int argc, char * argv[] )
{

    callable x;
    ThreadPool pool(10);
    thread* p = pool.getThread();
    cout<<p->get_id();

    //how i can assign some function to thread pointer ?
    //how i can return thread pointer after work done so i can add 
//it back to stack?


    return 0;
}
18
demandé sur Sam Miller 2012-08-31 16:27:02

1 réponses

en bref, vous devez envelopper la tâche fournie par l'utilisateur avec une autre fonction qui sera:

  • invoquez la fonction utilisateur ou l'objet appelant.
  • verrouillez le mutex et décrémentez le comptoir.

Je ne comprends peut-être pas toutes les exigences pour ce groupe de fils. Ainsi, pour plus de clarté, voici une liste explicite de ce que je crois sont les exigences:

  • La piscine gère la durée de vie des filets. L'utilisateur ne devrait pas être en mesure de supprimer les threads qui résident dans le pool.
  • l'utilisateur peut assigner une tâche au pool de manière non intrusive.
  • Lorsqu'une tâche est assignée, si tous les threads dans le pool exécutent actuellement d'autres tâches, alors la tâche est écartée.

avant de fournir une mise en œuvre, Il ya quelques points clés I voudrait souligner:

  • une fois qu'un thread a été lancé, il s'exécute jusqu'à son achèvement, son annulation ou sa résiliation. La fonction que le thread exécute ne peut pas être réassignée. Pour permettre à un seul thread d'exécuter plusieurs fonctions au cours de sa vie, le thread voudra se lancer avec une fonction qui lira à partir d'une file d'attente, comme io_service::run() , et les types appelant sont affichés dans la file d'attente d'événements, comme io_service::post() .
  • io_service::run() retourne s'il n'y a pas de travail en attente dans le io_service , le io_service est arrêté, ou une exception est lancée d'un handler que le fil était en cours d'exécution. Pour empêcher io_serivce::run() de revenir quand il n'y a pas de travail inachevé, la classe io_service::work peut être utilisée.
  • définissant les exigences de type de la tâche (c.-à-d. que le type de la tâche doit pouvoir être appelé par la syntaxe object() ) au lieu d'exiger un type (c.-à-d. que la tâche doit hériter de process ), offre plus de souplesse à l'utilisateur. Il permet à l'utilisateur de fournir une tâche comme un pointeur de fonction ou un type fournissant une nullaire operator() .

mise en Œuvre à l'aide de boost::asio :

#include <boost/asio.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
  boost::asio::io_service io_service_;
  boost::asio::io_service::work work_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : work_( io_service_ ),
      available_( pool_size )
  {
    for ( std::size_t i = 0; i < pool_size; ++i )
    {
      threads_.create_thread( boost::bind( &boost::asio::io_service::run,
                                           &io_service_ ) );
    }
  }

  /// @brief Destructor.
  ~thread_pool()
  {
    // Force all threads to return from io_service::run().
    io_service_.stop();

    // Suppress all exceptions.
    try
    {
      threads_.join_all();
    }
    catch ( const std::exception& ) {}
  }

  /// @brief Adds a task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Post a wrapped task into the queue.
    io_service_.post( boost::bind( &thread_pool::wrap_task, this,
                                   boost::function< void() >( task ) ) );
  }

private:
  /// @brief Wrap a task so that the available count can be increased once
  ///        the user provided task has completed.
  void wrap_task( boost::function< void() > task )
  {
    // Run the user supplied task.
    try
    {
      task();
    }
    // Suppress all exceptions.
    catch ( const std::exception& ) {}

    // Task has finished, so increment count of available threads.
    boost::unique_lock< boost::mutex > lock( mutex_ );
    ++available_;
  }
};

quelques commentaires sur la mise en œuvre:

  • le traitement des exceptions doit se faire autour de la tâche de l'utilisateur. Si la fonction de l'utilisateur ou objet appelant jette une exception que n'est pas de type boost::thread_interrupted , puis std::terminate() est appelé. C'est le résultat de Boost.Thread exceptions dans le thread fonctions de comportement. Il est également intéressant de lire Boost.Asio effet d'exceptions générées à partir de gestionnaires .
  • si l'utilisateur fournit le task via boost::bind , alors le boost::bind imbriqué manquera de compiler. L'une des options suivantes est requise:
    • Ne supporte pas task créé par boost::bind .
    • méta-programmation pour effectuer la ramification de compilation basée sur le type de l'utilisateur si le résultat de boost::bind de sorte que boost::protect pourrait être utilisé, comme boost::protect ne fonctionne correctement sur certains objets de fonction.
    • utilisez un autre type pour passer indirectement l'objet task . J'ai choisi d'utiliser boost::function pour la lisibilité au prix de perdre le type exact. boost::tuple , bien que légèrement moins lisible, pourrait également être utilisé pour préserver le type exact, comme vu dans le Boost.Exemple d'Asio pour la sérialisation .

le code D'Application peut maintenant utiliser le type thread_pool sans intrusion:

void work() {};

struct worker
{
  void operator()() {};
};

void more_work( int ) {};

int main()
{ 
  thread_pool pool( 2 );
  pool.run_task( work );                        // Function pointer.
  pool.run_task( worker() );                    // Callable object.
  pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
}

Le thread_pool pourrait être créé sans coup de pouce.Asio, et peut être un peu plus facile pour les responsables, car ils n'ont plus besoin de savoir sur Boost.Asio comportements, tels que quand fait io_service::run() retour, et ce qui est io_service::work objet:

#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
  std::queue< boost::function< void() > > tasks_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
  boost::condition_variable condition_;
  bool running_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : available_( pool_size ),
      running_( true )
  {
    for ( std::size_t i = 0; i < pool_size; ++i )
    {
      threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
    }
  }

  /// @brief Destructor.
  ~thread_pool()
  {
    // Set running flag to false then notify all threads.
    {
      boost::unique_lock< boost::mutex > lock( mutex_ );
      running_ = false;
      condition_.notify_all();
    }

    try
    {
      threads_.join_all();
    }
    // Suppress all exceptions.
    catch ( const std::exception& ) {}
  }

  /// @brief Add task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Set task and signal condition variable so that a worker thread will
    // wake up andl use the task.
    tasks_.push( boost::function< void() >( task ) );
    condition_.notify_one();
  }

private:
  /// @brief Entry point for pool threads.
  void pool_main()
  {
    while( running_ )
    {
      // Wait on condition variable while the task is empty and the pool is
      // still running.
      boost::unique_lock< boost::mutex > lock( mutex_ );
      while ( tasks_.empty() && running_ )
      {
        condition_.wait( lock );
      }
      // If pool is no longer running, break out.
      if ( !running_ ) break;

      // Copy task locally and remove from the queue.  This is done within
      // its own scope so that the task object is destructed immediately
      // after running the task.  This is useful in the event that the
      // function contains shared_ptr arguments bound via bind.
      {
        boost::function< void() > task = tasks_.front();
        tasks_.pop();

        lock.unlock();

        // Run the task.
        try
        {
          task();
        }
        // Suppress all exceptions.
        catch ( const std::exception& ) {}
      }

      // Task has finished, so increment count of available threads.
      lock.lock();
      ++available_;
    } // while running_
  }
};
33
répondu Tanner Sansbury 2016-03-06 00:00:37