192 lines
4.7 KiB
C++
Executable File
192 lines
4.7 KiB
C++
Executable File
#include <iostream>
|
||
#include <chrono>
|
||
#include <mutex>
|
||
#include <cassert>
|
||
#include <typeinfo>
|
||
#include <iostream>
|
||
//#include <spdlog/async.h>
|
||
#include "LibapiProcessThread.h"
|
||
#include "LibapiMsg.h"
|
||
|
||
using namespace std;
|
||
|
||
/**
 * Constructs a processing thread.
 *
 * All three arguments are forwarded unchanged to the LibapiThread base
 * constructor; this class adds no initialisation of its own here.
 *
 * @param id        Identifier passed to the base thread.
 * @param name      Name passed to the base thread.
 * @param runnable  Runnable passed to the base thread.
 */
LibapiProcessThread::LibapiProcessThread(int id, std::string name, BaseRunnable* runnable)
    : LibapiThread(id, name, runnable)
{
}
|
||
|
||
/**
 * Destructor. Performs no explicit work of its own.
 */
LibapiProcessThread::~LibapiProcessThread() = default;
|
||
|
||
/**
 * Attaches this thread as a sub-thread of pParentThread.
 *
 * The registration is delegated to the parent's add_sub_thread(), which
 * appends this thread to the parent's child list and stores the parent
 * pointer in this thread's m_parent. No separate assignment is needed here.
 *
 * @param pParentThread  Thread that becomes the parent. A null pointer is a
 *                       programming error: it asserts in debug builds and is
 *                       ignored otherwise.
 */
void LibapiProcessThread::set_parent_thread(LibapiProcessThread* pParentThread)
{
    assert(pParentThread != nullptr);
    if (pParentThread == nullptr)
    {
        return;
    }

    pParentThread->add_sub_thread(this);
}
|
||
|
||
/**
 * Registers pSubThread as a child of this thread.
 *
 * On success the child is appended to m_sub_threads, a completion signal
 * entry {child id, false} is appended to m_sub_thread_signal, and the child's
 * m_parent is set to this thread.
 *
 * The request is rejected when:
 *  - pSubThread is null,
 *  - an existing child has the same id or the same name, or
 *  - pSubThread already has a parent.
 * A rejection prints a diagnostic and asserts. Because assert() compiles to
 * nothing under NDEBUG, each rejection also returns explicitly so that a
 * release build never registers an invalid child.
 *
 * @param pSubThread  Thread to register as a child.
 */
void LibapiProcessThread::add_sub_thread(LibapiProcessThread* pSubThread)
{
    if (pSubThread == nullptr)
    {
        std::cout << this->get_name() << " add_sub_thread: pSubThread == nullptr" << std::endl;
        assert(0);
        return;
    }

    // Ids and names must be unique among the children of one parent.
    for (const auto* existing : m_sub_threads)
    {
        if (existing->m_id == pSubThread->get_id() || existing->m_name == pSubThread->get_name())
        {
            std::cout << this->get_name() << " 添加重名子线程 " << pSubThread->get_name() << std::endl;
            assert(0);
            return;
        }
    }

    // A thread may belong to at most one parent.
    if (pSubThread->m_parent != nullptr)
    {
        std::cout << this->get_name() << " pSubThread->m_parent != nullptr " << pSubThread->get_name() << std::endl;
        assert(0);
        return;
    }

    this->m_sub_threads.emplace_back(pSubThread);

    // One completion flag per child, initially "not finished".
    Signal sig = { pSubThread->get_id(), false };
    this->m_sub_thread_signal.emplace_back(sig);

    pSubThread->m_parent = this;
}
|
||
|
||
/**
 * Appends pNextThread to the list of downstream threads (m_next_threads).
 *
 * May be called several times; run() sends each message to one entry of the
 * list, rotating through the entries when there is more than one.
 *
 * @param pNextThread  Downstream thread to add. run() dereferences every
 *                     stored entry unconditionally, so a null pointer is
 *                     rejected: it asserts in debug builds and is ignored
 *                     otherwise.
 */
void LibapiProcessThread::set_next_thread(LibapiProcessThread* pNextThread)
{
    assert(pNextThread != nullptr);
    if (pNextThread == nullptr)
    {
        return;
    }

    m_next_threads.emplace_back(pNextThread);
}
|
||
|
||
void LibapiProcessThread::reset_sub_thread_signal(std::vector<Signal>& sub_thread_signal)
|
||
{
|
||
std::unique_lock<std::mutex> lock(m_sig_mutex);
|
||
|
||
for (auto it = sub_thread_signal.begin(); it < sub_thread_signal.end(); it++)
|
||
{
|
||
(*it).end = false;
|
||
}
|
||
}
|
||
|
||
/**
 * Marks the signal entry belonging to pThread as finished.
 *
 * Scans m_sub_thread_signal under m_sig_mutex and sets "end" to true on the
 * first entry whose id equals pThread->get_id(). Does nothing when no entry
 * matches.
 *
 * @param pThread  Thread whose signal entry is to be marked.
 */
