boost :: asio e object attivo

Ho implementato alcuni pattern di progettazione di oggetti attivi basati su moduli. È un’implementazione molto semplice. Ho Scheduler, ActivationList, Requests e Futures per ottenere risposta. I miei requisiti erano così:

  • L’accesso all’object attivo deve essere serializzato eseguendo i suoi metodi all’interno della propria thread (richiesta principale e presupposto del modello di progettazione dell’object attivo)
  • Il chiamante deve essere in grado di specificare la priorità dell’esecuzione delle richieste. Significa che se ci sono più di zero richieste in attesa di esecuzione, devono essere ordinate in base alla priorità assegnata a ciascuna richiesta. Le richieste con priorità più alta devono essere eseguite per prime, quindi se ci saranno sempre richieste in sospeso su ActivationList e avranno priorità più alta di una determinata richiesta, questa richiesta non verrà mai eseguita – è OK per me
  • Deve essere ansible specificare il numero massimo di richieste in sospeso nell’elenco (limitare l’utilizzo della memoria)
  • Deve essere ansible invalidare tutte le richieste in sospeso
  • Le richieste devono essere in grado di restituire valori (bloccando il chiamante) OPPURE devono essere eseguiti senza ritorno di valore ma il chiamante deve essere bloccato fino a quando la richiesta non viene elaborata OPPURE il chiamante non deve essere bloccato e non è importante per esso se la richiesta data è stata elaborata o no g
  • Poco prima dell’esecuzione della richiesta, deve essere eseguito un metodo di guardia per verificare se la richiesta data deve essere eseguita o meno. In caso contrario, restituirà un valore non definito al chiamante (nella mia implementazione corrente è boost :: none, perché ogni tipo di ritorno di richiesta è boost :: opzionale)

OK ora domanda: è ansible utilizzare boost :: asio e soddisfare tutti i miei requisiti? La mia implementazione sta funzionando, ma vorrei usare qualcosa che probabilmente è implementato in modo molto migliore di quanto ho fatto io. Inoltre mi piacerebbe conoscerlo per il futuro e non “reinventare la ruota” ancora una volta.

Boost.Asio può essere utilizzato per comprendere l’intenzione di Active Object : disaccoppia l’esecuzione del metodo dall’invocazione del metodo. Requisiti aggiuntivi dovranno essere gestiti ad un livello superiore, ma non è eccessivamente complesso quando si utilizza Boost.Asio in combinazione con altre librerie Boost.

Scheduler potrebbe utilizzare:

  • boost::thread per l’astrazione del filo.
  • boost::thread_group per gestire la durata dei thread.
  • boost::asio::io_service per fornire un threadpool. Probabilmente vorrà usare boost::asio::io_service::work per mantenere i thread boost::asio::io_service::work quando nessun lavoro è in sospeso.

ActivationList potrebbe essere implementato come:

  • Un Boost.MultiIndex per ottenere la richiesta del metodo con priorità più alta. Con un insert() posizione insert() , l’ordine di inserimento viene conservato per la richiesta con la stessa priorità.
  • std::multiset o std::multimap può essere usato. Tuttavia, in C ++ 03 non è specificato l’ordine di richiesta con la stessa chiave (priorità).
  • Se Request non ha bisogno di un metodo di guardia, allora potrebbe essere usato std::priority_queue .

Request potrebbe essere un tipo non specificato:

  • boost::function e boost::bind potrebbero essere usati per fornire una cancellazione del tipo, mentre si legano a tipi chiamabili senza introdurre una gerarchia di Request .

Futures potrebbero utilizzare il supporto Futures di Boost.Thread.

  • future.valid() restituirà true se la Request è stata aggiunta a ActivationList .
  • future.wait() bloccherà in attesa che un risultato diventi disponibile.
  • future.get() bloccherà in attesa del risultato.
  • Se il chiamante non fa nulla con il future , il chiamante non verrà bloccato.
  • Un altro vantaggio nell’usare i Futures di Boost.Thread è che le eccezioni provenienti da una Request verranno passate al Future .

