c – boost :: asio和活动对象

c – boost :: asio和活动对象,第1张

概述我已经实现了一些基于模块的活动对象设计模式.这是非常简单的实现.我有调度程序,激活列表,请求和期货来获得响应. 我的要求是这样的: >对活动对象的访问应通过执行方法进行序列化 在自己的线程(主要的req和假设的活动对象 设计模式) >呼叫者应能够指定请求执行的优先级.这意味着如果有超过零个请求等待执行,它们将按照分配给每个请求的优先级排序.首先要执行具有较高优先级的请求,如果ActivationL 我已经实现了一些基于模块的活动对象设计模式.这是非常简单的实现.我有调度程序,激活列表,请求和期货来获得响应.
我的要求是这样的:

>对活动对象的访问应通过执行方法进行序列化
在自己的线程(主要的req和假设的活动对象
设计模式)
>呼叫者应能够指定请求执行的优先级.这意味着如果有超过零个请求等待执行,它们将按照分配给每个请求的优先级排序.首先要执行具有较高优先级的请求,如果ActivationList上有一些请求待处理,并且它们的优先级要比给定的请求高,则此请求将永远不会执行 – 它对我来说
>可以指定列表中待处理的请求的最大数量(限制内存使用量)
>可以使所有挂起的请求无效
>请求应该能够返回值(阻塞调用者)或者只是在没有值返回的情况下执行,但是调用者在被处理请求之前被阻塞,或者调用者不被阻塞,如果已经处理了给定请求,则不重要不
G
>在请求执行之前,应执行一些保护方法,以检查是否执行给定请求.如果不是 – 它将向调用者返回一些未定义的值(在我当前的实现中,它是boost :: none,因为每个请求返回类型为boost :: optional)

现在问题:
可以使用boost :: asio并满足我的所有要求吗?我的实现正在工作,但我想使用一些比我这样做更好的方式实现的东西.我也想知道这个未来,不要再重新发明了.

解决方法 Boost.Asio可用于涵盖 Active Object的意图:从方法调用中执行解耦方法.需要在更高级别处理其他要求,但在使用Boost.Asio和其他Boost库时并不会过于复杂.

计划程序可以使用:

> boost::thread线程抽象.
> boost::thread_group来管理线程的生命周期.
> boost::asio::io_service提供一个线程池.当没有工作待决时,可能希望使用boost::asio::io_service::work保持线程活动.

ActivationList可以实现为:

> A Boost.MultiIndex获得最高优先级的方法请求.使用提示位置insert(),将保留插入顺序以获得具有相同优先级的请求.
> std :: multiset或std :: multimap可以使用.但是,对于具有相同密钥(优先级)的请求的顺序,在C 03中是未指定的.
>如果请求不需要保护方法,那么可以使用std :: priority_queue.

请求可能是一个未指定的类型:

> boost::functionboost::bind可用于提供类型擦除,同时绑定到可调用类型,而不引入Request层次结构.

期货可以使用Boost.Thread的Futures支持.

如果请求已添加到ActivationList,则future.valID()将返回true.
> future.wait()将阻止等待结果变为可用.
> future.get()将阻止等待结果.
>如果呼叫者未来不做任何事情,则呼叫者将不被阻止.
>使用Boost.Thread的期货的另一个好处是,在请求内发出的异常将被传递给未来.

以下是使用各种Boost库的完整示例,并且应符合以下要求:

