Estendi la durata dei thread con la sincronizzazione (C ++ 11)

Ho un programma con una funzione che accetta un puntatore come arg e un main. Il principale è la creazione di n thread, ognuno dei quali esegue la funzione su diverse aree di memoria a seconda arg passato. I thread vengono quindi uniti, il principale esegue alcuni dati di mixaggio tra l’area e crea n nuovi thread che eseguono la stessa operazione di quelli precedenti.

Per migliorare il programma, vorrei mantenere in vita i thread, rimuovendo il lungo tempo necessario per crearli. I thread dovrebbero dormire quando il main sta funzionando e notificato quando devono tornare di nuovo. Allo stesso modo, il main dovrebbe aspettare quando i thread funzionano come ha fatto con join.

Non posso finire con una forte attuazione di questo, sempre in una fase di stallo.

Un semplice codice di base, qualsiasi suggerimento su come modificare questo sarebbe molto apprezzato

 #include  #include  ... void myfunc(void * p) { do_something(p); } int main(){ void * myp[n_threads] {a_location, another_location,...}; std::thread mythread[n_threads]; for (unsigned long int j=0; j < ULONG_MAX; j++) { for (unsigned int i=0; i < n_threads; i++) { mythread[i] = std::thread(myfunc, myp[i]); } for (unsigned int i=0; i < n_threads; i++) { mythread[i].join(); } mix_data(myp); } return 0; } 

Ecco un ansible approccio usando solo le classi dalla libreria standard C ++ 11. Fondamentalmente, ogni thread che hai creato ha una coda di comando associata (incapsulata negli oggetti std::packaged_task<> ) che controlla continuamente. Se la coda è vuota, il thread attenderà solo una variabile di condizione ( std::condition_variable ).

Mentre le corse dei dati sono evitate attraverso l’uso di wrapper RAII std::mutex e std::unique_lock<> , il thread principale può attendere che un particolare lavoro venga terminato memorizzando l’object std::future<> associato a ogni std::packaged_tast<> inviato std::packaged_tast<> e chiamata wait() su di esso.

Di seguito è riportato un semplice programma che segue questo design. I commenti dovrebbero essere sufficienti per spiegare cosa fa:

 #include  #include  #include  #include  #include  #include  #include  // Convenience type definition using job = std::packaged_task; // Some data associated to each thread. struct thread_data { int id; // Could use thread::id, but this is filled before the thread is started std::thread t; // The thread object std::queue jobs; // The job queue std::condition_variable cv; // The condition variable to wait for threads std::mutex m; // Mutex used for avoiding data races bool stop = false; // When set, this flag tells the thread that it should exit }; // The thread function executed by each thread void thread_func(thread_data* pData) { std::unique_lock l(pData->m, std::defer_lock); while (true) { l.lock(); // Wait until the queue won't be empty or stop is signaled pData->cv.wait(l, [pData] () { return (pData->stop || !pData->jobs.empty()); }); // Stop was signaled, let's exit the thread if (pData->stop) { return; } // Pop one task from the queue... job j = std::move(pData->jobs.front()); pData->jobs.pop(); l.unlock(); // Execute the task! j(); } } // Function that creates a simple task job create_task(int id, int jobNumber) { job j([id, jobNumber] () { std::stringstream s; s << "Hello " << id << "." << jobNumber << std::endl; std::cout << s.str(); }); return j; } int main() { const int numThreads = 4; const int numJobsPerThread = 10; std::vector> futures; // Create all the threads (will be waiting for jobs) thread_data threads[numThreads]; int tdi = 0; for (auto& td : threads) { td.id = tdi++; td.t = std::thread(thread_func, &td); } //================================================= // Start assigning jobs to each thread... for (auto& td : threads) { for (int i = 0; i < numJobsPerThread; i++) { job j = create_task(td.id, i); futures.push_back(j.get_future()); std::unique_lock l(td.m); td.jobs.push(std::move(j)); } // Notify the thread that there is work do to... td.cv.notify_one(); } // Wait for all the tasks to be completed... for (auto& f : futures) { f.wait(); } futures.clear(); //================================================= // Here the main thread does something... std::cin.get(); // ...done! //================================================= //================================================= // Posts some new tasks... for (auto& td : threads) { for (int i = 0; i < numJobsPerThread; i++) { job j = create_task(td.id, i); futures.push_back(j.get_future()); std::unique_lock l(td.m); td.jobs.push(std::move(j)); } // Notify the thread that there is work do to... td.cv.notify_one(); } // Wait for all the tasks to be completed... for (auto& f : futures) { f.wait(); } futures.clear(); // Send stop signal to all threads and join them... for (auto& td : threads) { std::unique_lock l(td.m); td.stop = true; td.cv.notify_one(); } // Join all the threads for (auto& td : threads) { td.t.join(); } } 

