当前,网络服务一般都具有高并发量、高吞吐量的要求,在接收到客户端的socket数据或http请求时,能够及时处理数据并快速响应客户端,从而增强用户的体验性。 为了高效利用服务器的处理能力,尤其是针对多核机器,需要创建多个线程并构成线程池,对线程资源进行统一管理。由于线程处理请求需要一定时间,当新的请求到达时,就可以将新请求分配给其他空闲的线程,因而可充分利用多核特性,达到并行处理的效果。
对此,本文提供一种线程池创建的思路与实现————生产者消费者非阻塞异步处理模型,主要适用于云端业务的处理。

一、首先是通用线程基类的构建,具体的线程类继承该基类,并实现Work虚函数,进行实际的处理流程。

// 线程池及通用生产者消费者非阻塞异步处理模型
//生产者与消费者模型,利用多个工作线程来调度(创建线程池),加快服务对数据的处理
//每个线程对应一个对象,整个线程池由一个指向线程对象的指针表示(指向线程对象数组)
template <typename T>
class Base
{
public:
    void Produce(const T& data);    //向线程对象发送消息
    void Signal(); //发送信号解除阻塞
    bool Run(); //创建并启动工作线程
    void Wait(); //等待线程结束

protected:
    static void* Proc(void *arg);
    virtual void Work() = 0;    //处理新发送而来的消息
    //生产者链表
    list<T> produce_list;
    //消费者链表
    list<T> custome_list;
    //线程锁
    pthread_mutex_t mutex;
    //条件变量
    pthread_cond_t cond;
    
    pthread_t threadId;
};
// 向生产队列推送数据
void Produce(const T& data)
{
    pthread_mutex_lock(&mutex);
    produce_list.push_back(data);
    pthread_cond_signal(&cond); 
    pthread_mutex_unlock(&mutex);
    return;
}

//发送信号解除阻塞 kill -10 时候条调用
template <typename T>
void Base<T>::Unblock()
{
    int i = 0;
    //防止信号放空,间隔100us释放三次信号
    while (++i <= 3)
    {
        pthread_mutex_lock(&mutex);
        pthread_cond_signal(&cond); 
        pthread_mutex_unlock(&mutex);
        usleep(100);
    }
    return;
}

//发送信号解除阻塞, epoll_wait后调用
template <typename T>
void Base<T>::Signal()
{
    pthread_mutex_lock(&mutex);
    pthread_cond_signal(&cond); 
    pthread_mutex_unlock(&mutex);
    return;
}

//启动线程
template <typename T>
bool Base<T>::Consume()
{
    if(0 == pthread_create(&threadId, NULL, Proc, (void *)this))
    {
        return true;
    }
    else
    {
        return false;
    }
}

template <typename T>
void Base<T>::Wait()
{
    Signal();
    pthread_join(threadId, NULL);
    return;
}

template <typename T>
void* Base<T>::Proc(void *arg)
{
    Base<T> *pThis = (Base<T> *)arg;
    pThis->Work();
    return NULL;
}

// Work函数在子类中定义
void Work()
{
    while(run_flag)
    {
        //此大括号是为了让zero_list早点析构,空出内存
        {
            list<T> empty_list; //空表

            pthread_mutex_lock(&mutex);

            //生产者链表不为空直接读,为空则等待生产者信号
            if (produce_list.empty())
            {
                //消费者等待生产者信号
                pthread_cond_wait(&cond, &mutex);
            }

            custome_list.swap(produce_list);
            //用新的空链表置换,清理垃圾内存
            produce_list.swap(empty_list);
            pthread_mutex_unlock(&mutex);
        }

        for (iterator iter = custome_list.begin(); iter != custome_list.end(); ++iter)
        {
            server->Process(iter->first, iter->second, thread_num);//由实际的业务对象来处理数据
        }
    }

    return;
}

二、实际在业务层中创建线程池,就是一个线程对象指针的数组,而均衡分配线程处理的算法可采用轮询或随机。相关代码示例如下:

        WorkThread* WorkPool = new WorkThread[8];    //创建8个线程
        for ( int i = 0; i < 8; i++) {
            (WorkPool+i)->Init();    //初始化线程对象相关成员变量
        }
        
        // 启动所有线程
        for (int i = 0; i < 8; i++) {
            (WorkPool+i)->Consume();
        }
        
        //分配线程来处理任务,轮询
        (WorkPool + (++num%8))->Produce(task);

 

版权声明:本文为share-ideas原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/share-ideas/p/10492376.html