线程池的实现思路与代码
当前,网络服务一般都具有高并发量、高吞吐量的要求,在接收到客户端的socket数据或http请求时,能够及时处理数据并快速响应客户端,从而增强用户的体验性。 为了高效利用服务器的处理能力,尤其是针对多核机器,需要创建多个线程并构成线程池,对线程资源进行统一管理。由于线程处理请求需要一定时间,当新的请求到达时,就可以将新请求分配给其他空闲的线程,因而可充分利用多核特性,达到并行处理的效果。
对此,本文提供一种线程池创建的思路与实现————生产者消费者非阻塞异步处理模型,主要适用于云端业务的处理。
一、首先是通用线程基类的构建,具体的线程类继承该基类,并实现Work虚函数,进行实际的处理流程。
// 线程池及通用生产者消费者非阻塞异步处理模型 //生产者与消费者模型,利用多个工作线程来调度(创建线程池),加快服务对数据的处理 //每个线程对应一个对象,整个线程池由一个指向线程对象的指针表示(指向线程对象数组) template <typename T> class Base { public: void Produce(const T& data); //向线程对象发送消息 void Signal(); //发送信号解除阻塞 bool Run(); //创建并启动工作线程 void Wait(); //等待线程结束 protected: static void* Proc(void *arg); virtual void Work() = 0; //处理新发送而来的消息 //生产者链表 list<T> produce_list; //消费者链表 list<T> custome_list; //线程锁 pthread_mutex_t mutex; //条件变量 pthread_cond_t cond; pthread_t threadId; }; // 向生产队列推送数据 void Produce(const T& data) { pthread_mutex_lock(&mutex); produce_list.push_back(data); pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); return; } //发送信号解除阻塞 kill -10 时候条调用 template <typename T> void Base<T>::Unblock() { int i = 0; //防止信号放空,间隔100us释放三次信号 while (++i <= 3) { pthread_mutex_lock(&mutex); pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); usleep(100); } return; } //发送信号解除阻塞, epoll_wait后调用 template <typename T> void Base<T>::Signal() { pthread_mutex_lock(&mutex); pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); return; } //启动线程 template <typename T> bool Base<T>::Consume() { if(0 == pthread_create(&threadId, NULL, Proc, (void *)this)) { return true; } else { return false; } } template <typename T> void Base<T>::Wait() { Signal(); pthread_join(threadId, NULL); return; } template <typename T> void* Base<T>::Proc(void *arg) { Base<T> *pThis = (Base<T> *)arg; pThis->Work(); return NULL; } // Work函数在子类中定义 void Work() { while(run_flag) { //此大括号是为了让zero_list早点析构,空出内存 { list<T> empty_list; //空表 pthread_mutex_lock(&mutex); //生产者链表不为空直接读,为空则等待生产者信号 if (produce_list.empty()) { //消费者等待生产者信号 pthread_cond_wait(&cond, &mutex); } custome_list.swap(produce_list); //用新的空链表置换,清理垃圾内存 produce_list.swap(empty_list); pthread_mutex_unlock(&mutex); } for (iterator iter = custome_list.begin(); iter != custome_list.end(); ++iter) { server->Process(iter->first, iter->second, thread_num);//由实际的业务对象来处理数据 } } return; }
二、实际在业务层中创建线程池,就是一个线程对象指针的数组,而均衡分配线程处理的算法可采用轮询或随机。相关代码示例如下:
WorkThread* WorkPool = new WorkThread[8]; //创建8个线程 for ( int i = 0; i < 8; i++) { (WorkPool+i)->Init(); //初始化线程对象相关成员变量 } // 启动所有线程 for (int i = 0; i < 8; i++) { (WorkPool+i)->Consume(); } //分配线程来处理任务,轮询 (WorkPool + (++num%8))->Produce(task);