image_framework_ymj/image_framework/thead/LibapiProcessThread.cpp
2024-12-06 16:25:16 +08:00

192 lines
4.7 KiB
C++
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <iostream>
#include <chrono>
#include <mutex>
#include <cassert>
#include <typeinfo>
#include <iostream>
//#include <spdlog/async.h>
#include "LibapiProcessThread.h"
#include "LibapiMsg.h"
using namespace std;
/// Builds a processing thread.
///
/// All three arguments are passed straight to the LibapiThread base
/// constructor; this class adds no construction logic of its own.
///
/// @param id        identifier forwarded to LibapiThread
/// @param name      thread name forwarded to LibapiThread
/// @param runnable  runnable forwarded to LibapiThread
LibapiProcessThread::LibapiProcessThread(int id, std::string name, BaseRunnable* runnable)
    : LibapiThread(id, name, runnable)
{
}
/// Destructor. Performs no explicit cleanup of its own; the parent,
/// sub-thread and next-thread pointers are not deleted here.
LibapiProcessThread::~LibapiProcessThread() = default;
/// Attaches this thread underneath @p pParentThread.
///
/// Registration is delegated to the parent's add_sub_thread(). That call has
/// to happen before m_parent is written here, because add_sub_thread()
/// asserts that the child does not have a parent yet.
///
/// @param pParentThread  thread that becomes the parent; dereferenced
///                       unconditionally, so it must not be null
void LibapiProcessThread::set_parent_thread(LibapiProcessThread* pParentThread)
{
    LibapiProcessThread* const self = this;
    pParentThread->add_sub_thread(self);
    m_parent = pParentThread;
}
/// Registers @p pSubThread as a child of this thread.
///
/// On success the child is appended to m_sub_threads, a completion slot
/// ({id, false}) is appended to m_sub_thread_signal, and the child's m_parent
/// is pointed at this thread.
///
/// The registration is rejected when
///   - an already registered child has the same id or the same name, or
///   - the child already has a parent.
/// A rejection prints a diagnostic to stdout and trips assert(0). Because
/// assert() expands to nothing when NDEBUG is defined, the function also
/// returns explicitly so that release builds do not go on to register the
/// offending thread and overwrite its parent.
///
/// @param pSubThread  child to register; dereferenced unconditionally, so it
///                    must not be null
void LibapiProcessThread::add_sub_thread(LibapiProcessThread* pSubThread)
{
    for (const auto& existing : m_sub_threads)
    {
        const bool same_id = (existing->m_id == pSubThread->get_id());
        if (same_id || existing->m_name == pSubThread->get_name())
        {
            std::cout << this->get_name() << " 添加重名子线程 " << pSubThread->get_name() << std::endl;
            assert(0);
            return; // still reject the duplicate when asserts are compiled out
        }
    }

    if (pSubThread->m_parent != nullptr)
    {
        std::cout << this->get_name() << " pSubThread->m_parent != nullptr " << pSubThread->get_name() << std::endl;
        assert(0);
        return; // do not steal a child from its current parent in release builds
    }

    m_sub_threads.emplace_back(pSubThread);

    // One completion slot per child, initially "not finished".
    Signal sig = { pSubThread->get_id(), false };
    m_sub_thread_signal.emplace_back(sig);

    pSubThread->m_parent = this;
}
/// Adds @p pNextThread to the list of downstream threads.
///
/// Despite the "set" in the name this appends rather than replaces: every
/// call grows m_next_threads by one entry.
///
/// @param pNextThread  downstream thread to append (stored as given)
void LibapiProcessThread::set_next_thread(LibapiProcessThread* pNextThread)
{
    this->m_next_threads.emplace_back(pNextThread);
}
void LibapiProcessThread::reset_sub_thread_signal(std::vector<Signal>& sub_thread_signal)
{
std::unique_lock<std::mutex> lock(m_sig_mutex);
for (auto it = sub_thread_signal.begin(); it < sub_thread_signal.end(); it++)
{
(*it).end = false;
}
}
/// Marks the completion slot that belongs to @p pThread as finished.
///
/// Scans m_sub_thread_signal under m_sig_mutex and sets `end = true` on the
/// first entry whose id equals pThread->get_id(). If no entry matches,
/// nothing is changed.
///
/// @param pThread  thread whose slot should be marked finished
void LibapiProcessThread::update_sub_thread_signal(LibapiProcessThread* pThread)
{
    std::lock_guard<std::mutex> guard(m_sig_mutex);
    for (auto& entry : m_sub_thread_signal)
    {
        if (entry.id != pThread->get_id())
        {
            continue;
        }
        entry.end = true;
        break; // only the first matching slot is updated
    }
}
/// Reports whether every completion slot in m_sub_thread_signal is set.
///
/// The scan is done under m_sig_mutex.
///
/// @return true when no slot is still pending (including when there are no
///         slots at all), false as soon as one pending slot is found
bool LibapiProcessThread::check_all_sub_complete()
{
    std::lock_guard<std::mutex> guard(m_sig_mutex);
    bool all_done = true;
    for (const auto& entry : m_sub_thread_signal)
    {
        if (!entry.end)
        {
            all_done = false;
            break;
        }
    }
    return all_done;
}
// override function
void LibapiProcessThread::run()
{
int ret = 0;
while (!m_stopFlag)
{
if (m_pauseFlag)
{
unique_lock<mutex> locker(m_mutex);
while (m_pauseFlag)
{
m_condition.wait(locker);
}
locker.unlock();
}
void* pMsg = NULL;
m_queue->pop(pMsg);
if (m_prunnable != NULL && pMsg != NULL)
{
m_prunnable->get_start_time();
ret = m_prunnable->OnProcess(this, pMsg);
//处理完如果pMsg为null则不再处理了
if (ret == -1) continue;
}
reset_sub_thread_signal(m_sub_thread_signal);
if (m_sub_threads.size() > 0)
{
bool succ = false;
for (auto it = m_sub_threads.begin(); it < m_sub_threads.end(); it++)
{
LibapiThread* subThread = (LibapiThread*)(*it);
((LibapiMsg*)pMsg)->add_ref();
succ = subThread->push(pMsg);
// 如果push成功加引用
if (!succ)
{
((LibapiMsg*)pMsg)->sub_ref();
update_sub_thread_signal((LibapiProcessThread*)subThread);
}
}
if (!check_all_sub_complete())
{
unique_lock<mutex> locker(m_mutex);
while (!check_all_sub_complete())
{
m_condition.wait(locker);
// LibapiThread::delay_second(0.001);
}
locker.unlock();
}
}
if (m_prunnable != NULL && pMsg != NULL)
{
m_prunnable->OnEndProcess(this, pMsg);
}
int next_thread_size = m_next_threads.size();
if (next_thread_size == 1)
{
((LibapiMsg*)pMsg)->add_ref();
int succ = m_next_threads[0]->push(pMsg);
if (!succ)
((LibapiMsg*)pMsg)->sub_ref();
}
else if (next_thread_size > 1)
{
((LibapiMsg*)pMsg)->add_ref();
int succ = m_next_threads[m_cur_index % next_thread_size]->push(pMsg);
// 如果push不成功减引用
if (!succ)
((LibapiMsg*)pMsg)->sub_ref();
m_cur_index++;
if (m_cur_index == next_thread_size) m_cur_index = 0;
}
if (m_parent != NULL)
{
m_parent->update_sub_thread_signal(this);
m_parent->notify();
}
// 消息处理完成,减引用
int ref = ((LibapiMsg*)pMsg)->sub_ref();
if (ref==0) ((LibapiMsg*)pMsg)->delete_msg();
}
m_pauseFlag = false;
m_stopFlag = false;
//spdlog::info("exit thread:%d", get_id());
}