Il concetto che vuoi è il threadpool. Questa domanda SO riguarda le implementazioni esistenti.

L’idea è di avere un contenitore per un numero di istanze di thread. Ogni istanza è associata a una funzione che esegue il polling di una coda di attività e quando un’attività è disponibile, la carica ed esegue. Una volta che l’attività è terminata (se termina, ma questo è un altro problema), il thread si collega semplicemente alla coda dei task.

Quindi è necessaria una coda sincronizzata, una class thread che implementa il loop sulla coda, un’interfaccia per gli oggetti task e forse una class per guidare l’intera operazione (la class pool).

In alternativa, è ansible creare una class thread molto specializzata per l’attività che deve eseguire (con solo l’area di memoria come parametro, ad esempio). Ciò richiede un meccanismo di notifica per i thread per indicare che sono fatti con l’iterazione corrente.

La funzione main thread sarebbe un loop su quel task specifico, e alla fine di una iterazione, il thread ne segna la fine e attende le variabili condition per avviare il ciclo successivo. In sostanza, si inserirà il codice attività all’interno del thread, eliminando del tutto la necessità di una coda.

  using namespace std; // semaphore class based on C++11 features class semaphore { private: mutex mMutex; condition_variable v; int mV; public: semaphore(int v): mV(v){} void signal(int count=1){ unique_lock lock(mMutex); mV+=count; if (mV > 0) mCond.notify_all(); } void wait(int count = 1){ unique_lock lock(mMutex); mV-= count; while (mV < 0) mCond.wait(lock); } }; template  class TaskThread { thread mThread; Task *mTask; semaphore *mSemStarting, *mSemFinished; volatile bool mRunning; public: TaskThread(Task *task, semaphore *start, semaphore *finish): mTask(task), mRunning(true), mSemStart(start), mSemFinished(finish), mThread(&TaskThread::psrun){} ~TaskThread(){ mThread.join(); } void run(){ do { (*mTask)(); mSemFinished->signal(); mSemStart->wait(); } while (mRunning); } void finish() { // end the thread after the current loop mRunning = false; } private: static void psrun(TaskThread *self){ self->run();} }; classcMyTask { public: MyTask(){} void operator()(){ // some code here } }; int main(){ MyTask task1; MyTask task2; semaphore start(2), finished(0); TaskThread t1(&task1, &start, &finished); TaskThread t2(&task2, &start, &finished); for (int i = 0; i < 10; i++){ finished.wait(2); start.signal(2); } t1.finish(); t2.finish(); } 

L'implementazione (grezza) proposta sopra si basa sul tipo Task che deve fornire l' operator() (ad esempio un functor come class). Ho detto che potresti incorporare il codice attività direttamente nel corpo della funzione thread in precedenza, ma dal momento che non lo conosco, l'ho mantenuto il più astratto ansible. C'è una variabile di condizione per l'inizio dei thread e uno per la loro fine, entrambi incapsulati in istanze di semaforo.

Vedendo l'altra risposta che propone l'uso di boost::barrier , posso solo sostenere questa idea: assicurati di sostituire la mia class semaforo con quella class se ansible , il motivo è che è meglio fare affidamento su un codice esterno ben testato e mantenuto piuttosto che di una soluzione auto-implementata per lo stesso set di funzionalità.

Tutto sumto, entrambi gli approcci sono validi, ma il primo rinuncia a un minimo di prestazioni a favore della flessibilità. Se l'attività da eseguire richiede un tempo sufficientemente lungo, il costo di sincronizzazione della gestione e della gestione diventa trascurabile.

Aggiornamento: codice fisso e testato. Sostituita una variabile di condizione semplice da un semaforo.

