OpenPose  1.7.0
The first real-time multi-person system to jointly detect human body, hand, facial, and foot keypoints
queueBase.hpp
Go to the documentation of this file.
1 #ifndef OPENPOSE_THREAD_QUEUE_BASE_HPP
2 #define OPENPOSE_THREAD_QUEUE_BASE_HPP
3 
4 #include <condition_variable>
5 #include <mutex>
6 #include <queue> // std::queue & std::priority_queue
8 
9 namespace op
10 {
11  template<typename TDatums, typename TQueue>
12  class QueueBase
13  {
14  public:
15  explicit QueueBase(const long long maxSize = -1);
16 
17  virtual ~QueueBase();
18 
19  bool forceEmplace(TDatums& tDatums);
20 
21  bool tryEmplace(TDatums& tDatums);
22 
23  bool waitAndEmplace(TDatums& tDatums);
24 
25  bool forcePush(const TDatums& tDatums);
26 
27  bool tryPush(const TDatums& tDatums);
28 
29  bool waitAndPush(const TDatums& tDatums);
30 
31  bool tryPop(TDatums& tDatums);
32 
33  bool tryPop();
34 
35  bool waitAndPop(TDatums& tDatums);
36 
37  bool waitAndPop();
38 
39  bool empty() const;
40 
41  void stop();
42 
43  void stopPusher();
44 
45  void addPopper();
46 
47  void addPusher();
48 
49  bool isRunning() const;
50 
51  bool isFull() const;
52 
53  size_t size() const;
54 
55  void clear();
56 
57  virtual TDatums front() const = 0;
58 
59  protected:
60  mutable std::mutex mMutex;
61  long long mPoppers;
62  long long mPushers;
63  long long mMaxPoppersPushers;
66  std::condition_variable mConditionVariable;
67  TQueue mTQueue;
68 
69  virtual bool pop(TDatums& tDatums) = 0;
70 
71  unsigned long long getMaxSize() const;
72 
73  private:
74  const long long mMaxSize;
75 
76  bool emplace(TDatums& tDatums);
77 
78  bool push(const TDatums& tDatums);
79 
80  bool pop();
81 
82  void updateMaxPoppersPushers();
83 
84  DELETE_COPY(QueueBase);
85  };
86 }
87 
88 
89 
90 
91 
92 // Implementation
93 #include <openpose/core/datum.hpp>
95 namespace op
96 {
97  template<typename TDatums, typename TQueue>
98  QueueBase<TDatums, TQueue>::QueueBase(const long long maxSize) :
99  mPoppers{0ll},
100  mPushers{0ll},
101  mPopIsStopped{false},
102  mPushIsStopped{false},
103  mMaxSize{maxSize}
104  {
105  }
106 
107  // Virtual destructor
108  template<typename TDatums, typename TQueue>
110  {
111  try
112  {
113  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
114  stop();
115  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
116  }
117  catch (const std::exception& e)
118  {
119  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
120  }
121  }
122 
123  template<typename TDatums, typename TQueue>
125  {
126  try
127  {
128  const std::lock_guard<std::mutex> lock{mMutex};
129  if (mTQueue.size() >= getMaxSize())
130  mTQueue.pop();
131  return emplace(tDatums);
132  }
133  catch (const std::exception& e)
134  {
135  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
136  return false;
137  }
138  }
139 
140  template<typename TDatums, typename TQueue>
142  {
143  try
144  {
145  const std::lock_guard<std::mutex> lock{mMutex};
146  if (mTQueue.size() >= getMaxSize())
147  return false;
148  return emplace(tDatums);
149  }
150  catch (const std::exception& e)
151  {
152  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
153  return false;
154  }
155  }
156 
157  template<typename TDatums, typename TQueue>
159  {
160  try
161  {
162  std::unique_lock<std::mutex> lock{mMutex};
163  mConditionVariable.wait(lock, [this]{return mTQueue.size() < getMaxSize() || mPushIsStopped; });
164  return emplace(tDatums);
165  }
166  catch (const std::exception& e)
167  {
168  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
169  return false;
170  }
171  }
172 
173  template<typename TDatums, typename TQueue>
174  bool QueueBase<TDatums, TQueue>::forcePush(const TDatums& tDatums)
175  {
176  try
177  {
178  const std::lock_guard<std::mutex> lock{mMutex};
179  if (mTQueue.size() >= getMaxSize())
180  mTQueue.pop();
181  return push(tDatums);
182  }
183  catch (const std::exception& e)
184  {
185  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
186  return false;
187  }
188  }
189 
190  template<typename TDatums, typename TQueue>
191  bool QueueBase<TDatums, TQueue>::tryPush(const TDatums& tDatums)
192  {
193  try
194  {
195  const std::lock_guard<std::mutex> lock{mMutex};
196  if (mTQueue.size() >= getMaxSize())
197  return false;
198  return push(tDatums);
199  }
200  catch (const std::exception& e)
201  {
202  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
203  return false;
204  }
205  }
206 
207  template<typename TDatums, typename TQueue>
208  bool QueueBase<TDatums, TQueue>::waitAndPush(const TDatums& tDatums)
209  {
210  try
211  {
212  std::unique_lock<std::mutex> lock{mMutex};
213  mConditionVariable.wait(lock, [this]{return mTQueue.size() < getMaxSize() || mPushIsStopped; });
214  return push(tDatums);
215  }
216  catch (const std::exception& e)
217  {
218  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
219  return false;
220  }
221  }
222 
223  template<typename TDatums, typename TQueue>
224  bool QueueBase<TDatums, TQueue>::tryPop(TDatums& tDatums)
225  {
226  try
227  {
228  const std::lock_guard<std::mutex> lock{mMutex};
229  return pop(tDatums);
230  }
231  catch (const std::exception& e)
232  {
233  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
234  return false;
235  }
236  }
237 
238  template<typename TDatums, typename TQueue>
240  {
241  try
242  {
243  const std::lock_guard<std::mutex> lock{mMutex};
244  return pop();
245  }
246  catch (const std::exception& e)
247  {
248  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
249  return false;
250  }
251  }
252 
253  template<typename TDatums, typename TQueue>
255  {
256  try
257  {
258  std::unique_lock<std::mutex> lock{mMutex};
259  mConditionVariable.wait(lock, [this]{return !mTQueue.empty() || mPopIsStopped; });
260  return pop(tDatums);
261  }
262  catch (const std::exception& e)
263  {
264  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
265  return false;
266  }
267  }
268 
269  template<typename TDatums, typename TQueue>
271  {
272  try
273  {
274  std::unique_lock<std::mutex> lock{mMutex};
275  mConditionVariable.wait(lock, [this]{return !mTQueue.empty() || mPopIsStopped; });
276  return pop();
277  }
278  catch (const std::exception& e)
279  {
280  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
281  return false;
282  }
283  }
284 
285  template<typename TDatums, typename TQueue>
287  {
288  try
289  {
290  const std::lock_guard<std::mutex> lock{mMutex};
291  return mTQueue.empty();
292  }
293  catch (const std::exception& e)
294  {
295  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
296  return false;
297  }
298  }
299 
300  template<typename TDatums, typename TQueue>
302  {
303  try
304  {
305  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
306  const std::lock_guard<std::mutex> lock{mMutex};
307  mPopIsStopped = {true};
308  mPushIsStopped = {true};
309  while (!mTQueue.empty())
310  mTQueue.pop();
311  mConditionVariable.notify_all();
312  }
313  catch (const std::exception& e)
314  {
315  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
316  }
317  }
318 
319  template<typename TDatums, typename TQueue>
321  {
322  try
323  {
324  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
325  const std::lock_guard<std::mutex> lock{mMutex};
326  mPushers--;
327  if (mPushers == 0)
328  {
329  mPushIsStopped = {true};
330  if (mTQueue.empty())
331  mPopIsStopped = {true};
332  mConditionVariable.notify_all();
333  }
334  }
335  catch (const std::exception& e)
336  {
337  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
338  }
339  }
340 
341  template<typename TDatums, typename TQueue>
343  {
344  try
345  {
346  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
347  const std::lock_guard<std::mutex> lock{mMutex};
348  mPoppers++;
349  updateMaxPoppersPushers();
350  }
351  catch (const std::exception& e)
352  {
353  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
354  }
355  }
356 
357  template<typename TDatums, typename TQueue>
359  {
360  try
361  {
362  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
363  const std::lock_guard<std::mutex> lock{mMutex};
364  mPushers++;
365  updateMaxPoppersPushers();
366  }
367  catch (const std::exception& e)
368  {
369  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
370  }
371  }
372 
373  template<typename TDatums, typename TQueue>
375  {
376  try
377  {
378  const std::lock_guard<std::mutex> lock{mMutex};
379  return !(mPushIsStopped && (mPopIsStopped || mTQueue.empty()));
380  }
381  catch (const std::exception& e)
382  {
383  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
384  return true;
385  }
386  }
387 
388  template<typename TDatums, typename TQueue>
390  {
391  try
392  {
393  // No mutex required because the size() and getMaxSize() are already thread-safe
394  return size() == getMaxSize();
395  }
396  catch (const std::exception& e)
397  {
398  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
399  return false;
400  }
401  }
402 
403  template<typename TDatums, typename TQueue>
405  {
406  try
407  {
408  const std::lock_guard<std::mutex> lock{mMutex};
409  return mTQueue.size();
410  }
411  catch (const std::exception& e)
412  {
413  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
414  return 0;
415  }
416  }
417 
418  template<typename TDatums, typename TQueue>
420  {
421  try
422  {
423  const std::lock_guard<std::mutex> lock{mMutex};
424  while (!mTQueue.empty())
425  mTQueue.pop();
426  }
427  catch (const std::exception& e)
428  {
429  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
430  }
431  }
432 
433  template<typename TDatums, typename TQueue>
434  unsigned long long QueueBase<TDatums, TQueue>::getMaxSize() const
435  {
436  try
437  {
438  return (mMaxSize > 0 ? mMaxSize : fastMax(1ll, mMaxPoppersPushers));
439  }
440  catch (const std::exception& e)
441  {
442  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
443  return false;
444  }
445  }
446 
447  template<typename TDatums, typename TQueue>
448  bool QueueBase<TDatums, TQueue>::emplace(TDatums& tDatums)
449  {
450  try
451  {
452  if (mPushIsStopped)
453  return false;
454 
455  mTQueue.emplace(tDatums);
456  mConditionVariable.notify_all();
457  return true;
458  }
459  catch (const std::exception& e)
460  {
461  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
462  return false;
463  }
464  }
465 
466  template<typename TDatums, typename TQueue>
467  bool QueueBase<TDatums, TQueue>::push(const TDatums& tDatums)
468  {
469  try
470  {
471  if (mPushIsStopped)
472  return false;
473 
474  mTQueue.push(tDatums);
475  mConditionVariable.notify_all();
476  return true;
477  }
478  catch (const std::exception& e)
479  {
480  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
481  return false;
482  }
483  }
484 
485  template<typename TDatums, typename TQueue>
486  bool QueueBase<TDatums, TQueue>::pop()
487  {
488  try
489  {
490  if (mPopIsStopped || mTQueue.empty())
491  return false;
492 
493  mTQueue.pop();
494  mConditionVariable.notify_all();
495  return true;
496  }
497  catch (const std::exception& e)
498  {
499  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
500  return false;
501  }
502  }
503 
504  template<typename TDatums, typename TQueue>
505  void QueueBase<TDatums, TQueue>::updateMaxPoppersPushers()
506  {
507  try
508  {
509  mMaxPoppersPushers = fastMax(mPoppers, mPushers);
510  }
511  catch (const std::exception& e)
512  {
513  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
514  }
515  }
516 
517  extern template class QueueBase<BASE_DATUMS_SH, std::queue<BASE_DATUMS_SH>>;
518  extern template class QueueBase<
520  std::priority_queue<BASE_DATUMS_SH, std::vector<BASE_DATUMS_SH>,
521  std::greater<BASE_DATUMS_SH>>>;
522 }
523 
524 #endif // OPENPOSE_THREAD_QUEUE_BASE_HPP
long long mPoppers
Definition: queueBase.hpp:61
bool isFull() const
Definition: queueBase.hpp:389
void addPusher()
Definition: queueBase.hpp:358
long long mMaxPoppersPushers
Definition: queueBase.hpp:63
std::mutex mMutex
Definition: queueBase.hpp:60
bool waitAndPop(TDatums &tDatums)
Definition: queueBase.hpp:254
void stopPusher()
Definition: queueBase.hpp:320
bool tryPush(const TDatums &tDatums)
Definition: queueBase.hpp:191
TQueue mTQueue
Definition: queueBase.hpp:67
virtual bool pop(TDatums &tDatums)=0
bool empty() const
Definition: queueBase.hpp:286
bool mPopIsStopped
Definition: queueBase.hpp:64
bool tryEmplace(TDatums &tDatums)
Definition: queueBase.hpp:141
unsigned long long getMaxSize() const
Definition: queueBase.hpp:434
long long mPushers
Definition: queueBase.hpp:62
bool tryPop(TDatums &tDatums)
Definition: queueBase.hpp:224
bool waitAndPop()
Definition: queueBase.hpp:270
bool waitAndEmplace(TDatums &tDatums)
Definition: queueBase.hpp:158
std::condition_variable mConditionVariable
Definition: queueBase.hpp:66
bool forceEmplace(TDatums &tDatums)
Definition: queueBase.hpp:124
size_t size() const
Definition: queueBase.hpp:404
bool isRunning() const
Definition: queueBase.hpp:374
virtual TDatums front() const =0
bool waitAndPush(const TDatums &tDatums)
Definition: queueBase.hpp:208
bool forcePush(const TDatums &tDatums)
Definition: queueBase.hpp:174
void addPopper()
Definition: queueBase.hpp:342
QueueBase(const long long maxSize=-1)
Definition: queueBase.hpp:98
virtual ~QueueBase()
Definition: queueBase.hpp:109
bool mPushIsStopped
Definition: queueBase.hpp:65
#define BASE_DATUMS_SH
Definition: datum.hpp:405
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")
T fastMax(const T a, const T b)
Definition: fastMath.hpp:73
OP_API void opLog(const std::string &message, const Priority priority=Priority::Max, const int line=-1, const std::string &function="", const std::string &file="")