// Standard includes#include <algorithm> // std::find_if#include <iostream>#include <string>// 3rd party includes#include <boost/asio.hpp>#include <boost/bind.hpp>#include <boost/function.hpp>#include <boost/make_shared.hpp>#include <boost/multi_index_container.hpp>#include <boost/multi_index/ordered_index.hpp>#include <boost/multi_index/member.hpp>#include <boost/shared_ptr.hpp>#include <boost/thread.hpp>#include <boost/utility/result_of.hpp>/// @brIEf scheduler that provIDes limits with prioritized jobs.template <typename Priority,typename Compare = std::less<Priority> >class scheduler{public:  typedef Priority priority_type;private:  /// @brIEf method_request is used to couple the guard and call  ///        functions for a given method.  struct method_request  {    typedef boost::function<bool()> ready_func_type;    typedef boost::function<voID()> run_func_type;    template <typename ReadyFunctor,typename RunFunctor>    method_request(ReadyFunctor ready,RunFunctor run)      : ready(ready),run(run)    {}    ready_func_type ready;    run_func_type run;  };  /// @brIEf Pair type used to associate a request with its priority.  typedef std::pair<priority_type,boost::shared_ptr<method_request> > pair_type;  static bool is_method_ready(const pair_type& pair)  {    return pair.second->ready();  }public:  /// @brIEf Construct scheduler.  ///  /// @param max_threads Maximum amount of concurrent task.  /// @param max_request Maximum amount of request.    scheduler(std::size_t max_threads,std::size_t max_request)    : work_(io_service_),max_request_(max_request),request_count_(0)  {    // Spawn threads,dedicating them to the io_service.    for (std::size_t i = 0; i < max_threads; ++i)      threads_.create_thread(        boost::bind(&boost::asio::io_service::run,&io_service_));  }  /// @brIEf Destructor.  ~scheduler()  {    // Release threads from the io_service.    io_service_.stop();    // Cleanup.    threads_.join_all();  }  /// @brIEf Insert a method request into the scheduler.  ///  /// @param priority Priority of job.  /// @param ready_func Invoked to check if method is ready to run.  /// @param run_func Invoked when ready to run.  ///  /// @return future associated with the method.  template <typename ReadyFunctor,typename RunFunctor>  boost::unique_future<typename boost::result_of<RunFunctor()>::type>  insert(priority_type priority,const ReadyFunctor& ready_func,const RunFunctor& run_func)  {    typedef typename boost::result_of<RunFunctor()>::type result_type;    typedef boost::unique_future<result_type> future_type;    boost::unique_lock<mutex_type> lock(mutex_);    // If max request has been reached,then return an invalID future.    if (max_request_ &&        (request_count_ == max_request_))      return future_type();    ++request_count_;    // Use a packaged task to handle populating promise and future.    typedef boost::packaged_task<result_type> task_type;    // Bind does not work with rvalue,and packaged_task is only moveable,// so allocate a shared pointer.    boost::shared_ptr<task_type> task =       boost::make_shared<task_type>(run_func);    // Create method request.    boost::shared_ptr<method_request> request =      boost::make_shared<method_request>(        ready_func,boost::bind(&task_type::operator(),task));    // Insert into priority.  Hint to inserting as close to the end as    // possible to preserve insertion order for request with same priority.    activation_List_.insert(activation_List_.end(),pair_type(priority,request));    // There is Now an outstanding request,so post to dispatch.    io_service_.post(boost::bind(&scheduler::dispatch,this));    return task->get_future();  }  /// @brIEf Insert a method request into the scheduler.  ///  /// @param ready_func Invoked to check if method is ready to run.  /// @param run_func Invoked when ready to run.  ///  /// @return future associated with the method.  template <typename ReadyFunctor,typename RunFunctor>  boost::unique_future<typename boost::result_of<RunFunctor()>::type>  insert(const ReadyFunctor& ready_func,const RunFunctor& run_func)  {    return insert(priority_type(),ready_func,run_func);  }  /// @brIEf Insert a method request into the scheduler.  ///  /// @param priority Priority of job.  /// @param run_func Invoked when ready to run.  ///  /// @return future associated with the method.  template <typename RunFunctor>  boost::unique_future<typename boost::result_of<RunFunctor()>::type>  insert(priority_type priority,const RunFunctor& run_func)  {    return insert(priority,&always_ready,run_func);  }  /// @brIEf Insert a method request with default priority into the  ///        scheduler.  ///  /// @param run_func Invoked when ready to run.  ///  /// @param functor Job to run.  ///  /// @return future associated with the job.  template <typename RunFunc>  boost::unique_future<typename boost::result_of<RunFunc()>::type>  insert(const RunFunc& run_func)  {    return insert(&always_ready,run_func);  }  /// @brIEf Cancel all outstanding request.  voID cancel()  {    boost::unique_lock<mutex_type> lock(mutex_);    activation_List_.clear();    request_count_ = 0;  } private:  /// @brIEf dispatch a request.  voID dispatch()  {    // Get the current highest priority request ready to run from the queue.    boost::unique_lock<mutex_type> lock(mutex_);    if (activation_List_.empty()) return;    // Find the highest priority method ready to run.    typedef typename activation_List_type::iterator iterator;    iterator end = activation_List_.end();    iterator result = std::find_if(      activation_List_.begin(),end,&is_method_ready);    // If no methods are ready,then post into dispatch,as the    // method may have become ready.    if (end == result)    {      io_service_.post(boost::bind(&scheduler::dispatch,this));      return;    }    // Take ownership of request.    boost::shared_ptr<method_request> method = result->second;    activation_List_.erase(result);    // Run method without mutex.    lock.unlock();    method->run();        lock.lock();    // Perform bookkeePing.    --request_count_;  }  static bool always_ready() { return true; }private:  /// @brIEf List of outstanding request.  typedef boost::multi_index_container<    pair_type,boost::multi_index::indexed_by<      boost::multi_index::ordered_non_unique<        boost::multi_index::member<pair_type,typename pair_type::first_type,&pair_type::first>,Compare      >    >  > activation_List_type;  activation_List_type activation_List_;  /// @brIEf Thread group managing threads servicing pool.  boost::thread_group threads_;  /// @brIEf io_service used to function as a thread pool.  boost::asio::io_service io_service_;  /// @brIEf Work is used to keep threads servicing io_service.  boost::asio::io_service::work work_;  /// @brIEf Maximum amount of request.  const std::size_t max_request_;  /// @brIEf Count of outstanding request.  std::size_t request_count_;  /// @brIEf Synchronize access to the activation List.  typedef boost::mutex mutex_type;  mutex_type mutex_;};typedef scheduler<unsigned int,std::greater<unsigned int> > high_priority_scheduler;/// @brIEf adder is a simple proxy that will delegate work to///        the scheduler.class adder{public:  adder(high_priority_scheduler& scheduler)    : scheduler_(scheduler)  {}  /// @brIEf Add a and b with a priority.  ///  /// @return Return future result.  template <typename T>  boost::unique_future<T> add(    high_priority_scheduler::priority_type priority,const T& a,const T& b)  {    // Insert method request    return scheduler_.insert(      priority,boost::bind(&adder::do_add<T>,a,b));  }  /// @brIEf Add a and b.  ///  /// @return Return future result.  template <typename T>  boost::unique_future<T> add(const T& a,const T& b)  {    return add(high_priority_scheduler::priority_type(),b);  }private:  /// @brIEf Actual add a and b.  template <typename T>  static T do_add(const T& a,const T& b)  {    std::cout << "Starting addition of '" << a               << "' and '" << b << "'" << std::endl;    // Mimic busy work.    boost::this_thread::sleep_for(boost::chrono::seconds(2));    std::cout << "Finished addition" << std::endl;    return a + b;  }private:  high_priority_scheduler& scheduler_;};bool get(bool& value) { return value; }voID guarded_call(){  std::cout << "guarded_call" << std::endl; }int main(){  const unsigned int max_threads = 1;  const unsigned int max_request = 4;  // Sscheduler  high_priority_scheduler scheduler(max_threads,max_request);  // Proxy  adder adder(scheduler);  // ClIEnt  // Add guarded method to scheduler.  bool ready = false;  std::cout << "Add guarded method." << std::endl;  boost::unique_future<voID> future1 = scheduler.insert(    boost::bind(&get,boost::ref(ready)),&guarded_call);  // Add 1 + 100 with default priority.  boost::unique_future<int> future2 = adder.add(1,100);  // Force sleep to try to get scheduler to run request 2 first.  boost::this_thread::sleep_for(boost::chrono::seconds(1));  // Add:  //   2 + 200 with low priority (5)  //   "test" + "this" with high priority (99)  boost::unique_future<int> future3 = adder.add(5,2,200);  boost::unique_future<std::string> future4 = adder.add(99,std::string("test"),std::string("this"));  // Max request should have been reached,so add another.  boost::unique_future<int> future5 = adder.add(3,300);  // Check if request was added.  std::cout << "future1 is valID: " << future1.valID()          << "\nfuture2 is valID: " << future2.valID()          << "\nfuture3 is valID: " << future3.valID()          << "\nfuture4 is valID: " << future4.valID()          << "\nfuture5 is valID: " << future5.valID()          << std::endl;  // Get results for future2 and future3.  Do nothing with future4's results.  std::cout << "future2 result: " << future2.get()          << "\nfuture3 result: " << future3.get()          << std::endl;  std::cout << "Unguarding method." << std::endl;  ready = true;  future1.wait();}

执行使用1的线程池,最多4个请求.

> request1被保护直到程序结束,应该是最后运行的.
> request2(1 100)以默认优先级插入,应首先运行.
> request3(2 200)被插入低优先级,应该在request4之后运行.
> request4(‘test”this’)以高优先级插入,应在request3之前运行.
> request5由于最大请求而无法插入,不应该是有效的.

输出如下:

Add guarded method.Starting addition of '1' and '100'future1 is valID: 1future2 is valID: 1future3 is valID: 1future4 is valID: 1future5 is valID: 0Finished additionStarting addition of 'test' and 'this'Finished additionStarting addition of '2' and '200'Finished additionfuture2 result: 101future3 result: 202Unguarding method.guarded_call
总结

以上是内存溢出为你收集整理的c – boost :: asio和活动对象全部内容,希望文章能够帮你解决c – boost :: asio和活动对象所遇到的程序开发问题。

如果觉得内存溢出网站内容还不错,欢迎将内存溢出网站推荐给程序员好友。

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/langs/1245914.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-06-07
下一篇2022-06-07

发表评论

登录后才能评论

评论列表(0条)

    保存