image_framework_ymj/image_framework/thead/LibapiProcessThread.cpp

192 lines
4.7 KiB
C++
Raw Normal View History

2024-12-06 16:25:16 +08:00
#include <iostream>
#include <chrono>
#include <mutex>
#include <cassert>
#include <typeinfo>
#include <iostream>
//#include <spdlog/async.h>
#include "LibapiProcessThread.h"
#include "LibapiMsg.h"
using namespace std;
LibapiProcessThread::LibapiProcessThread(int id, std::string name, BaseRunnable* runnable):
LibapiThread(id, name, runnable)
{
}
LibapiProcessThread::~LibapiProcessThread()
{
}
void LibapiProcessThread::set_parent_thread(LibapiProcessThread* pParentThread)
{
pParentThread->add_sub_thread(this);
this->m_parent = pParentThread;
}
void LibapiProcessThread::add_sub_thread(LibapiProcessThread* pSubThread)
{
for (auto it = m_sub_threads.begin(); it < m_sub_threads.end(); it++)
{
if ((*it)->m_id == pSubThread->get_id() || (*it)->m_name == pSubThread->get_name())
{
//spdlog::info(this->get_name() + " 添加重名子线程 " + pSubThread->get_name());
std::cout << this->get_name() << " 添加重名子线程 " << pSubThread->get_name() << std::endl;
assert(0);
}
}
if (pSubThread->m_parent != nullptr)
{
//spdlog::info(this->get_name() + " pSubThread->m_parent != nullptr " + pSubThread->get_name());
std::cout << this->get_name() << " pSubThread->m_parent != nullptr " << pSubThread->get_name() << std::endl;
assert(0);
}
this->m_sub_threads.emplace_back(pSubThread);
Signal sig = { pSubThread->get_id(), false };
this->m_sub_thread_signal.emplace_back(sig);
pSubThread->m_parent = this;
}
void LibapiProcessThread::set_next_thread(LibapiProcessThread* pNextThread)
{
m_next_threads.emplace_back(pNextThread);
//m_next_thread = pNextThread;
}
void LibapiProcessThread::reset_sub_thread_signal(std::vector<Signal>& sub_thread_signal)
{
std::unique_lock<std::mutex> lock(m_sig_mutex);
for (auto it = sub_thread_signal.begin(); it < sub_thread_signal.end(); it++)
{
(*it).end = false;
}
}
void LibapiProcessThread::update_sub_thread_signal(LibapiProcessThread* pThread)
{
std::unique_lock<std::mutex> lock(m_sig_mutex);
for (auto it = m_sub_thread_signal.begin(); it < m_sub_thread_signal.end(); it++)
{
if ((*it).id == pThread->get_id())
{
(*it).end = true;
return;
}
}
}
bool LibapiProcessThread::check_all_sub_complete()
{
std::unique_lock<std::mutex> lock(m_sig_mutex);
for (auto it = m_sub_thread_signal.begin(); it < m_sub_thread_signal.end(); it++)
{
if ((*it).end == false)
{
return false;
}
}
return true;
}
// override function
void LibapiProcessThread::run()
{
int ret = 0;
while (!m_stopFlag)
{
if (m_pauseFlag)
{
unique_lock<mutex> locker(m_mutex);
while (m_pauseFlag)
{
m_condition.wait(locker);
}
locker.unlock();
}
void* pMsg = NULL;
m_queue->pop(pMsg);
if (m_prunnable != NULL && pMsg != NULL)
{
m_prunnable->get_start_time();
ret = m_prunnable->OnProcess(this, pMsg);
//处理完如果pMsg为null则不再处理了
if (ret == -1) continue;
}
reset_sub_thread_signal(m_sub_thread_signal);
if (m_sub_threads.size() > 0)
{
bool succ = false;
for (auto it = m_sub_threads.begin(); it < m_sub_threads.end(); it++)
{
LibapiThread* subThread = (LibapiThread*)(*it);
((LibapiMsg*)pMsg)->add_ref();
succ = subThread->push(pMsg);
// 如果push成功加引用
if (!succ)
{
((LibapiMsg*)pMsg)->sub_ref();
update_sub_thread_signal((LibapiProcessThread*)subThread);
}
}
if (!check_all_sub_complete())
{
unique_lock<mutex> locker(m_mutex);
while (!check_all_sub_complete())
{
m_condition.wait(locker);
// LibapiThread::delay_second(0.001);
}
locker.unlock();
}
}
if (m_prunnable != NULL && pMsg != NULL)
{
m_prunnable->OnEndProcess(this, pMsg);
}
int next_thread_size = m_next_threads.size();
if (next_thread_size == 1)
{
((LibapiMsg*)pMsg)->add_ref();
int succ = m_next_threads[0]->push(pMsg);
if (!succ)
((LibapiMsg*)pMsg)->sub_ref();
}
else if (next_thread_size > 1)
{
((LibapiMsg*)pMsg)->add_ref();
int succ = m_next_threads[m_cur_index % next_thread_size]->push(pMsg);
// 如果push不成功减引用
if (!succ)
((LibapiMsg*)pMsg)->sub_ref();
m_cur_index++;
if (m_cur_index == next_thread_size) m_cur_index = 0;
}
if (m_parent != NULL)
{
m_parent->update_sub_thread_signal(this);
m_parent->notify();
}
// 消息处理完成,减引用
int ref = ((LibapiMsg*)pMsg)->sub_ref();
if (ref==0) ((LibapiMsg*)pMsg)->delete_msg();
}
m_pauseFlag = false;
m_stopFlag = false;
//spdlog::info("exit thread:%d", get_id());
}