OpenPose  1.7.0
The first real-time multi-person system to jointly detect human body, hand, facial, and foot keypoints
wQueueOrderer.hpp
Go to the documentation of this file.
1 #ifndef OPENPOSE_THREAD_W_QUEUE_ORDERER_HPP
2 #define OPENPOSE_THREAD_W_QUEUE_ORDERER_HPP
3 
4 #include <queue> // std::priority_queue
8 
9 namespace op
10 {
11  template<typename TDatums>
12  class WQueueOrderer : public Worker<TDatums>
13  {
14  public:
15  explicit WQueueOrderer(const unsigned int maxBufferSize = 64u);
16 
17  virtual ~WQueueOrderer();
18 
20 
21  void work(TDatums& tDatums);
22 
23  void tryStop();
24 
25  private:
26  const unsigned int mMaxBufferSize;
27  bool mStopWhenEmpty;
28  unsigned long long mNextExpectedId;
29  unsigned long long mNextExpectedSubId;
30  std::priority_queue<TDatums, std::vector<TDatums>, PointerContainerGreater<TDatums>> mPriorityQueueBuffer;
31 
32  DELETE_COPY(WQueueOrderer);
33  };
34 }
35 
36 
37 
38 
39 
40 // Implementation
41 namespace op
42 {
43  template<typename TDatums>
44  WQueueOrderer<TDatums>::WQueueOrderer(const unsigned int maxBufferSize) :
45  mMaxBufferSize{maxBufferSize},
46  mStopWhenEmpty{false},
47  mNextExpectedId{0},
48  mNextExpectedSubId{0}
49  {
50  }
51 
52  template<typename TDatums>
54  {
55  }
56 
57  template<typename TDatums>
59  {
60  }
61 
62  template<typename TDatums>
63  void WQueueOrderer<TDatums>::work(TDatums& tDatums)
64  {
65  try
66  {
67  // Profiling speed
68  const auto profilerKey = Profiler::timerInit(__LINE__, __FUNCTION__, __FILE__);
69  bool profileSpeed = (tDatums != nullptr);
70  // Input TDatum -> enqueue or return it back
71  if (checkNoNullNorEmpty(tDatums))
72  {
73  // T* to T
74  auto& tDatumsNoPtr = *tDatums;
75  // tDatums is the next expected, update counter
76  if (tDatumsNoPtr[0]->id == mNextExpectedId && tDatumsNoPtr[0]->subId == mNextExpectedSubId)
77  {
78  // If single-view
79  if (tDatumsNoPtr[0]->subIdMax == 0)
80  mNextExpectedId++;
81  // If muilti-view system
82  else
83  {
84  mNextExpectedSubId++;
85  if (mNextExpectedSubId > tDatumsNoPtr[0]->subIdMax)
86  {
87  mNextExpectedSubId = 0;
88  mNextExpectedId++;
89  }
90  }
91  }
92  // Else push it to our buffered queue
93  else
94  {
95  // Enqueue current tDatums
96  mPriorityQueueBuffer.emplace(tDatums);
97  tDatums = nullptr;
98  // Else if buffer full -> remove one tDatums
99  if (mPriorityQueueBuffer.size() > mMaxBufferSize)
100  {
101  tDatums = mPriorityQueueBuffer.top();
102  mPriorityQueueBuffer.pop();
103  }
104  }
105  }
106  // If input TDatum enqueued -> check if previously enqueued next desired frame and pop it
107  if (!checkNoNullNorEmpty(tDatums))
108  {
109  // Retrieve frame if next is desired frame or if we want to stop this worker
110  if (!mPriorityQueueBuffer.empty()
111  && (mStopWhenEmpty ||
112  ((*mPriorityQueueBuffer.top())[0]->id == mNextExpectedId
113  && (*mPriorityQueueBuffer.top())[0]->subId == mNextExpectedSubId)))
114  {
115  tDatums = { mPriorityQueueBuffer.top() };
116  mPriorityQueueBuffer.pop();
117  }
118  }
119  // If TDatum ready to be returned -> updated next expected id
120  if (checkNoNullNorEmpty(tDatums))
121  {
122  const auto& tDatumsNoPtr = *tDatums;
123  // If single-view
124  if (tDatumsNoPtr[0]->subIdMax == 0)
125  mNextExpectedId = tDatumsNoPtr[0]->id + 1;
126  // If muilti-view system
127  else
128  {
129  mNextExpectedSubId = tDatumsNoPtr[0]->subId + 1;
130  if (mNextExpectedSubId > tDatumsNoPtr[0]->subIdMax)
131  {
132  mNextExpectedSubId = 0;
133  mNextExpectedId = tDatumsNoPtr[0]->id + 1;
134  }
135  }
136  }
137  // Sleep if no new tDatums to either pop or push
138  if (!checkNoNullNorEmpty(tDatums) && mPriorityQueueBuffer.size() < mMaxBufferSize / 2u)
139  std::this_thread::sleep_for(std::chrono::milliseconds{1});
140  // If TDatum popped and/or pushed
141  if (profileSpeed || tDatums != nullptr)
142  {
143  // Profiling speed
144  Profiler::timerEnd(profilerKey);
145  Profiler::printAveragedTimeMsOnIterationX(profilerKey, __LINE__, __FUNCTION__, __FILE__);
146  // Debugging log
147  opLogIfDebug("", Priority::Low, __LINE__, __FUNCTION__, __FILE__);
148  }
149  }
150  catch (const std::exception& e)
151  {
152  this->stop();
153  tDatums = nullptr;
154  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
155  }
156  }
157 
158  template<typename TDatums>
160  {
161  try
162  {
163  // Close if all frames were retrieved from the queue
164  if (mPriorityQueueBuffer.empty())
165  this->stop();
166  mStopWhenEmpty = true;
167 
168  }
169  catch (const std::exception& e)
170  {
171  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
172  }
173  }
174 
176 }
177 
178 #endif // OPENPOSE_THREAD_W_QUEUE_ORDERER_HPP
static void printAveragedTimeMsOnIterationX(const std::string &key, const int line, const std::string &function, const std::string &file, const unsigned long long x=DEFAULT_X)
static const std::string timerInit(const int line, const std::string &function, const std::string &file)
static void timerEnd(const std::string &key)
void work(TDatums &tDatums)
WQueueOrderer(const unsigned int maxBufferSize=64u)
virtual ~WQueueOrderer()
void initializationOnThread()
bool checkNoNullNorEmpty(const TPointerContainer &tPointerContainer)
COMPILE_TEMPLATE_DATUM(WPoseTriangulation)
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")
void opLogIfDebug(const T &message, const Priority priority=Priority::Max, const int line=-1, const std::string &function="", const std::string &file="")
Definition: errorAndLog.hpp:97