OpenPose  1.7.0
The first real-time multi-person system to jointly detect human body, hand, facial, and foot keypoints
threadManager.hpp
Go to the documentation of this file.
1 #ifndef OPENPOSE_THREAD_THREAD_MANAGER_HPP
2 #define OPENPOSE_THREAD_THREAD_MANAGER_HPP
3 
4 #include <atomic>
5 #include <set> // std::multiset
6 #include <tuple>
12 
13 namespace op
14 {
15  template<typename TDatums, typename TWorker = std::shared_ptr<Worker<TDatums>>, typename TQueue = Queue<TDatums>>
17  {
18  public:
19  // Completely customizable case
20  explicit ThreadManager(const ThreadManagerMode threadManagerMode = ThreadManagerMode::Synchronous);
21 
22  virtual ~ThreadManager();
23 
33  void setDefaultMaxSizeQueues(const long long defaultMaxSizeQueues = -1);
34 
35  void add(const unsigned long long threadId, const std::vector<TWorker>& tWorkers,
36  const unsigned long long queueInId, const unsigned long long queueOutId);
37 
38  void add(const unsigned long long threadId, const TWorker& tWorker, const unsigned long long queueInId,
39  const unsigned long long queueOutId);
40 
41  void reset();
42 
43  void exec();
44 
45  void start();
46 
47  void stop();
48 
49  inline std::shared_ptr<std::atomic<bool>> getIsRunningSharedPtr()
50  {
51  return spIsRunning;
52  }
53 
54  inline bool isRunning() const
55  {
56  return *spIsRunning;
57  }
58 
59  bool tryEmplace(TDatums& tDatums);
60 
61  bool waitAndEmplace(TDatums& tDatums);
62 
63  bool tryPush(const TDatums& tDatums);
64 
65  bool waitAndPush(const TDatums& tDatums);
66 
67  bool tryPop(TDatums& tDatums);
68 
69  bool waitAndPop(TDatums& tDatums);
70 
71  private:
72  const ThreadManagerMode mThreadManagerMode;
73  std::shared_ptr<std::atomic<bool>> spIsRunning;
74  long long mDefaultMaxSizeQueues;
75  std::multiset<std::tuple<unsigned long long, std::vector<TWorker>, unsigned long long, unsigned long long>> mThreadWorkerQueues;
76  std::vector<std::shared_ptr<Thread<TDatums, TWorker>>> mThreads;
77  std::vector<std::shared_ptr<TQueue>> mTQueues;
78 
79  void add(const std::vector<std::tuple<unsigned long long, std::vector<TWorker>, unsigned long long, unsigned long long>>& threadWorkerQueues);
80 
81  void add(const std::vector<std::tuple<unsigned long long, TWorker, unsigned long long, unsigned long long>>& threadWorkerQueues);
82 
83  void multisetToThreads();
84 
85  void checkAndCreateEmptyThreads();
86 
87  void checkAndCreateQueues();
88 
89  DELETE_COPY(ThreadManager);
90  };
91 }
92 
93 
94 
95 
96 
97 // Implementation
98 #include <utility> // std::pair
105 namespace op
106 {
107  template<typename TDatums, typename TWorker, typename TQueue>
109  mThreadManagerMode{threadManagerMode},
110  spIsRunning{std::make_shared<std::atomic<bool>>(false)},
111  mDefaultMaxSizeQueues{-1ll}
112  {
113  }
114 
115  template<typename TDatums, typename TWorker, typename TQueue>
117  {
118  }
119 
120  template<typename TDatums, typename TWorker, typename TQueue>
121  void ThreadManager<TDatums, TWorker, TQueue>::setDefaultMaxSizeQueues(const long long defaultMaxSizeQueues)
122  {
123  try
124  {
125  mDefaultMaxSizeQueues = {defaultMaxSizeQueues};
126  }
127  catch (const std::exception& e)
128  {
129  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
130  }
131  }
132 
133  template<typename TDatums, typename TWorker, typename TQueue>
134  void ThreadManager<TDatums, TWorker, TQueue>::add(const unsigned long long threadId,
135  const std::vector<TWorker>& tWorkers,
136  const unsigned long long queueInId,
137  const unsigned long long queueOutId)
138  {
139  try
140  {
141  add({std::make_tuple(threadId, tWorkers, queueInId, queueOutId)});
142  }
143  catch (const std::exception& e)
144  {
145  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
146  }
147  }
148 
149  template<typename TDatums, typename TWorker, typename TQueue>
150  void ThreadManager<TDatums, TWorker, TQueue>::add(const unsigned long long threadId,
151  const TWorker& tWorker,
152  const unsigned long long queueInId,
153  const unsigned long long queueOutId)
154  {
155  try
156  {
157  add({std::make_tuple(threadId, std::vector<TWorker>{tWorker}, queueInId, queueOutId)});
158  }
159  catch (const std::exception& e)
160  {
161  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
162  }
163  }
164 
165  template<typename TDatums, typename TWorker, typename TQueue>
167  {
168  try
169  {
170  mThreadWorkerQueues.clear();
171  mThreads.clear();
172  mTQueues.clear();
173  }
174  catch (const std::exception& e)
175  {
176  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
177  }
178  }
179 
180  template<typename TDatums, typename TWorker, typename TQueue>
182  {
183  try
184  {
185  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
186  // Set threads
187  multisetToThreads();
188  if (!mThreads.empty())
189  {
190  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
191  // Start threads
192  for (auto i = 0u; i < mThreads.size() - 1; i++)
193  mThreads.at(i)->startInThread();
194  (*mThreads.rbegin())->exec(spIsRunning);
195  // Stop threads - It will arrive here when the exec() command has finished
196  stop();
197  }
198  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
199  }
200  catch (const std::exception& e)
201  {
202  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
203  }
204  }
205 
206  template<typename TDatums, typename TWorker, typename TQueue>
208  {
209  try
210  {
211  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
212  // Set threads
213  multisetToThreads();
214  // Start threads
215  for (auto& thread : mThreads)
216  thread->startInThread();
217  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
218  }
219  catch (const std::exception& e)
220  {
221  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
222  }
223  }
224 
225  template<typename TDatums, typename TWorker, typename TQueue>
227  {
228  try
229  {
230  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
231  for (auto& tQueue : mTQueues)
232  tQueue->stop();
233  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
234  *spIsRunning = false;
235  for (auto& thread : mThreads)
236  thread->stopAndJoin();
237  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
239  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
240  }
241  catch (const std::exception& e)
242  {
243  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
244  }
245  }
246 
247  template<typename TDatums, typename TWorker, typename TQueue>
249  {
250  try
251  {
252  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
253  && mThreadManagerMode != ThreadManagerMode::AsynchronousIn)
254  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
255  if (mTQueues.empty())
256  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
257  return mTQueues[0]->tryEmplace(tDatums);
258  }
259  catch (const std::exception& e)
260  {
261  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
262  return false;
263  }
264  }
265 
266  template<typename TDatums, typename TWorker, typename TQueue>
268  {
269  try
270  {
271  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
272  && mThreadManagerMode != ThreadManagerMode::AsynchronousIn)
273  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
274  if (mTQueues.empty())
275  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
276  return mTQueues[0]->waitAndEmplace(tDatums);
277  }
278  catch (const std::exception& e)
279  {
280  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
281  return false;
282  }
283  }
284 
285  template<typename TDatums, typename TWorker, typename TQueue>
287  {
288  try
289  {
290  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
291  && mThreadManagerMode != ThreadManagerMode::AsynchronousIn)
292  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
293  if (mTQueues.empty())
294  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
295  return mTQueues[0]->tryPush(tDatums);
296  }
297  catch (const std::exception& e)
298  {
299  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
300  return false;
301  }
302  }
303 
304  template<typename TDatums, typename TWorker, typename TQueue>
306  {
307  try
308  {
309  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
310  && mThreadManagerMode != ThreadManagerMode::AsynchronousIn)
311  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
312  if (mTQueues.empty())
313  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
314  return mTQueues[0]->waitAndPush(tDatums);
315  }
316  catch (const std::exception& e)
317  {
318  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
319  return false;
320  }
321  }
322 
323  template<typename TDatums, typename TWorker, typename TQueue>
325  {
326  try
327  {
328  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
329  && mThreadManagerMode != ThreadManagerMode::AsynchronousOut)
330  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
331  if (mTQueues.empty())
332  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
333  return (*mTQueues.rbegin())->tryPop(tDatums);
334  }
335  catch (const std::exception& e)
336  {
337  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
338  return false;
339  }
340  }
341 
342  template<typename TDatums, typename TWorker, typename TQueue>
344  {
345  try
346  {
347  if (mThreadManagerMode != ThreadManagerMode::Asynchronous
348  && mThreadManagerMode != ThreadManagerMode::AsynchronousOut)
349  error("Not available for this ThreadManagerMode.", __LINE__, __FUNCTION__, __FILE__);
350  if (mTQueues.empty())
351  error("ThreadManager already stopped or not started yet.", __LINE__, __FUNCTION__, __FILE__);
352  return (*mTQueues.rbegin())->waitAndPop(tDatums);
353  }
354  catch (const std::exception& e)
355  {
356  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
357  return false;
358  }
359  }
360 
361  template<typename TDatums, typename TWorker, typename TQueue>
362  void ThreadManager<TDatums, TWorker, TQueue>::add(const std::vector<std::tuple<unsigned long long, std::vector<TWorker>,
363  unsigned long long, unsigned long long>>& threadWorkerQueues)
364  {
365  try
366  {
367  for (const auto& threadWorkerQueue : threadWorkerQueues)
368  mThreadWorkerQueues.insert(threadWorkerQueue);
369  }
370  catch (const std::exception& e)
371  {
372  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
373  }
374  }
375 
376  template<typename TDatums, typename TWorker, typename TQueue>
377  void ThreadManager<TDatums, TWorker, TQueue>::add(const std::vector<std::tuple<unsigned long long, TWorker, unsigned long long,
378  unsigned long long>>& threadWorkerQueues)
379  {
380  try
381  {
382  for (const auto& threadWorkerQueue : threadWorkerQueues)
383  add({std::make_tuple(std::get<0>(threadWorkerQueue),
384  std::vector<TWorker>{std::get<1>(threadWorkerQueue)},
385  std::get<2>(threadWorkerQueue),
386  std::get<3>(threadWorkerQueue))});
387  }
388  catch (const std::exception& e)
389  {
390  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
391  }
392  }
393 
394  template<typename TDatums, typename TWorker, typename TQueue>
395  void ThreadManager<TDatums, TWorker, TQueue>::multisetToThreads()
396  {
397  try
398  {
399  if (!mThreadWorkerQueues.empty())
400  {
401  // This avoids extra std::cout if errors occur on different threads
402  setMainThread();
403 
404  // Check threads
405  checkAndCreateEmptyThreads();
406 
407  // Check and create queues
408  checkAndCreateQueues();
409 
410  // Data
411  const auto maxQueueIdSynchronous = mTQueues.size()+1;
412 
413  // Set up threads
414  for (const auto& threadWorkerQueue : mThreadWorkerQueues)
415  {
416  auto& thread = mThreads[std::get<0>(threadWorkerQueue)];
417  const auto& tWorkers = std::get<1>(threadWorkerQueue);
418  const auto queueIn = std::get<2>(threadWorkerQueue);
419  const auto queueOut = std::get<3>(threadWorkerQueue);
420  std::shared_ptr<SubThread<TDatums, TWorker>> subThread;
421  // If AsynchronousIn -> queue indexes are OK
422  if (mThreadManagerMode == ThreadManagerMode::Asynchronous
423  || mThreadManagerMode == ThreadManagerMode::AsynchronousIn)
424  {
425  if (mThreadManagerMode == ThreadManagerMode::AsynchronousIn
426  && queueOut == mTQueues.size())
427  subThread = {std::make_shared<SubThreadQueueIn<TDatums, TWorker, TQueue>>(
428  tWorkers, mTQueues.at(queueIn))};
429  else
430  subThread = {std::make_shared<SubThreadQueueInOut<TDatums, TWorker, TQueue>>(
431  tWorkers, mTQueues.at(queueIn), mTQueues.at(queueOut))};
432  }
433  // If !AsynchronousIn -> queue indexes - 1
434  else if (queueOut != maxQueueIdSynchronous
435  || mThreadManagerMode == ThreadManagerMode::AsynchronousOut)
436  {
437  // Queue in + out
438  if (queueIn != 0)
439  subThread = {std::make_shared<SubThreadQueueInOut<TDatums, TWorker, TQueue>>(
440  tWorkers, mTQueues.at(queueIn-1), mTQueues.at(queueOut-1))};
441  // Case queue out (first TWorker(s))
442  else
443  subThread = {std::make_shared<SubThreadQueueOut<TDatums, TWorker, TQueue>>(
444  tWorkers, mTQueues.at(queueOut-1))};
445  }
446  // Case queue in (last TWorker(s))
447  else if (queueIn != 0) // && queueOut == maxQueueIdSynchronous
448  subThread = {std::make_shared<SubThreadQueueIn<TDatums, TWorker, TQueue>>(
449  tWorkers, mTQueues.at(queueIn-1))};
450  // Case no queue
451  else // if (queueIn == 0 && queueOut == maxQueueIdSynchronous)
452  subThread = {std::make_shared<SubThreadNoQueue<TDatums, TWorker>>(tWorkers)};
453  thread->add(subThread);
454  }
455  }
456  else
457  error("Empty, no TWorker(s) added.", __LINE__);
458  }
459  catch (const std::exception& e)
460  {
461  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
462  }
463  }
464 
465  template<typename TDatums, typename TWorker, typename TQueue>
466  void ThreadManager<TDatums, TWorker, TQueue>::checkAndCreateEmptyThreads()
467  {
468  try
469  {
470  // Check all thread ids from 0-maxThreadId are present
471  const auto maxThreadId = std::get<0>(*mThreadWorkerQueues.crbegin());
472  auto previousThreadId = std::get<0>(*mThreadWorkerQueues.cbegin());
473  for (const auto& threadWorkerQueue : mThreadWorkerQueues)
474  {
475  const auto currentThreadId = std::get<0>(threadWorkerQueue);
476  if (currentThreadId - previousThreadId > 1)
477  error("Missing thread id " + std::to_string(currentThreadId) + " of "
478  + std::to_string(maxThreadId) + ".", __LINE__, __FUNCTION__, __FILE__);
479  previousThreadId = currentThreadId;
480  }
481 
482  // Create Threads
483  // #threads = maxThreadId+1
484  mThreads.resize(maxThreadId);
485  for (auto& thread : mThreads)
486  thread = std::make_shared<Thread<TDatums, TWorker>>();
487  mThreads.emplace_back(std::make_shared<Thread<TDatums, TWorker>>(spIsRunning));
488  }
489  catch (const std::exception& e)
490  {
491  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
492  }
493  }
494 
495  template<typename TDatums, typename TWorker, typename TQueue>
496  void ThreadManager<TDatums, TWorker, TQueue>::checkAndCreateQueues()
497  {
498  try
499  {
500  if (!mThreadWorkerQueues.empty())
501  {
502  // Get max queue id to get queue size
503  auto maxQueueId = std::get<3>(*mThreadWorkerQueues.cbegin());
504  for (const auto& threadWorkerQueue : mThreadWorkerQueues)
505  maxQueueId = fastMax(
506  maxQueueId, fastMax(std::get<2>(threadWorkerQueue), std::get<3>(threadWorkerQueue)));
507 
508  // Check each queue id has at least a worker that uses it as input and another one as output.
509  // Special cases:
510  std::vector<std::pair<bool, bool>> usedQueueIds(maxQueueId+1, {false, false});
511  for (const auto& threadWorkerQueue : mThreadWorkerQueues)
512  {
513  usedQueueIds.at(std::get<2>(threadWorkerQueue)).first = true;
514  usedQueueIds.at(std::get<3>(threadWorkerQueue)).second = true;
515  }
516  // Id 0 must only needs a worker using it as input.
517  usedQueueIds.begin()->second = true;
518  // Id maxQueueId only needs a worker using it as output.
519  usedQueueIds.rbegin()->first = true;
520  // Error if missing queue id
521  for (auto i = 0ull ; i < usedQueueIds.size() ; i++)
522  {
523  if (!usedQueueIds[i].first)
524  error("Missing queue id " + std::to_string(i) + " (of "
525  + std::to_string(maxQueueId) + ") as input.", __LINE__, __FUNCTION__, __FILE__);
526  if (!usedQueueIds[i].second)
527  error("Missing queue id " + std::to_string(i) + " (of "
528  + std::to_string(maxQueueId) + ") as output.", __LINE__, __FUNCTION__, __FILE__);
529  }
530 
531  // Create Queues
532  if (mThreadManagerMode == ThreadManagerMode::Asynchronous)
533  mTQueues.resize(maxQueueId+1); // First and last one are queues
534  else if (mThreadManagerMode == ThreadManagerMode::Synchronous)
535  mTQueues.resize(maxQueueId-1); // First and last one are not actually queues
536  else if (mThreadManagerMode == ThreadManagerMode::AsynchronousIn
537  || mThreadManagerMode == ThreadManagerMode::AsynchronousOut)
538  mTQueues.resize(maxQueueId); // First or last one is queue
539  else
540  error("Unknown ThreadManagerMode", __LINE__, __FUNCTION__, __FILE__);
541  for (auto& tQueue : mTQueues)
542  tQueue = std::make_shared<TQueue>(mDefaultMaxSizeQueues);
543  }
544  }
545  catch (const std::exception& e)
546  {
547  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
548  }
549  }
550 
552 }
553 
554 #endif // OPENPOSE_THREAD_THREAD_MANAGER_HPP
virtual ~ThreadManager()
bool waitAndEmplace(TDatums &tDatums)
bool waitAndPop(TDatums &tDatums)
std::shared_ptr< std::atomic< bool > > getIsRunningSharedPtr()
bool tryPop(TDatums &tDatums)
bool tryPush(const TDatums &tDatums)
bool isRunning() const
void add(const unsigned long long threadId, const std::vector< TWorker > &tWorkers, const unsigned long long queueInId, const unsigned long long queueOutId)
ThreadManager(const ThreadManagerMode threadManagerMode=ThreadManagerMode::Synchronous)
bool tryEmplace(TDatums &tDatums)
bool waitAndPush(const TDatums &tDatums)
void setDefaultMaxSizeQueues(const long long defaultMaxSizeQueues=-1)
ThreadManagerMode
Definition: enumClasses.hpp:10
COMPILE_TEMPLATE_DATUM(WPoseTriangulation)
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")
OP_API void setMainThread()
OP_API void checkWorkerErrors()
T fastMax(const T a, const T b)
Definition: fastMath.hpp:73
OP_API void opLog(const std::string &message, const Priority priority=Priority::Max, const int line=-1, const std::string &function="", const std::string &file="")