OpenPose  1.7.0
The first real-time multi-person system to jointly detect human body, hand, facial, and foot keypoints
All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Macros Pages
queueBase.hpp
Go to the documentation of this file.
1 #ifndef OPENPOSE_THREAD_QUEUE_BASE_HPP
2 #define OPENPOSE_THREAD_QUEUE_BASE_HPP
3 
4 #include <condition_variable>
5 #include <mutex>
6 #include <queue> // std::queue & std::priority_queue
8 
9 namespace op
10 {
11  template<typename TDatums, typename TQueue>
12  class QueueBase
13  {
14  public:
15  explicit QueueBase(const long long maxSize = -1);
16 
17  virtual ~QueueBase();
18 
19  bool forceEmplace(TDatums& tDatums);
20 
21  bool tryEmplace(TDatums& tDatums);
22 
23  bool waitAndEmplace(TDatums& tDatums);
24 
25  bool forcePush(const TDatums& tDatums);
26 
27  bool tryPush(const TDatums& tDatums);
28 
29  bool waitAndPush(const TDatums& tDatums);
30 
31  bool tryPop(TDatums& tDatums);
32 
33  bool tryPop();
34 
35  bool waitAndPop(TDatums& tDatums);
36 
37  bool waitAndPop();
38 
39  bool empty() const;
40 
41  void stop();
42 
43  void stopPusher();
44 
45  void addPopper();
46 
47  void addPusher();
48 
49  bool isRunning() const;
50 
51  bool isFull() const;
52 
53  size_t size() const;
54 
55  void clear();
56 
57  virtual TDatums front() const = 0;
58 
59  protected:
60  mutable std::mutex mMutex;
61  long long mPoppers;
62  long long mPushers;
63  long long mMaxPoppersPushers;
66  std::condition_variable mConditionVariable;
67  TQueue mTQueue;
68 
69  virtual bool pop(TDatums& tDatums) = 0;
70 
71  unsigned long long getMaxSize() const;
72 
73  private:
74  const long long mMaxSize;
75 
76  bool emplace(TDatums& tDatums);
77 
78  bool push(const TDatums& tDatums);
79 
80  bool pop();
81 
82  void updateMaxPoppersPushers();
83 
84  DELETE_COPY(QueueBase);
85  };
86 }
87 
88 
89 
90 
91 
92 // Implementation
93 #include <openpose/core/datum.hpp>
95 namespace op
96 {
97  template<typename TDatums, typename TQueue>
98  QueueBase<TDatums, TQueue>::QueueBase(const long long maxSize) :
99  mPoppers{0ll},
100  mPushers{0ll},
101  mPopIsStopped{false},
102  mPushIsStopped{false},
103  mMaxSize{maxSize}
104  {
105  }
106 
107  // Virtual destructor
108  template<typename TDatums, typename TQueue>
110  {
111  try
112  {
113  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
114  stop();
115  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
116  }
117  catch (const std::exception& e)
118  {
119  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
120  }
121  }
122 
123  template<typename TDatums, typename TQueue>
125  {
126  try
127  {
128  const std::lock_guard<std::mutex> lock{mMutex};
129  if (mTQueue.size() >= getMaxSize())
130  mTQueue.pop();
131  return emplace(tDatums);
132  }
133  catch (const std::exception& e)
134  {
135  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
136  return false;
137  }
138  }
139 
140  template<typename TDatums, typename TQueue>
142  {
143  try
144  {
145  const std::lock_guard<std::mutex> lock{mMutex};
146  if (mTQueue.size() >= getMaxSize())
147  return false;
148  return emplace(tDatums);
149  }
150  catch (const std::exception& e)
151  {
152  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
153  return false;
154  }
155  }
156 
157  template<typename TDatums, typename TQueue>
159  {
160  try
161  {
162  std::unique_lock<std::mutex> lock{mMutex};
163  mConditionVariable.wait(lock, [this]{return mTQueue.size() < getMaxSize() || mPushIsStopped; });
164  return emplace(tDatums);
165  }
166  catch (const std::exception& e)
167  {
168  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
169  return false;
170  }
171  }
172 
173  template<typename TDatums, typename TQueue>
174  bool QueueBase<TDatums, TQueue>::forcePush(const TDatums& tDatums)
175  {
176  try
177  {
178  const std::lock_guard<std::mutex> lock{mMutex};
179  if (mTQueue.size() >= getMaxSize())
180  mTQueue.pop();
181  return push(tDatums);
182  }
183  catch (const std::exception& e)
184  {
185  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
186  return false;
187  }
188  }
189 
190  template<typename TDatums, typename TQueue>
191  bool QueueBase<TDatums, TQueue>::tryPush(const TDatums& tDatums)
192  {
193  try
194  {
195  const std::lock_guard<std::mutex> lock{mMutex};
196  if (mTQueue.size() >= getMaxSize())
197  return false;
198  return push(tDatums);
199  }
200  catch (const std::exception& e)
201  {
202  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
203  return false;
204  }
205  }
206 
207  template<typename TDatums, typename TQueue>
208  bool QueueBase<TDatums, TQueue>::waitAndPush(const TDatums& tDatums)
209  {
210  try
211  {
212  std::unique_lock<std::mutex> lock{mMutex};
213  mConditionVariable.wait(lock, [this]{return mTQueue.size() < getMaxSize() || mPushIsStopped; });
214  return push(tDatums);
215  }
216  catch (const std::exception& e)
217  {
218  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
219  return false;
220  }
221  }
222 
223  template<typename TDatums, typename TQueue>
224  bool QueueBase<TDatums, TQueue>::tryPop(TDatums& tDatums)
225  {
226  try
227  {
228  const std::lock_guard<std::mutex> lock{mMutex};
229  return pop(tDatums);
230  }
231  catch (const std::exception& e)
232  {
233  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
234  return false;
235  }
236  }
237 
238  template<typename TDatums, typename TQueue>
240  {
241  try
242  {
243  const std::lock_guard<std::mutex> lock{mMutex};
244  return pop();
245  }
246  catch (const std::exception& e)
247  {
248  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
249  return false;
250  }
251  }
252 
253  template<typename TDatums, typename TQueue>
255  {
256  try
257  {
258  std::unique_lock<std::mutex> lock{mMutex};
259  mConditionVariable.wait(lock, [this]{return !mTQueue.empty() || mPopIsStopped; });
260  return pop(tDatums);
261  }
262  catch (const std::exception& e)
263  {
264  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
265  return false;
266  }
267  }
268 
269  template<typename TDatums, typename TQueue>
271  {
272  try
273  {
274  std::unique_lock<std::mutex> lock{mMutex};
275  mConditionVariable.wait(lock, [this]{return !mTQueue.empty() || mPopIsStopped; });
276  return pop();
277  }
278  catch (const std::exception& e)
279  {
280  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
281  return false;
282  }
283  }
284 
285  template<typename TDatums, typename TQueue>
287  {
288  try
289  {
290  const std::lock_guard<std::mutex> lock{mMutex};
291  return mTQueue.empty();
292  }
293  catch (const std::exception& e)
294  {
295  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
296  return false;
297  }
298  }
299 
300  template<typename TDatums, typename TQueue>
302  {
303  try
304  {
305  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
306  const std::lock_guard<std::mutex> lock{mMutex};
307  mPopIsStopped = {true};
308  mPushIsStopped = {true};
309  while (!mTQueue.empty())
310  mTQueue.pop();
311  mConditionVariable.notify_all();
312  }
313  catch (const std::exception& e)
314  {
315  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
316  }
317  }
318 
319  template<typename TDatums, typename TQueue>
321  {
322  try
323  {
324  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
325  const std::lock_guard<std::mutex> lock{mMutex};
326  mPushers--;
327  if (mPushers == 0)
328  {
329  mPushIsStopped = {true};
330  if (mTQueue.empty())
331  mPopIsStopped = {true};
332  mConditionVariable.notify_all();
333  }
334  }
335  catch (const std::exception& e)
336  {
337  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
338  }
339  }
340 
341  template<typename TDatums, typename TQueue>
343  {
344  try
345  {
346  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
347  const std::lock_guard<std::mutex> lock{mMutex};
348  mPoppers++;
349  updateMaxPoppersPushers();
350  }
351  catch (const std::exception& e)
352  {
353  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
354  }
355  }
356 
357  template<typename TDatums, typename TQueue>
359  {
360  try
361  {
362  opLog("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
363  const std::lock_guard<std::mutex> lock{mMutex};
364  mPushers++;
365  updateMaxPoppersPushers();
366  }
367  catch (const std::exception& e)
368  {
369  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
370  }
371  }
372 
373  template<typename TDatums, typename TQueue>
375  {
376  try
377  {
378  const std::lock_guard<std::mutex> lock{mMutex};
379  return !(mPushIsStopped && (mPopIsStopped || mTQueue.empty()));
380  }
381  catch (const std::exception& e)
382  {
383  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
384  return true;
385  }
386  }
387 
388  template<typename TDatums, typename TQueue>
390  {
391  try
392  {
393  // No mutex required because the size() and getMaxSize() are already thread-safe
394  return size() == getMaxSize();
395  }
396  catch (const std::exception& e)
397  {
398  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
399  return false;
400  }
401  }
402 
403  template<typename TDatums, typename TQueue>
405  {
406  try
407  {
408  const std::lock_guard<std::mutex> lock{mMutex};
409  return mTQueue.size();
410  }
411  catch (const std::exception& e)
412  {
413  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
414  return 0;
415  }
416  }
417 
418  template<typename TDatums, typename TQueue>
420  {
421  try
422  {
423  const std::lock_guard<std::mutex> lock{mMutex};
424  while (!mTQueue.empty())
425  mTQueue.pop();
426  }
427  catch (const std::exception& e)
428  {
429  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
430  }
431  }
432 
433  template<typename TDatums, typename TQueue>
434  unsigned long long QueueBase<TDatums, TQueue>::getMaxSize() const
435  {
436  try
437  {
438  return (mMaxSize > 0 ? mMaxSize : fastMax(1ll, mMaxPoppersPushers));
439  }
440  catch (const std::exception& e)
441  {
442  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
443  return false;
444  }
445  }
446 
447  template<typename TDatums, typename TQueue>
448  bool QueueBase<TDatums, TQueue>::emplace(TDatums& tDatums)
449  {
450  try
451  {
452  if (mPushIsStopped)
453  return false;
454 
455  mTQueue.emplace(tDatums);
456  mConditionVariable.notify_all();
457  return true;
458  }
459  catch (const std::exception& e)
460  {
461  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
462  return false;
463  }
464  }
465 
466  template<typename TDatums, typename TQueue>
467  bool QueueBase<TDatums, TQueue>::push(const TDatums& tDatums)
468  {
469  try
470  {
471  if (mPushIsStopped)
472  return false;
473 
474  mTQueue.push(tDatums);
475  mConditionVariable.notify_all();
476  return true;
477  }
478  catch (const std::exception& e)
479  {
480  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
481  return false;
482  }
483  }
484 
485  template<typename TDatums, typename TQueue>
486  bool QueueBase<TDatums, TQueue>::pop()
487  {
488  try
489  {
490  if (mPopIsStopped || mTQueue.empty())
491  return false;
492 
493  mTQueue.pop();
494  mConditionVariable.notify_all();
495  return true;
496  }
497  catch (const std::exception& e)
498  {
499  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
500  return false;
501  }
502  }
503 
504  template<typename TDatums, typename TQueue>
505  void QueueBase<TDatums, TQueue>::updateMaxPoppersPushers()
506  {
507  try
508  {
509  mMaxPoppersPushers = fastMax(mPoppers, mPushers);
510  }
511  catch (const std::exception& e)
512  {
513  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
514  }
515  }
516 
517  extern template class QueueBase<BASE_DATUMS_SH, std::queue<BASE_DATUMS_SH>>;
518  extern template class QueueBase<
520  std::priority_queue<BASE_DATUMS_SH, std::vector<BASE_DATUMS_SH>,
521  std::greater<BASE_DATUMS_SH>>>;
522 }
523 
524 #endif // OPENPOSE_THREAD_QUEUE_BASE_HPP
long long mPoppers
Definition: queueBase.hpp:61
bool isFull() const
Definition: queueBase.hpp:389
void addPusher()
Definition: queueBase.hpp:358
long long mMaxPoppersPushers
Definition: queueBase.hpp:63
std::mutex mMutex
Definition: queueBase.hpp:60
bool waitAndPop(TDatums &tDatums)
Definition: queueBase.hpp:254
void stopPusher()
Definition: queueBase.hpp:320
bool tryPush(const TDatums &tDatums)
Definition: queueBase.hpp:191
TQueue mTQueue
Definition: queueBase.hpp:67
virtual bool pop(TDatums &tDatums)=0
bool empty() const
Definition: queueBase.hpp:286
bool mPopIsStopped
Definition: queueBase.hpp:64
bool tryEmplace(TDatums &tDatums)
Definition: queueBase.hpp:141
unsigned long long getMaxSize() const
Definition: queueBase.hpp:434
long long mPushers
Definition: queueBase.hpp:62
bool tryPop(TDatums &tDatums)
Definition: queueBase.hpp:224
bool waitAndPop()
Definition: queueBase.hpp:270
bool waitAndEmplace(TDatums &tDatums)
Definition: queueBase.hpp:158
std::condition_variable mConditionVariable
Definition: queueBase.hpp:66
bool forceEmplace(TDatums &tDatums)
Definition: queueBase.hpp:124
size_t size() const
Definition: queueBase.hpp:404
bool isRunning() const
Definition: queueBase.hpp:374
virtual TDatums front() const =0
bool waitAndPush(const TDatums &tDatums)
Definition: queueBase.hpp:208
bool forcePush(const TDatums &tDatums)
Definition: queueBase.hpp:174
void addPopper()
Definition: queueBase.hpp:342
QueueBase(const long long maxSize=-1)
Definition: queueBase.hpp:98
virtual ~QueueBase()
Definition: queueBase.hpp:109
bool mPushIsStopped
Definition: queueBase.hpp:65
#define BASE_DATUMS_SH
Definition: datum.hpp:405
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")
T fastMax(const T a, const T b)
Definition: fastMath.hpp:73
OP_API void opLog(const std::string &message, const Priority priority=Priority::Max, const int line=-1, const std::string &function="", const std::string &file="")