#include #include #include #include #include #include //#include #include "LibapiProcessThread.h" #include "LibapiMsg.h" using namespace std; LibapiProcessThread::LibapiProcessThread(int id, std::string name, BaseRunnable* runnable): LibapiThread(id, name, runnable) { } LibapiProcessThread::~LibapiProcessThread() { } void LibapiProcessThread::set_parent_thread(LibapiProcessThread* pParentThread) { pParentThread->add_sub_thread(this); this->m_parent = pParentThread; } void LibapiProcessThread::add_sub_thread(LibapiProcessThread* pSubThread) { for (auto it = m_sub_threads.begin(); it < m_sub_threads.end(); it++) { if ((*it)->m_id == pSubThread->get_id() || (*it)->m_name == pSubThread->get_name()) { //spdlog::info(this->get_name() + " 添加重名子线程 " + pSubThread->get_name()); std::cout << this->get_name() << " 添加重名子线程 " << pSubThread->get_name() << std::endl; assert(0); } } if (pSubThread->m_parent != nullptr) { //spdlog::info(this->get_name() + " pSubThread->m_parent != nullptr " + pSubThread->get_name()); std::cout << this->get_name() << " pSubThread->m_parent != nullptr " << pSubThread->get_name() << std::endl; assert(0); } this->m_sub_threads.emplace_back(pSubThread); Signal sig = { pSubThread->get_id(), false }; this->m_sub_thread_signal.emplace_back(sig); pSubThread->m_parent = this; } void LibapiProcessThread::set_next_thread(LibapiProcessThread* pNextThread) { m_next_threads.emplace_back(pNextThread); //m_next_thread = pNextThread; } void LibapiProcessThread::reset_sub_thread_signal(std::vector& sub_thread_signal) { std::unique_lock lock(m_sig_mutex); for (auto it = sub_thread_signal.begin(); it < sub_thread_signal.end(); it++) { (*it).end = false; } } void LibapiProcessThread::update_sub_thread_signal(LibapiProcessThread* pThread) { std::unique_lock lock(m_sig_mutex); for (auto it = m_sub_thread_signal.begin(); it < m_sub_thread_signal.end(); it++) { if ((*it).id == pThread->get_id()) { (*it).end = true; return; } } } bool LibapiProcessThread::check_all_sub_complete() { std::unique_lock lock(m_sig_mutex); for (auto it = m_sub_thread_signal.begin(); it < m_sub_thread_signal.end(); it++) { if ((*it).end == false) { return false; } } return true; } // override function void LibapiProcessThread::run() { int ret = 0; while (!m_stopFlag) { if (m_pauseFlag) { unique_lock locker(m_mutex); while (m_pauseFlag) { m_condition.wait(locker); } locker.unlock(); } void* pMsg = NULL; m_queue->pop(pMsg); if (m_prunnable != NULL && pMsg != NULL) { m_prunnable->get_start_time(); ret = m_prunnable->OnProcess(this, pMsg); //处理完,如果pMsg为null,则不再处理了 if (ret == -1) continue; } reset_sub_thread_signal(m_sub_thread_signal); if (m_sub_threads.size() > 0) { bool succ = false; for (auto it = m_sub_threads.begin(); it < m_sub_threads.end(); it++) { LibapiThread* subThread = (LibapiThread*)(*it); ((LibapiMsg*)pMsg)->add_ref(); succ = subThread->push(pMsg); // 如果push成功,加引用 if (!succ) { ((LibapiMsg*)pMsg)->sub_ref(); update_sub_thread_signal((LibapiProcessThread*)subThread); } } if (!check_all_sub_complete()) { unique_lock locker(m_mutex); while (!check_all_sub_complete()) { m_condition.wait(locker); // LibapiThread::delay_second(0.001); } locker.unlock(); } } if (m_prunnable != NULL && pMsg != NULL) { m_prunnable->OnEndProcess(this, pMsg); } int next_thread_size = m_next_threads.size(); if (next_thread_size == 1) { ((LibapiMsg*)pMsg)->add_ref(); int succ = m_next_threads[0]->push(pMsg); if (!succ) ((LibapiMsg*)pMsg)->sub_ref(); } else if (next_thread_size > 1) { ((LibapiMsg*)pMsg)->add_ref(); int succ = m_next_threads[m_cur_index % next_thread_size]->push(pMsg); // 如果push不成功,减引用 if (!succ) ((LibapiMsg*)pMsg)->sub_ref(); m_cur_index++; if (m_cur_index == next_thread_size) m_cur_index = 0; } if (m_parent != NULL) { m_parent->update_sub_thread_signal(this); m_parent->notify(); } // 消息处理完成,减引用 int ref = ((LibapiMsg*)pMsg)->sub_ref(); if (ref==0) ((LibapiMsg*)pMsg)->delete_msg(); } m_pauseFlag = false; m_stopFlag = false; //spdlog::info("exit thread:%d", get_id()); }