Mise en pool de threads en C++11
questions pertinentes :
À Propos De C++11:
- C++11: std::thread pooled?
- Sera asynchrone(lancement::async) en C++11 pour des pools de threads obsolète pour éviter de coûteuses de création de thread?
À Propos De Boost:
Comment puis-je obtenir un pool de threads à envoyer des tâches à , sans les créer et les supprimer encore et encore? Cela signifie des fils persistants pour resynchroniser sans se joindre.
j'ai un code qui ressemble à ceci:
namespace {
std::vector<std::thread> workers;
int total = 4;
int arr[4] = {0};
void each_thread_does(int i) {
arr[i] += 2;
}
}
int main(int argc, char *argv[]) {
for (int i = 0; i < 8; ++i) { // for 8 iterations,
for (int j = 0; j < 4; ++j) {
workers.push_back(std::thread(each_thread_does, j));
}
for (std::thread &t: workers) {
if (t.joinable()) {
t.join();
}
}
arr[4] = std::min_element(arr, arr+4);
}
return 0;
}
au lieu de créer et de joindre des threads à chaque itération, je préférerais envoyer des tâches à mes worker threads à chaque itération et ne les créer qu'une seule fois.
9 réponses
vous pouvez utiliser la bibliothèque C++ Thread Pool, https://github.com/vit-vit/ctpl .
alors le code que vous avez écrit peut être remplacé par le suivant
#include <ctpl.h> // or <ctpl_stl.h> if ou do not have Boost library
int main (int argc, char *argv[]) {
ctpl::thread_pool p(2 /* two threads in the pool */);
int arr[4] = {0};
std::vector<std::future<void>> results(4);
for (int i = 0; i < 8; ++i) { // for 8 iterations,
for (int j = 0; j < 4; ++j) {
results[j] = p.push([&arr, j](int){ arr[j] +=2; });
}
for (int j = 0; j < 4; ++j) {
results[j].get();
}
arr[4] = std::min_element(arr, arr + 4);
}
}
, Vous obtiendrez le nombre de threads et ne sera pas de créer et de supprimer encore et encore sur les itérations.
un pool de threads signifie que tous vos threads sont en cours d'exécution, tout le temps – en d'autres termes, la fonction thread ne revient jamais. Pour donner aux threads quelque chose de significatif à faire, vous devez concevoir un système de communication inter-thread, à la fois dans le but de dire au thread qu'il y a quelque chose à faire, ainsi que pour communiquer les données du travail réel.
Généralement, cela impliquera une sorte de structure de données simultanées, et chaque thread serait probablement dormir sur une sorte de variable de condition, qui serait notifié quand il ya du travail à faire. À la réception de la notification, un ou plusieurs threads se réveillent, récupèrent une tâche de la structure de données concurrente, la traitent et stockent le résultat d'une manière analogue.
le fil irait ensuite vérifier s'il y a encore plus de travail à faire, et si non retourner dormir.
le résultat est que vous devez concevoir tout cela vous-même, car il y a la notion de "travail" n'est-elle pas d'application universelle? C'est un sacré travail, et il y a des problèmes subtils que vous devez régler. (Vous pouvez programmer dans Go si vous aimez un système qui prend soin de la gestion de thread pour vous dans les coulisses.)
ceci est copié de ma réponse à un autre post très similaire, espérons qu'il peut aider:
1) Commencer avec le nombre maximum de threads qu'un système peut supporter:
int Num_Threads = thread::hardware_concurrency();
2) pour une implémentation threadpool efficace, une fois que les threads sont créés selon Num_Threads, il est préférable de ne pas en créer de nouveaux, ou de ne pas détruire les anciens (en se joignant). Il y aura une pénalité de performance, qui pourrait même ralentir votre application par rapport à la version série.
chaque thread C++11 devrait être exécuté dans sa fonction avec une boucle infinie, attendant constamment de nouvelles tâches à saisir et exécuter.
Voici comment attacher une telle fonction au pool de threads:
int Num_Threads = thread::hardware_concurrency();
vector<thread> Pool;
for(int ii = 0; ii < Num_Threads; ii++)
{ Pool.push_back(thread(Infinite_loop_function));}
3) L'Infinite_loop_function
il s'agit d'une boucle" while(true) " en attente de la file d'attente des tâches
void The_Pool:: Infinite_loop_function()
{
while(true)
{
{
unique_lock<mutex> lock(Queue_Mutex);
condition.wait(lock, []{return !Queue.empty()});
Job = Queue.front();
Queue.pop();
}
Job(); // function<void()> type
}
};
4) Créer une fonction pour ajouter du travail à votre File d'attente
void The_Pool:: Add_Job(function<void()> New_Job)
{
{
unique_lock<mutex> lock(Queue_Mutex);
Queue.push(New_Job);
}
condition.notify_one();
}
5) liez une fonction arbitraire à votre File d'attente
Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));
une fois que vous avez intégré ces ingrédients, vous disposez de votre propre pool de filetage dynamique. Ces fils fonctionnent toujours, attendant que le travail fasse.
Je m'excuse s'il y a des erreurs de syntaxe, j'ai tapé ce code et j'ai une mauvaise mémoire. Désolé, Je ne peux pas vous fournir le code complet du pool de threads, qui violerait mon intégrité de travail.
un threadpool est au cœur un ensemble de threads tous liés à une fonction fonctionnant comme une boucle d'événement. Ces threads attendront sans cesse qu'une tâche soit exécutée, ou leur propre fin.
le travail threadpool est de fournir une interface pour soumettre des travaux, définir (et peut-être modifier) la Politique d'exécution de ces travaux (règles de programmation, instanciation de thread, taille de la piscine), et surveiller l'état des threads et des ressources connexes.
donc pour un polyvalent, il faut commencer par définir ce qu'est une tâche, comment elle est lancée, interrompue, Quel est le résultat (voir la notion de promesse et l'avenir pour cette question), Quel genre d'événements les fils auront à répondre, comment ils les traiteront, comment ces événements seront différenciés de ceux traités par les tâches. Cela peut devenir assez compliqué comme vous pouvez le voir, et imposer des restrictions sur la façon dont les fils fonctionneront, que la solution devient de plus en plus impliqué.
l'outillage actuel pour gérer les événements est assez basique (*): primitives comme les Mutex, les variables de condition, et quelques abstractions en plus (serrures, barrières). Mais dans certains cas, ces abstractions peuvent s'avérer impropres (voir cette question ), et il faut revenir à l'utilisation des primitifs.
D'autres problèmes doivent être gérés aussi:
- signal
- i/ o
- matériel (l'affinité du processeur, hétérogène setup)
comment cela se déroulerait-il dans votre environnement?
Cette réponse à une question similaire points à une mise en œuvre signifiait pour boost et la stl.
j'ai offert un très rudimentaire mise en œuvre d'un threadpool pour une autre question, qui ne répond pas de nombreux problèmes décrits ci-dessus. Vous pourriez construire sur elle. Vous pouvez également regarder des cadres existants dans d'autres langues, à trouver l'inspiration.
(*) je ne vois pas cela comme un problème, bien au contraire. Je pense que c'est l'esprit même du c++ hérité de C.
quelque chose comme ceci pourrait aider (tiré d'une application de travail).
#include <memory>
#include <boost/asio.hpp>
#include <boost/thread.hpp>
struct thread_pool {
typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;
thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
for (int i = 0; i < threads; ++i) {
auto worker = [this] { return service.run(); };
grp.add_thread(new boost::thread(worker));
}
}
template<class F>
void enqueue(F f) {
service.post(f);
}
~thread_pool() {
service_worker.reset();
grp.join_all();
service.stop();
}
private:
boost::asio::io_service service;
asio_worker service_worker;
boost::thread_group grp;
};
Vous pouvez l'utiliser comme ceci:
thread_pool pool(2);
pool.enqueue([] {
std::cout << "Hello from Task 1\n";
});
pool.enqueue([] {
std::cout << "Hello from Task 2\n";
});
gardez à l'esprit que réinventer un efficace mécanisme de file d'attente asynchrone n'est pas anodin.
Boost:: asio:: io_service est une implémentation très efficace, ou est en fait une collection d'enveloppements spécifiques à la plate-forme (par exemple, il enveloppe les ports d'achèvement I/O sur Windows).
il s'agit d'une autre implémentation de thread pool qui est très simple, facile à comprendre et à utiliser, utilise uniquement la bibliothèque standard C++11, et peut être regardé ou modifié pour vos utilisations, devrait être un bon démarreur si vous voulez entrer dans l'utilisation de thread pools:
Edit: Cela exige maintenant C++17 et concepts. (En date du 9/12/16, seul g++ 6.0+ est suffisant.)
la déduction du modèle est beaucoup plus précise à cause de cela, cependant, il est intéressant d'obtenir un nouveau compilateur. Je n'ai pas encore trouvé de fonction qui nécessite des arguments de modèle explicites.
il prend aussi maintenant n'importe quel objet appelable approprié ( et est encore statically typesafe!!! ).
il inclut aussi maintenant un pool de threads prioritaires Vert optionnel utilisant la même API. Cette classe est seulement POSIX, cependant. Il utilise l'API ucontext_t
pour la commutation de tâches dans l'espace utilisateur.
j'ai créé une bibliothèque simple pour cela. Un exemple d'utilisation est donné ci-dessous. (Je réponds à cela parce que c'est une des choses que j'ai trouvé avant de décider qu'il était nécessaire de l'écrire moi-même.)
bool is_prime(int n){
// Determine if n is prime.
}
int main(){
thread_pool pool(8); // 8 threads
list<future<bool>> results;
for(int n = 2;n < 10000;n++){
// Submit a job to the pool.
results.emplace_back(pool.async(is_prime, n));
}
int n = 2;
for(auto i = results.begin();i != results.end();i++, n++){
// i is an iterator pointing to a future representing the result of is_prime(n)
cout << n << " ";
bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result.
if(prime)
cout << "is prime";
else
cout << "is not prime";
cout << endl;
}
}
Vous pouvez passer async
n'importe quelle fonction avec n'importe quelle valeur de retour (ou nulle) et n'importe quels arguments (ou non) et il retournera un std::future
correspondant . Pour obtenir le résultat (ou juste attendre qu'une tâche soit terminée) vous appelez get()
sur le futur.
voici le github: https://github.com/Tyler-Hardin/thread_pool .
Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool:
function_pool.h
#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>
class Function_pool
{
private:
std::queue<std::function<void()>> m_function_queue;
std::mutex m_lock;
std::condition_variable m_data_condition;
std::atomic<bool> m_accept_functions;
public:
Function_pool();
~Function_pool();
void push(std::function<void()> func);
void done();
void infinite_loop_func();
};
function_pool.cpp
#include "function_pool.h"
Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}
Function_pool::~Function_pool()
{
}
void Function_pool::push(std::function<void()> func)
{
std::unique_lock<std::mutex> lock(m_lock);
m_function_queue.push(func);
// when we send the notification immediately, the consumer will try to get the lock , so unlock asap
lock.unlock();
m_data_condition.notify_one();
}
void Function_pool::done()
{
std::unique_lock<std::mutex> lock(m_lock);
m_accept_functions = false;
lock.unlock();
// when we send the notification immediately, the consumer will try to get the lock , so unlock asap
m_data_condition.notify_all();
//notify all waiting threads.
}
void Function_pool::infinite_loop_func()
{
std::function<void()> func;
while (true)
{
{
std::unique_lock<std::mutex> lock(m_lock);
m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
if (!m_accept_functions && m_function_queue.empty())
{
//lock will be release automatically.
//finish the thread loop and let it join in the main thread.
return;
}
func = m_function_queue.front();
m_function_queue.pop();
//release the lock
}
func();
}
}
principal.cpp
#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>
Function_pool func_pool;
class quit_worker_exception : public std::exception {};
void example_function()
{
std::cout << "bla" << std::endl;
}
int main()
{
std::cout << "stating operation" << std::endl;
int num_threads = std::thread::hardware_concurrency();
std::cout << "number of threads = " << num_threads << std::endl;
std::vector<std::thread> thread_pool;
for (int i = 0; i < num_threads; i++)
{
thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
}
//here we should send our functions
for (int i = 0; i < 50; i++)
{
func_pool.push(example_function);
}
func_pool.done();
for (unsigned int i = 0; i < thread_pool.size(); i++)
{
thread_pool.at(i).join();
}
}
un threadpool sans dépendances en dehors de STL est tout à fait possible. J'ai récemment écrit un petit en-tête-seulement bibliothèque threadpool pour traiter le même problème. Il soutient le redimensionnement de piscine dynamique (changeant le nombre de travailleurs à l'exécution), l'attente, l'arrêt, l'arrêt, la reprise et ainsi de suite. J'espère que vous le trouverez utile.