Ecco un esempio completo che fa leva su varie librerie Boost e dovrebbe soddisfare i requisiti:

 // Standard includes #include  // std::find_if #include  #include  // 3rd party includes #include  #include  #include  #include  #include  #include  #include  #include  #include  #include  /// @brief scheduler that provides limits with prioritized jobs. template  > class scheduler { public: typedef Priority priority_type; private: /// @brief method_request is used to couple the guard and call /// functions for a given method. struct method_request { typedef boost::function ready_func_type; typedef boost::function run_func_type; template  method_request(ReadyFunctor ready, RunFunctor run) : ready(ready), run(run) {} ready_func_type ready; run_func_type run; }; /// @brief Pair type used to associate a request with its priority. typedef std::pair > pair_type; static bool is_method_ready(const pair_type& pair) { return pair.second->ready(); } public: /// @brief Construct scheduler. /// /// @param max_threads Maximum amount of concurrent task. /// @param max_request Maximum amount of request. scheduler(std::size_t max_threads, std::size_t max_request) : work_(io_service_), max_request_(max_request), request_count_(0) { // Spawn threads, dedicating them to the io_service. for (std::size_t i = 0; i < max_threads; ++i) threads_.create_thread( boost::bind(&boost::asio::io_service::run, &io_service_)); } /// @brief Destructor. ~scheduler() { // Release threads from the io_service. io_service_.stop(); // Cleanup. threads_.join_all(); } /// @brief Insert a method request into the scheduler. /// /// @param priority Priority of job. /// @param ready_func Invoked to check if method is ready to run. /// @param run_func Invoked when ready to run. /// /// @return future associated with the method. template  boost::unique_future::type> insert(priority_type priority, const ReadyFunctor& ready_func, const RunFunctor& run_func) { typedef typename boost::result_of::type result_type; typedef boost::unique_future future_type; boost::unique_lock lock(mutex_); // If max request has been reached, then return an invalid future. if (max_request_ && (request_count_ == max_request_)) return future_type(); ++request_count_; // Use a packaged task to handle populating promise and future. typedef boost::packaged_task task_type; // Bind does not work with rvalue, and packaged_task is only moveable, // so allocate a shared pointer. boost::shared_ptr task = boost::make_shared(run_func); // Create method request. boost::shared_ptr request = boost::make_shared( ready_func, boost::bind(&task_type::operator(), task)); // Insert into priority. Hint to inserting as close to the end as // possible to preserve insertion order for request with same priority. activation_list_.insert(activation_list_.end(), pair_type(priority, request)); // There is now an outstanding request, so post to dispatch. io_service_.post(boost::bind(&scheduler::dispatch, this)); return task->get_future(); } /// @brief Insert a method request into the scheduler. /// /// @param ready_func Invoked to check if method is ready to run. /// @param run_func Invoked when ready to run. /// /// @return future associated with the method. template  boost::unique_future::type> insert(const ReadyFunctor& ready_func, const RunFunctor& run_func) { return insert(priority_type(), ready_func, run_func); } /// @brief Insert a method request into the scheduler. /// /// @param priority Priority of job. /// @param run_func Invoked when ready to run. /// /// @return future associated with the method. template  boost::unique_future::type> insert(priority_type priority, const RunFunctor& run_func) { return insert(priority, &always_ready, run_func); } /// @brief Insert a method request with default priority into the /// scheduler. /// /// @param run_func Invoked when ready to run. /// /// @param functor Job to run. /// /// @return future associated with the job. template  boost::unique_future::type> insert(const RunFunc& run_func) { return insert(&always_ready, run_func); } /// @brief Cancel all outstanding request. void cancel() { boost::unique_lock lock(mutex_); activation_list_.clear(); request_count_ = 0; } private: /// @brief Dispatch a request. void dispatch() { // Get the current highest priority request ready to run from the queue. boost::unique_lock lock(mutex_); if (activation_list_.empty()) return; // Find the highest priority method ready to run. typedef typename activation_list_type::iterator iterator; iterator end = activation_list_.end(); iterator result = std::find_if( activation_list_.begin(), end, &is_method_ready); // If no methods are ready, then post into dispatch, as the // method may have become ready. if (end == result) { io_service_.post(boost::bind(&scheduler::dispatch, this)); return; } // Take ownership of request. boost::shared_ptr method = result->second; activation_list_.erase(result); // Run method without mutex. lock.unlock(); method->run(); lock.lock(); // Perform bookkeeping. --request_count_; } static bool always_ready() { return true; } private: /// @brief List of outstanding request. typedef boost::multi_index_container< pair_type, boost::multi_index::indexed_by< boost::multi_index::ordered_non_unique< boost::multi_index::member, Compare > > > activation_list_type; activation_list_type activation_list_; /// @brief Thread group managing threads servicing pool. boost::thread_group threads_; /// @brief io_service used to function as a thread pool. boost::asio::io_service io_service_; /// @brief Work is used to keep threads servicing io_service. boost::asio::io_service::work work_; /// @brief Maximum amount of request. const std::size_t max_request_; /// @brief Count of outstanding request. std::size_t request_count_; /// @brief Synchronize access to the activation list. typedef boost::mutex mutex_type; mutex_type mutex_; }; typedef scheduler > high_priority_scheduler; /// @brief adder is a simple proxy that will delegate work to /// the scheduler. class adder { public: adder(high_priority_scheduler& scheduler) : scheduler_(scheduler) {} /// @brief Add a and b with a priority. /// /// @return Return future result. template  boost::unique_future add( high_priority_scheduler::priority_type priority, const T& a, const T& b) { // Insert method request return scheduler_.insert( priority, boost::bind(&adder::do_add, a, b)); } /// @brief Add a and b. /// /// @return Return future result. template  boost::unique_future add(const T& a, const T& b) { return add(high_priority_scheduler::priority_type(), a, b); } private: /// @brief Actual add a and b. template  static T do_add(const T& a, const T& b) { std::cout << "Starting addition of '" << a << "' and '" << b << "'" << std::endl; // Mimic busy work. boost::this_thread::sleep_for(boost::chrono::seconds(2)); std::cout << "Finished addition" << std::endl; return a + b; } private: high_priority_scheduler& scheduler_; }; bool get(bool& value) { return value; } void guarded_call() { std::cout << "guarded_call" << std::endl; } int main() { const unsigned int max_threads = 1; const unsigned int max_request = 4; // Sscheduler high_priority_scheduler scheduler(max_threads, max_request); // Proxy adder adder(scheduler); // Client // Add guarded method to scheduler. bool ready = false; std::cout << "Add guarded method." << std::endl; boost::unique_future future1 = scheduler.insert( boost::bind(&get, boost::ref(ready)), &guarded_call); // Add 1 + 100 with default priority. boost::unique_future future2 = adder.add(1, 100); // Force sleep to try to get scheduler to run request 2 first. boost::this_thread::sleep_for(boost::chrono::seconds(1)); // Add: // 2 + 200 with low priority (5) // "test" + "this" with high priority (99) boost::unique_future future3 = adder.add(5, 2, 200); boost::unique_future future4 = adder.add(99, std::string("test"), std::string("this")); // Max request should have been reached, so add another. boost::unique_future future5 = adder.add(3, 300); // Check if request was added. std::cout << "future1 is valid: " << future1.valid() << "\nfuture2 is valid: " << future2.valid() << "\nfuture3 is valid: " << future3.valid() << "\nfuture4 is valid: " << future4.valid() << "\nfuture5 is valid: " << future5.valid() << std::endl; // Get results for future2 and future3. Do nothing with future4's results. std::cout << "future2 result: " << future2.get() << "\nfuture3 result: " << future3.get() << std::endl; std::cout << "Unguarding method." << std::endl; ready = true; future1.wait(); } 

L'esecuzione utilizza il pool di thread di 1 con un massimo di 4 richieste.

  • request1 è protetto fino alla fine del programma e dovrebbe essere l'ultimo a essere eseguito.
  • request2 (1 + 100) è inserito con priorità predefinita e dovrebbe essere il primo a essere eseguito.
  • request3 (2 + 200) è inserito a bassa priorità e dovrebbe essere eseguito dopo request4.
  • request4 ('test' + 'this') è inserito con priorità alta e dovrebbe essere eseguito prima di request3.
  • request5 non dovrebbe riuscire a inserire a causa della max richiesta e non dovrebbe essere valido.

L'output è il seguente:

  Aggiungi un metodo protetto.
 Inizia l'aggiunta di '1' e '100'
 future1 è valido: 1
 future2 è valido: 1
 future3 è valido: 1
 future4 è valido: 1
 future5 è valido: 0
 Aggiunta finito
 Inizia l'aggiunta di "test" e "this"
 Aggiunta finito
 Aggiunta iniziale di "2" e "200"
 Aggiunta finito
 future2 risultato: 101
 risultato futuro3: 202
 Metodo Unguarding
 guarded_call