void LibapiProcessThread::update_sub_thread_signal(LibapiProcessThread* pThread)
{
    std::lock_guard<std::mutex> guard(m_sig_mutex);

    for (auto& entry : m_sub_thread_signal)
    {
        if (entry.id != pThread->get_id())
        {
            continue;
        }

        // Only the first matching entry is updated.
        entry.end = true;
        break;
    }
}
|
||
|
||
bool LibapiProcessThread::check_all_sub_complete()
|
||
{
|
||
std::unique_lock<std::mutex> lock(m_sig_mutex);
|
||
|
||
for (auto it = m_sub_thread_signal.begin(); it < m_sub_thread_signal.end(); it++)
|
||
{
|
||
if ((*it).end == false)
|
||
{
|
||
return false;
|
||
}
|
||
}
|
||
return true;
|
||
}
|
||
|
||
// override function
|
||
void LibapiProcessThread::run()
|
||
{
|
||
int ret = 0;
|
||
while (!m_stopFlag)
|
||
{
|
||
if (m_pauseFlag)
|
||
{
|
||
unique_lock<mutex> locker(m_mutex);
|
||
while (m_pauseFlag)
|
||
{
|
||
m_condition.wait(locker);
|
||
}
|
||
locker.unlock();
|
||
}
|
||
|
||
void* pMsg = NULL;
|
||
m_queue->pop(pMsg);
|
||
|
||
if (m_prunnable != NULL && pMsg != NULL)
|
||
{
|
||
m_prunnable->get_start_time();
|
||
ret = m_prunnable->OnProcess(this, pMsg);
|
||
//处理完,如果pMsg为null,则不再处理了
|
||
if (ret == -1) continue;
|
||
}
|
||
|
||
reset_sub_thread_signal(m_sub_thread_signal);
|
||
if (m_sub_threads.size() > 0)
|
||
{
|
||
bool succ = false;
|
||
for (auto it = m_sub_threads.begin(); it < m_sub_threads.end(); it++)
|
||
{
|
||
LibapiThread* subThread = (LibapiThread*)(*it);
|
||
((LibapiMsg*)pMsg)->add_ref();
|
||
succ = subThread->push(pMsg);
|
||
// 如果push成功,加引用
|
||
if (!succ)
|
||
{
|
||
((LibapiMsg*)pMsg)->sub_ref();
|
||
update_sub_thread_signal((LibapiProcessThread*)subThread);
|
||
}
|
||
}
|
||
|
||
if (!check_all_sub_complete())
|
||
{
|
||
unique_lock<mutex> locker(m_mutex);
|
||
while (!check_all_sub_complete())
|
||
{
|
||
m_condition.wait(locker);
|
||
// LibapiThread::delay_second(0.001);
|
||
}
|
||
locker.unlock();
|
||
}
|
||
}
|
||
|
||
if (m_prunnable != NULL && pMsg != NULL)
|
||
{
|
||
m_prunnable->OnEndProcess(this, pMsg);
|
||
}
|
||
|
||
int next_thread_size = m_next_threads.size();
|
||
if (next_thread_size == 1)
|
||
{
|
||
((LibapiMsg*)pMsg)->add_ref();
|
||
int succ = m_next_threads[0]->push(pMsg);
|
||
if (!succ)
|
||
((LibapiMsg*)pMsg)->sub_ref();
|
||
}
|
||
else if (next_thread_size > 1)
|
||
{
|
||
((LibapiMsg*)pMsg)->add_ref();
|
||
int succ = m_next_threads[m_cur_index % next_thread_size]->push(pMsg);
|
||
// 如果push不成功,减引用
|
||
if (!succ)
|
||
((LibapiMsg*)pMsg)->sub_ref();
|
||
m_cur_index++;
|
||
if (m_cur_index == next_thread_size) m_cur_index = 0;
|
||
}
|
||
|
||
if (m_parent != NULL)
|
||
{
|
||
m_parent->update_sub_thread_signal(this);
|
||
m_parent->notify();
|
||
}
|
||
|
||
// 消息处理完成,减引用
|
||
int ref = ((LibapiMsg*)pMsg)->sub_ref();
|
||
if (ref==0) ((LibapiMsg*)pMsg)->delete_msg();
|
||
}
|
||
|
||
m_pauseFlag = false;
|
||
m_stopFlag = false;
|
||
|
||
//spdlog::info("exit thread:%d", get_id());
|
||
} |