SIMOrthoProgram-Orth_GF3-Strip/PSTM_simulation_windows2021.../PSTM_simulation_windows/threadpool.cpp

183 lines
4.7 KiB
C++
Raw Permalink Blame History

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#include "threadpool.hpp"

#include <chrono>
#include <mutex>
#include <sstream>
#include <string>
#include <utility>
//#include <unistd.h>
#include <windows.h>
static const int MAX_THREADS = 10000; // Upper bound on the worker-thread count accepted by the constructor
/**
 * @brief Construct a pool; no worker thread is launched until start() is called.
 * @param number[in]    Number of worker threads. Must satisfy
 *                      0 < number <= MAX_THREADS.
 * @param emptyQuit[in] When true, workers leave once the queue is empty and a
 *                      wait for completion has begun. In that mode add the
 *                      tasks first, then call start(), then waiteFinish().
 *                      (Default values live in the declaration, which is not
 *                      visible here; the original note states `false`.)
 * @throws std::exception if number is outside the accepted range.
 */
ThreadPool::ThreadPool(int number, bool emptyQuit)
    : m_StopFlag(false),
      m_EmptyQuit(emptyQuit),
      m_JoinFlag(false),
      m_QuitNum(0),
      m_EmptyQuitWaite(false) {
    // The requested size is logged before validation, so a rejected value
    // still shows up in the output.
    std::cout << "线程池中线程数:" << number << std::endl;

    const bool countInRange = (number > 0) && (number <= MAX_THREADS);
    if (!countInRange) {
        throw std::exception();
    }
    m_ThreadNum = number;
}
/**
 * @brief Destructor: shuts the pool down through stop().
 */
ThreadPool::~ThreadPool() {
    this->stop();
}
/**
 * @brief stop Shut the pool down.
 *
 * Routed through std::call_once so that stopThreadGroup() runs at most once,
 * even when stop() is invoked several times or from several threads.
 */
void ThreadPool::stop() {
    std::call_once(m_CallStopSlag, &ThreadPool::stopThreadGroup, this);
}
/**
* @brief stopThreadGroup 停止线程组
*/
void ThreadPool::stopThreadGroup() {
m_StopFlag = true;
Sleep(500);
m_Condition.notify_all();
waiteFinish(); //等待线程退出
std::thread* thread = NULL;
for (int i = 0; i < m_WorkThreads.size(); i++)
{
thread = m_WorkThreads[i];
if (thread != NULL)
{
thread->join();
delete thread;
thread = NULL;
}
m_WorkThreads[i] = NULL;
}
m_WorkThreads.clear();
}
/**
 * @brief startThread Launch m_ThreadNum worker threads.
 *
 * Each thread executes ThreadPool::worker with this pool as its argument; the
 * heap-allocated thread objects are kept in m_WorkThreads.
 */
void ThreadPool::startThread() {
    int launched = 0;
    while (launched < m_ThreadNum) {
        m_WorkThreads.push_back(new std::thread(ThreadPool::worker, this));
        ++launched;
    }
}
/**
 * @brief waiteThreadFinish Wait until the workers have finished.
 *
 * In empty-quit mode this raises m_EmptyQuitWaite, then polls every 400 ms
 * until the quit counter equals m_ThreadNum, and finally raises the stop flag
 * and wakes any waiter. Otherwise it only records that the wait happened.
 * The threads themselves are not joined here.
 */
void ThreadPool::waiteThreadFinish() {
    if (!m_JoinFlag) {
        if (m_EmptyQuit) {
            m_EmptyQuitWaite = true;
            // NOTE(review): this loop only ends once every one of the
            // m_ThreadNum workers has incremented m_QuitNum; if they never do
            // (for example start() was never called) it does not return.
            while (!(m_ThreadNum == m_QuitNum)) {
                Sleep(400);
            }
            m_StopFlag = true;
            m_Condition.notify_all();
        }
        m_JoinFlag = true;
    }
}
/**
 * @brief start Launch the worker threads.
 *
 * Guarded by std::call_once: only the first call runs startThread().
 */
void ThreadPool::start() {
    std::call_once(m_CallStartSlag, &ThreadPool::startThread, this);
}
/**
 * @brief waiteFinish Wait for all tasks to finish; intended for pools
 *        configured to quit when the queue is empty.
 *
 * Guarded by std::call_once: only the first call runs waiteThreadFinish().
 */
void ThreadPool::waiteFinish() {
    std::call_once(m_CallWaiteFinisFlag, &ThreadPool::waiteThreadFinish, this);
}
/**
* @brief 任务数
*/
int ThreadPool::taskNum()
{
return m_TasksQueue.size();
}
/**
 * @brief append Add a task to the request queue and wake one waiting worker.
 * @param task The task to enqueue.
 * @return Always true.
 */
bool ThreadPool::append(Task task) {
    {
        // run() evaluates its wait predicate while holding m_QueueMutex.
        // Holding the same mutex during the push means a worker can never
        // observe "queue empty" and then miss the notification sent below.
        std::lock_guard<std::mutex> waitGuard(m_QueueMutex);
        // The queue is shared by all threads and must be modified under
        // m_DataMutex. Scoped guards release both locks even if push() throws.
        std::lock_guard<decltype(m_DataMutex)> dataGuard(m_DataMutex);
        m_TasksQueue.push(std::move(task));
    }
    // Notify outside the locks so the woken worker does not immediately block.
    m_Condition.notify_one();
    return true;
}
/**
* @brief worker 线程回调函数
* @param arg
* @return
*/
void* ThreadPool::worker(void* arg) {
ThreadPool* pool = (ThreadPool*)arg;
pool->run();
return pool;
}
/**
* @brief notEmpty 是否空
* @return
*/
bool ThreadPool::notEmpty() {
bool empty = m_TasksQueue.empty();
if (empty) {
// std::ostringstream oss;
// oss << std::this_thread::get_id();
// printf("queue empty thread id %s waite...!\n", oss.str().c_str());
}
return !empty;
}
/**
* @brief run 工作线程需要运行的函数,不断的从任务队列中取出并执行
*/
void ThreadPool::run() {
bool flag = false;
int remainder = 0;
while (!m_StopFlag) {
flag = false;
{
std::unique_lock<std::mutex> lk(this->m_QueueMutex);
/* unique_lock() 出作用域会自动解锁 */
m_Condition.wait(lk, [this] { return m_StopFlag || notEmpty(); });
}
if (m_StopFlag) break;
Task task;
m_DataMutex.lock();
//如果任务队列不为空,就停下来等待唤醒
if (!this->m_TasksQueue.empty()) {
task = m_TasksQueue.front();
m_TasksQueue.pop();
remainder = m_TasksQueue.size();
flag = true;
}
m_DataMutex.unlock();
if (flag) task();
//如果队列为空并且完成退出,已经开始等待退出就退出
if (m_TasksQueue.empty() && m_EmptyQuit && m_EmptyQuitWaite)
{
break;
}
}
m_QuitNum += 1;
std::ostringstream oss;
oss << std::this_thread::get_id();
printf("thread %s end\n", oss.str().c_str());
}