Può essere facilmente raggiunto utilizzando una barriera (solo un wrapper di convenienza su una variabile condizionale e un contatore). Fondamentalmente blocca fino a quando tutti i thread N hanno raggiunto la “barriera”. Quindi “ricicla” di nuovo. Boost fornisce un’implementazione.

 void myfunc(void * p, boost::barrier& start_barrier, boost::barrier& end_barrier) { while (!stop_condition) // You'll need to tell them to stop somehow { start_barrier.wait (); do_something(p); end_barrier.wait (); } } int main(){ void * myp[n_threads] {a_location, another_location,...}; boost::barrier start_barrier (n_threads + 1); // child threads + main thread boost::barrier end_barrier (n_threads + 1); // child threads + main thread std::thread mythread[n_threads]; for (unsigned int i=0; i < n_threads; i++) { mythread[i] = std::thread(myfunc, myp[i], start_barrier, end_barrier); } start_barrier.wait (); // first unblock the threads for (unsigned long int j=0; j < ULONG_MAX; j++) { end_barrier.wait (); // mix_data must not execute before the threads are done mix_data(myp); start_barrier.wait (); // threads must not start new iteration before mix_data is done } return 0; } 

Quello che segue è un semplice codice di compilazione e di lavoro che esegue alcune cose casuali. Implementa il concetto di barriera di aleguna. La lunghezza dell’attività di ogni thread è diversa, quindi è davvero necessario disporre di un forte meccanismo di sincronizzazione. Proverò a fare un pool con gli stessi compiti ea stabilire un benchmark del risultato, e poi forse con i futures come sottolineato da Andy Prowl.

 #include  #include  #include  #include  #include  #include  #include  const unsigned int n_threads=4; //varying this will not (almost) change the total amount of work const unsigned int task_length=30000/n_threads; const float task_length_variation=task_length/n_threads; unsigned int rep=1000; //repetitions of tasks class t_chronometer{ private: std::chrono::steady_clock::time_point _t; public: t_chronometer(): _t(std::chrono::steady_clock::now()) {;} void reset() {_t = std::chrono::steady_clock::now();} double get_now() {return std::chrono::duration_cast>(std::chrono::steady_clock::now() - _t).count();} double get_now_ms() {return std::chrono::duration_cast>(std::chrono::steady_clock::now() - _t).count();} }; class t_barrier { private: std::mutex m_mutex; std::condition_variable m_cond; unsigned int m_threshold; unsigned int m_count; unsigned int m_generation; public: t_barrier(unsigned int count): m_threshold(count), m_count(count), m_generation(0) { } bool wait() { std::unique_lock lock(m_mutex); unsigned int gen = m_generation; if (--m_count == 0) { m_generation++; m_count = m_threshold; m_cond.notify_all(); return true; } while (gen == m_generation) m_cond.wait(lock); return false; } }; using namespace std; void do_something(complex * c, unsigned int max) { complex a(1.,0.); complex b(1.,0.); for (unsigned int i = 0; i * c, unsigned int max, t_barrier* start_barrier, t_barrier* end_barrier) { while (!done) { start_barrier->wait (); do_something(c,max); end_barrier->wait (); } cout << "task finished" << endl; } int main() { t_chronometer t; std::default_random_engine gen; std::normal_distribution dis(.0,1000.0); complex cpx[n_threads]; for (unsigned int i=0; i < n_threads; i++) { cpx[i] = complex(dis(gen), dis(gen)); } t_barrier start_barrier (n_threads + 1); // child threads + main thread t_barrier end_barrier (n_threads + 1); // child threads + main thread std::thread mythread[n_threads]; unsigned long int sum=0; for (unsigned int i=0; i < n_threads; i++) { unsigned int max = task_length + i * task_length_variation; cout << i+1 << "th task length: " << max << endl; mythread[i] = std::thread(task, &cpx[i], max, &start_barrier, &end_barrier); sum+=max; } cout << "total task length " << sum << endl; complex c(0,0); for (unsigned long int j=1; j < rep+1; j++) { start_barrier.wait (); //give to the threads the missing call to start if (j==rep) done=true; end_barrier.wait (); //wait for the call from each tread if (j%100==0) cout << "cycle: " << j << endl; for (unsigned int i=0; i