#include "threadpool.hpp" #include #include //#include #include static const int MAX_THREADS = 10000; //最大线程数目 /** * @brief ThreadPool * @param number[in]线程数 *默认开一个线程 * @param * emptyQuit[in]空线程退出,默认false。如果为true,需要先添加任务,再start,然后waite完成 */ ThreadPool::ThreadPool(int number, bool emptyQuit) : m_StopFlag(false), m_EmptyQuit(emptyQuit), m_JoinFlag(false), m_QuitNum(0), m_EmptyQuitWaite(false) { std::cout << "线程池中线程数:" << number << std::endl; if (number <= 0 || number > MAX_THREADS) throw std::exception(); m_ThreadNum = number; } ThreadPool::~ThreadPool() { // std::cout << "~ThreadPool()" << std::endl; stop(); } /** * @brief stop 停止 */ void ThreadPool::stop() { //保证多线程情况下只调用一次stopThreadGroup std::call_once(m_CallStopSlag, [this] { stopThreadGroup(); }); } /** * @brief stopThreadGroup 停止线程组 */ void ThreadPool::stopThreadGroup() { m_StopFlag = true; Sleep(500); m_Condition.notify_all(); waiteFinish(); //等待线程退出 std::thread* thread = NULL; for (int i = 0; i < m_WorkThreads.size(); i++) { thread = m_WorkThreads[i]; if (thread != NULL) { thread->join(); delete thread; thread = NULL; } m_WorkThreads[i] = NULL; } m_WorkThreads.clear(); } /** * @brief startThread 启动线程 */ void ThreadPool::startThread() { for (int i = 0; i < m_ThreadNum; i++) { std::thread* thread = new std::thread(ThreadPool::worker, this); m_WorkThreads.push_back(thread); } } /** * @brief waiteThreadFinish 等待线程结束 */ void ThreadPool::waiteThreadFinish() { if (m_JoinFlag) return; if (m_EmptyQuit) { m_EmptyQuitWaite = true; do { if (m_ThreadNum == m_QuitNum) break; Sleep(400); } while (true); m_StopFlag = true; m_Condition.notify_all(); } /* for (int i = 0; i < work_threads.size(); i++) { if (work_threads[i]) { work_threads[i]->join(); } }*/ m_JoinFlag = true; } /** * @brief start 启动 */ void ThreadPool::start() { std::call_once(m_CallStartSlag, [this] { startThread(); }); } /** * @brief waiteFinish 等待所有任务结束,设置为任务为空退出时调用 */ void ThreadPool::waiteFinish() { std::call_once(m_CallWaiteFinisFlag, [this] { waiteThreadFinish(); }); } /** * @brief 任务数 */ int ThreadPool::taskNum() { return m_TasksQueue.size(); } /** * @brief append 往请求队列<task_queue>中添加任务 * @param task * @return */ bool ThreadPool::append(Task task) { /*操作工作队列时一定要加锁,因为他被所有线程共享*/ m_DataMutex.lock(); m_TasksQueue.push(task); m_DataMutex.unlock(); m_Condition.notify_one(); //线程池添加进去了任务,自然要通知等待的线程 return true; } /** * @brief worker 线程回调函数 * @param arg * @return */ void* ThreadPool::worker(void* arg) { ThreadPool* pool = (ThreadPool*)arg; pool->run(); return pool; } /** * @brief notEmpty 是否空 * @return */ bool ThreadPool::notEmpty() { bool empty = m_TasksQueue.empty(); if (empty) { // std::ostringstream oss; // oss << std::this_thread::get_id(); // printf("queue empty thread id %s waite...!\n", oss.str().c_str()); } return !empty; } /** * @brief run 工作线程需要运行的函数,不断的从任务队列中取出并执行 */ void ThreadPool::run() { bool flag = false; int remainder = 0; while (!m_StopFlag) { flag = false; { std::unique_lock lk(this->m_QueueMutex); /* unique_lock() 出作用域会自动解锁 */ m_Condition.wait(lk, [this] { return m_StopFlag || notEmpty(); }); } if (m_StopFlag) break; Task task; m_DataMutex.lock(); //如果任务队列不为空,就停下来等待唤醒 if (!this->m_TasksQueue.empty()) { task = m_TasksQueue.front(); m_TasksQueue.pop(); remainder = m_TasksQueue.size(); flag = true; } m_DataMutex.unlock(); if (flag) task(); //如果队列为空并且完成退出,已经开始等待退出就退出 if (m_TasksQueue.empty() && m_EmptyQuit && m_EmptyQuitWaite) { break; } } m_QuitNum += 1; std::ostringstream oss; oss << std::this_thread::get_id(); printf("thread %s end\n", oss.str().c_str()); }