1 #ifndef OPENPOSE_THREAD_THREAD_MANAGER_HPP
2 #define OPENPOSE_THREAD_THREAD_MANAGER_HPP
// Template header of the thread manager. TWorker defaults to a shared pointer to
// Worker<TDatums> and TQueue defaults to Queue<TDatums>.
// NOTE(review): the class-head line and access specifiers are not visible in this excerpt.
15 template<
typename TDatums,
typename TWorker = std::shared_ptr<Worker<TDatums>>,
typename TQueue = Queue<TDatums>>
// Registers a group of workers under `threadId`, together with an input-queue id
// (`queueInId`) and an output-queue id (`queueOutId`).
35 void add(
const unsigned long long threadId,
const std::vector<TWorker>& tWorkers,
36 const unsigned long long queueInId,
const unsigned long long queueOutId);
// Single-worker overload; its definition wraps the worker in a one-element vector.
38 void add(
const unsigned long long threadId,
const TWorker& tWorker,
const unsigned long long queueInId,
39 const unsigned long long queueOutId);
// Queue access helpers returning a bool.
63 bool tryPush(
const TDatums& tDatums);
67 bool tryPop(TDatums& tDatums);
// Shared running flag: created as `false` by the constructor, reset to `false` when
// stopping, and handed to the last Thread.
73 std::shared_ptr<std::atomic<bool>> spIsRunning;
// Value passed to every TQueue constructor; the constructor initializes it to -1.
74 long long mDefaultMaxSizeQueues;
// Pending configuration: one (thread id, workers, queue-in id, queue-out id) tuple per
// add() call, kept ordered by the multiset.
75 std::multiset<std::tuple<unsigned long long, std::vector<TWorker>,
unsigned long long,
unsigned long long>> mThreadWorkerQueues;
// Thread objects, indexed by thread id.
76 std::vector<std::shared_ptr<Thread<TDatums, TWorker>>> mThreads;
// Queue objects, looked up by queue id (or queue id - 1, depending on the branch taken
// in multisetToThreads()).
77 std::vector<std::shared_ptr<TQueue>> mTQueues;
// Tuple-based overloads that the public add() functions forward to.
79 void add(
const std::vector<std::tuple<
unsigned long long, std::vector<TWorker>,
unsigned long long,
unsigned long long>>& threadWorkerQueues);
81 void add(
const std::vector<std::tuple<unsigned long long, TWorker, unsigned long long, unsigned long long>>& threadWorkerQueues);
// Turns the contents of mThreadWorkerQueues into Thread / SubThread objects.
83 void multisetToThreads();
// Validates the registered thread ids and (re)creates mThreads.
85 void checkAndCreateEmptyThreads();
// Validates the registered queue ids and (re)creates mTQueues.
87 void checkAndCreateQueues();
// Constructor member-initializer list.
// NOTE(review): the constructor signature line is not visible in this excerpt.
107 template<
typename TDatums,
typename TWorker,
typename TQueue>
// Keep the requested mode, create the shared running flag holding `false`, and start
// with a default maximum queue size of -1.
109 mThreadManagerMode{threadManagerMode},
110 spIsRunning{std::make_shared<std::atomic<bool>>(false)},
111 mDefaultMaxSizeQueues{-1ll}
115 template<
typename TDatums,
typename TWorker,
typename TQueue>
// Stores a new default maximum queue size.
// NOTE(review): the signature line is not visible here; presumably
// setDefaultMaxSizeQueues(defaultMaxSizeQueues) - confirm.
120 template<
typename TDatums,
typename TWorker,
typename TQueue>
125 mDefaultMaxSizeQueues = {defaultMaxSizeQueues};
// Any std::exception is forwarded to error() together with the source location.
127 catch (
const std::exception& e)
129 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Public add() overload taking a vector of workers (first signature line not visible).
133 template<
typename TDatums,
typename TWorker,
typename TQueue>
135 const std::vector<TWorker>& tWorkers,
136 const unsigned long long queueInId,
137 const unsigned long long queueOutId)
// Pack the arguments into a single tuple and forward to the tuple-vector overload.
141 add({std::make_tuple(threadId, tWorkers, queueInId, queueOutId)});
// Any std::exception is forwarded to error() together with the source location.
143 catch (
const std::exception& e)
145 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Public add() overload taking a single worker (first signature line not visible).
149 template<
typename TDatums,
typename TWorker,
typename TQueue>
151 const TWorker& tWorker,
152 const unsigned long long queueInId,
153 const unsigned long long queueOutId)
// Wrap the worker in a one-element vector so the tuple matches the vector-based overload.
157 add({std::make_tuple(threadId, std::vector<TWorker>{tWorker}, queueInId, queueOutId)});
// Any std::exception is forwarded to error() together with the source location.
159 catch (
const std::exception& e)
161 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Discards the registered (thread, workers, queue-in, queue-out) tuples.
// NOTE(review): signature and neighbouring statements are not visible here; presumably
// this is reset() - confirm.
165 template<
typename TDatums,
typename TWorker,
typename TQueue>
170 mThreadWorkerQueues.clear();
// Any std::exception is forwarded to error() together with the source location.
174 catch (
const std::exception& e)
176 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Runs the configured threads.
// NOTE(review): signature line not visible; presumably exec() - confirm.
180 template<
typename TDatums,
typename TWorker,
typename TQueue>
// Nothing is started when no thread has been created.
188 if (!mThreads.empty())
// Every thread except the last one is launched through startInThread(). The emptiness
// check above keeps `size() - 1` from wrapping around.
192 for (
auto i = 0u; i < mThreads.size() - 1; i++)
193 mThreads.at(i)->startInThread();
// The last thread is run through exec() with the shared running flag.
// NOTE(review): presumably this blocks the calling thread - confirm against Thread::exec.
194 (*mThreads.rbegin())->exec(spIsRunning);
// Any std::exception is forwarded to error() together with the source location.
200 catch (
const std::exception& e)
202 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Launches every thread through startInThread().
// NOTE(review): signature line not visible; presumably start() - confirm.
206 template<
typename TDatums,
typename TWorker,
typename TQueue>
215 for (
auto& thread : mThreads)
216 thread->startInThread();
// Any std::exception is forwarded to error() together with the source location.
219 catch (
const std::exception& e)
221 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Stops processing: clears the running flag and joins every thread.
// NOTE(review): signature line not visible; presumably stop() - confirm.
225 template<
typename TDatums,
typename TWorker,
typename TQueue>
// Visit every queue first (the loop body is not visible in this excerpt).
231 for (
auto& tQueue : mTQueues)
// Clear the shared running flag.
234 *spIsRunning =
false;
// Stop and join each thread.
235 for (
auto& thread : mThreads)
236 thread->stopAndJoin();
// Any std::exception is forwarded to error() together with the source location.
241 catch (
const std::exception& e)
243 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Forwards `tDatums` to tryEmplace() on the first queue and returns its result.
// NOTE(review): signature line not visible; presumably tryEmplace(TDatums&) - confirm.
247 template<
typename TDatums,
typename TWorker,
typename TQueue>
// Mode check (its condition is not visible in this excerpt).
254 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
// Without queues there is nothing to emplace into.
255 if (mTQueues.empty())
256 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
257 return mTQueues[0]->tryEmplace(tDatums);
// Any std::exception is forwarded to error() together with the source location.
259 catch (
const std::exception& e)
261 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Forwards `tDatums` to waitAndEmplace() on the first queue and returns its result.
// NOTE(review): signature line not visible; presumably waitAndEmplace(TDatums&) - confirm.
266 template<
typename TDatums,
typename TWorker,
typename TQueue>
// Mode check (its condition is not visible in this excerpt).
273 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
// Without queues there is nothing to emplace into.
274 if (mTQueues.empty())
275 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
276 return mTQueues[0]->waitAndEmplace(tDatums);
// Any std::exception is forwarded to error() together with the source location.
278 catch (
const std::exception& e)
280 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Forwards `tDatums` to tryPush() on the first queue and returns its result.
// NOTE(review): signature line not visible; presumably tryPush(const TDatums&) - confirm.
285 template<
typename TDatums,
typename TWorker,
typename TQueue>
// Mode check (its condition is not visible in this excerpt).
292 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
// Without queues there is nothing to push into.
293 if (mTQueues.empty())
294 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
295 return mTQueues[0]->tryPush(tDatums);
// Any std::exception is forwarded to error() together with the source location.
297 catch (
const std::exception& e)
299 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Forwards `tDatums` to waitAndPush() on the first queue and returns its result.
// NOTE(review): signature line not visible; presumably waitAndPush(const TDatums&) - confirm.
304 template<
typename TDatums,
typename TWorker,
typename TQueue>
// Mode check (its condition is not visible in this excerpt).
311 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
// Without queues there is nothing to push into.
312 if (mTQueues.empty())
313 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
314 return mTQueues[0]->waitAndPush(tDatums);
// Any std::exception is forwarded to error() together with the source location.
316 catch (
const std::exception& e)
318 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Forwards `tDatums` to tryPop() on the last queue and returns its result.
// NOTE(review): signature line not visible; presumably tryPop(TDatums&) - confirm.
323 template<
typename TDatums,
typename TWorker,
typename TQueue>
// Mode check (its condition is not visible in this excerpt).
330 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
// Without queues there is nothing to pop from.
331 if (mTQueues.empty())
332 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
// Unlike the push/emplace helpers, this reads from the last queue, not the first.
333 return (*mTQueues.rbegin())->tryPop(tDatums);
// Any std::exception is forwarded to error() together with the source location.
335 catch (
const std::exception& e)
337 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Forwards `tDatums` to waitAndPop() on the last queue and returns its result.
// NOTE(review): signature line not visible; presumably waitAndPop(TDatums&) - confirm.
342 template<
typename TDatums,
typename TWorker,
typename TQueue>
// Mode check (its condition is not visible in this excerpt).
349 error(
"Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
// Without queues there is nothing to pop from.
350 if (mTQueues.empty())
351 error(
"ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
// Unlike the push/emplace helpers, this reads from the last queue, not the first.
352 return (*mTQueues.rbegin())->waitAndPop(tDatums);
// Any std::exception is forwarded to error() together with the source location.
354 catch (
const std::exception& e)
356 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Private add() overload taking ready-made tuples (first signature lines not visible).
361 template<
typename TDatums,
typename TWorker,
typename TQueue>
363 unsigned long long,
unsigned long long>>& threadWorkerQueues)
// Store every tuple in the multiset; nothing is instantiated at this point.
367 for (
const auto& threadWorkerQueue : threadWorkerQueues)
368 mThreadWorkerQueues.insert(threadWorkerQueue);
// Any std::exception is forwarded to error() together with the source location.
370 catch (
const std::exception& e)
372 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Private add() overload taking tuples that hold a single worker (first signature lines
// not visible).
376 template<
typename TDatums,
typename TWorker,
typename TQueue>
378 unsigned long long>>& threadWorkerQueues)
// Rebuild each tuple with the worker wrapped in a one-element vector, then forward it to
// the vector-based overload.
382 for (
const auto& threadWorkerQueue : threadWorkerQueues)
383 add({std::make_tuple(std::get<0>(threadWorkerQueue),
384 std::vector<TWorker>{std::get<1>(threadWorkerQueue)},
385 std::get<2>(threadWorkerQueue),
386 std::get<3>(threadWorkerQueue))});
// Any std::exception is forwarded to error() together with the source location.
388 catch (
const std::exception& e)
390 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Builds the Thread / SubThread objects from the tuples stored in mThreadWorkerQueues.
// Reports an error when no worker has been registered.
// NOTE(review): several branch conditions are not visible in this excerpt, so the
// comments below only describe the lines that are shown.
394 template<
typename TDatums,
typename TWorker,
typename TQueue>
395 void ThreadManager<TDatums, TWorker, TQueue>::multisetToThreads()
399 if (!mThreadWorkerQueues.empty())
// Validate the ids and (re)create the thread and queue containers first.
405 checkAndCreateEmptyThreads();
408 checkAndCreateQueues();
// One past the number of queues; compared against queueOut below.
411 const auto maxQueueIdSynchronous = mTQueues.size()+1;
// One SubThread is created per registered tuple and attached to its thread.
414 for (
const auto& threadWorkerQueue : mThreadWorkerQueues)
416 auto& thread = mThreads[std::get<0>(threadWorkerQueue)];
417 const auto& tWorkers = std::get<1>(threadWorkerQueue);
418 const auto queueIn = std::get<2>(threadWorkerQueue);
419 const auto queueOut = std::get<3>(threadWorkerQueue);
420 std::shared_ptr<SubThread<TDatums, TWorker>> subThread;
// In this branch the queue ids index mTQueues directly.
426 && queueOut == mTQueues.size())
427 subThread = {std::make_shared<SubThreadQueueIn<TDatums, TWorker, TQueue>>(
428 tWorkers, mTQueues.at(queueIn))};
430 subThread = {std::make_shared<SubThreadQueueInOut<TDatums, TWorker, TQueue>>(
431 tWorkers, mTQueues.at(queueIn), mTQueues.at(queueOut))};
// In the remaining branches the queue ids are shifted by one before indexing mTQueues.
434 else if (queueOut != maxQueueIdSynchronous
439 subThread = {std::make_shared<SubThreadQueueInOut<TDatums, TWorker, TQueue>>(
440 tWorkers, mTQueues.at(queueIn-1), mTQueues.at(queueOut-1))};
443 subThread = {std::make_shared<SubThreadQueueOut<TDatums, TWorker, TQueue>>(
444 tWorkers, mTQueues.at(queueOut-1))};
447 else if (queueIn != 0)
448 subThread = {std::make_shared<SubThreadQueueIn<TDatums, TWorker, TQueue>>(
449 tWorkers, mTQueues.at(queueIn-1))};
452 subThread = {std::make_shared<SubThreadNoQueue<TDatums, TWorker>>(tWorkers)};
453 thread->add(subThread);
// Report the function and file as well, like every other error() call in this file.
457 error(
"Empty, no TWorker(s) added.", __LINE__, __FUNCTION__, __FILE__);
// Any std::exception is forwarded to error() together with the source location.
459 catch (
const std::exception& e)
461 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Checks that the registered thread ids have no gaps and (re)creates mThreads with
// maxThreadId + 1 entries.
// Dereferences the first and last element of mThreadWorkerQueues, so the multiset must
// not be empty (multisetToThreads() checks that before calling).
465 template<
typename TDatums,
typename TWorker,
typename TQueue>
466 void ThreadManager<TDatums, TWorker, TQueue>::checkAndCreateEmptyThreads()
// The multiset is ordered, so the last element carries the largest thread id.
471 const auto maxThreadId = std::get<0>(*mThreadWorkerQueues.crbegin());
472 auto previousThreadId = std::get<0>(*mThreadWorkerQueues.cbegin());
// NOTE(review): in the visible lines only consecutive ids are compared, so a smallest id
// greater than 0 is not reported here.
473 for (
const auto& threadWorkerQueue : mThreadWorkerQueues)
475 const auto currentThreadId = std::get<0>(threadWorkerQueue);
// A jump of more than one means at least one id is absent; the first absent id is
// previousThreadId + 1 (currentThreadId itself is present).
476 if (currentThreadId - previousThreadId > 1)
477 error(
"Missing thread id " + std::to_string(previousThreadId + 1) +
" of "
478 + std::to_string(maxThreadId) +
".", __LINE__, __FUNCTION__, __FILE__);
479 previousThreadId = currentThreadId;
// The first maxThreadId threads are default-constructed; one more thread, constructed
// with the shared running flag, is appended as the last entry.
484 mThreads.resize(maxThreadId);
485 for (
auto& thread : mThreads)
486 thread = std::make_shared<Thread<TDatums, TWorker>>();
487 mThreads.emplace_back(std::make_shared<Thread<TDatums, TWorker>>(spIsRunning));
// Any std::exception is forwarded to error() together with the source location.
489 catch (
const std::exception& e)
491 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Checks that every queue id is used both as an input and as an output, then (re)creates
// mTQueues with a size that depends on the manager mode.
// NOTE(review): the mode conditions and the start of the max-update statement are not
// visible in this excerpt.
495 template<
typename TDatums,
typename TWorker,
typename TQueue>
496 void ThreadManager<TDatums, TWorker, TQueue>::checkAndCreateQueues()
500 if (!mThreadWorkerQueues.empty())
// Start from the queue-out id of the first tuple, then fold in the larger of each
// tuple's queue-in / queue-out ids.
503 auto maxQueueId = std::get<3>(*mThreadWorkerQueues.cbegin());
504 for (
const auto& threadWorkerQueue : mThreadWorkerQueues)
506 maxQueueId,
fastMax(std::get<2>(threadWorkerQueue), std::get<3>(threadWorkerQueue)));
// Per queue id: `first` = used as an input by some tuple, `second` = used as an output.
510 std::vector<std::pair<bool, bool>> usedQueueIds(maxQueueId+1, {
false,
false});
511 for (
const auto& threadWorkerQueue : mThreadWorkerQueues)
513 usedQueueIds.at(std::get<2>(threadWorkerQueue)).first =
true;
514 usedQueueIds.at(std::get<3>(threadWorkerQueue)).second =
true;
// The lowest id is always treated as having an output, and the highest id as having an
// input, so neither end trips the checks below.
517 usedQueueIds.begin()->second =
true;
519 usedQueueIds.rbegin()->first =
true;
// Every id must be marked as both input and output.
521 for (
auto i = 0ull ; i < usedQueueIds.size() ; i++)
523 if (!usedQueueIds[i].first)
524 error(
"Missing queue id " + std::to_string(i) +
" (of "
525 + std::to_string(maxQueueId) +
") as input.", __LINE__, __FUNCTION__, __FILE__);
526 if (!usedQueueIds[i].second)
527 error(
"Missing queue id " + std::to_string(i) +
" (of "
528 + std::to_string(maxQueueId) +
") as output.", __LINE__, __FUNCTION__, __FILE__);
// The queue count is maxQueueId+1, maxQueueId-1 or maxQueueId depending on the mode;
// any other mode is reported as an error.
533 mTQueues.resize(maxQueueId+1);
535 mTQueues.resize(maxQueueId-1);
538 mTQueues.resize(maxQueueId);
540 error(
"Unknown ThreadManagerMode", __LINE__, __FUNCTION__, __FILE__);
// Each queue is constructed with the configured default maximum size.
541 for (
auto& tQueue : mTQueues)
542 tQueue = std::make_shared<TQueue>(mDefaultMaxSizeQueues);
// Any std::exception is forwarded to error() together with the source location.
545 catch (
const std::exception& e)
547 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
bool waitAndEmplace(TDatums &tDatums)
bool waitAndPop(TDatums &tDatums)
std::shared_ptr< std::atomic< bool > > getIsRunningSharedPtr()
bool tryPop(TDatums &tDatums)
bool tryPush(const TDatums &tDatums)
void add(const unsigned long long threadId, const std::vector< TWorker > &tWorkers, const unsigned long long queueInId, const unsigned long long queueOutId)
ThreadManager(const ThreadManagerMode threadManagerMode=ThreadManagerMode::Synchronous)
bool tryEmplace(TDatums &tDatums)
bool waitAndPush(const TDatums &tDatums)
void setDefaultMaxSizeQueues(const long long defaultMaxSizeQueues=-1)
COMPILE_TEMPLATE_DATUM(WPoseTriangulation)
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")
OP_API void setMainThread()
OP_API void checkWorkerErrors()
T fastMax(const T a, const T b)
OP_API void opLog(const std::string &message, const Priority priority=Priority::Max, const int line=-1, const std::string &function="", const std::string &file="")