OpenPose  1.7.0
The first real-time multi-person system to jointly detect human body, hand, facial, and foot keypoints
subThreadQueueInOut.hpp
Go to the documentation of this file.
1 #ifndef OPENPOSE_THREAD_THREAD_QUEUE_IN_OUT_HPP
2 #define OPENPOSE_THREAD_THREAD_QUEUE_IN_OUT_HPP
3 
8 
9 namespace op
10 {
11  template<typename TDatums, typename TWorker = std::shared_ptr<Worker<TDatums>>, typename TQueue = Queue<TDatums>>
12  class SubThreadQueueInOut : public SubThread<TDatums, TWorker>
13  {
14  public:
15  SubThreadQueueInOut(const std::vector<TWorker>& tWorkers, const std::shared_ptr<TQueue>& tQueueIn,
16  const std::shared_ptr<TQueue>& tQueueOut);
17 
18  virtual ~SubThreadQueueInOut();
19 
20  bool work();
21 
22  private:
23  std::shared_ptr<TQueue> spTQueueIn;
24  std::shared_ptr<TQueue> spTQueueOut;
25 
26  DELETE_COPY(SubThreadQueueInOut);
27  };
28 }
29 
30 
31 
32 
33 
34 // Implementation
35 namespace op
36 {
37  template<typename TDatums, typename TWorker, typename TQueue>
39  const std::shared_ptr<TQueue>& tQueueIn,
40  const std::shared_ptr<TQueue>& tQueueOut) :
41  SubThread<TDatums, TWorker>{tWorkers},
42  spTQueueIn{tQueueIn},
43  spTQueueOut{tQueueOut}
44  {
45  // spTQueueIn->addPopper();
46  spTQueueOut->addPusher();
47  }
48 
49  template<typename TDatums, typename TWorker, typename TQueue>
51  {
52  }
53 
54  template<typename TDatums, typename TWorker, typename TQueue>
56  {
57  try
58  {
59  // If output queue is closed -> close input queue
60  if (!spTQueueOut->isRunning())
61  {
62  spTQueueIn->stop();
63  return false;
64  }
65  // If output queue running -> normal operation
66  else
67  {
68  // Don't work until next queue is not full
69  // This reduces latency to half
70  if (!spTQueueOut->isFull())
71  {
72  // Pop TDatums
73  if (spTQueueIn->empty())
74  std::this_thread::sleep_for(std::chrono::microseconds{100});
75  TDatums tDatums;
76  bool workersAreRunning = spTQueueIn->tryPop(tDatums);
77  // Check queue not stopped
78  if (!workersAreRunning)
79  workersAreRunning = spTQueueIn->isRunning();
80  // Process TDatums
81  workersAreRunning = this->workTWorkers(tDatums, workersAreRunning);
82  // Push/emplace tDatums if successfully processed
83  if (workersAreRunning)
84  {
85  if (tDatums != nullptr)
86  spTQueueOut->waitAndEmplace(tDatums);
87  }
88  // Close both queues otherwise
89  else
90  {
91  spTQueueIn->stop();
92  spTQueueOut->stopPusher();
93  }
94  return workersAreRunning;
95  }
96  else
97  {
98  std::this_thread::sleep_for(std::chrono::microseconds{100});
99  return true;
100  }
101  }
102  }
103  catch (const std::exception& e)
104  {
105  error(e.what(), __LINE__, __FUNCTION__, __FILE__);
106  spTQueueIn->stop();
107  spTQueueOut->stop();
108  return false;
109  }
110  }
111 
113 }
114 
115 #endif // OPENPOSE_THREAD_THREAD_QUEUE_IN_OUT_HPP
SubThreadQueueInOut(const std::vector< TWorker > &tWorkers, const std::shared_ptr< TQueue > &tQueueIn, const std::shared_ptr< TQueue > &tQueueOut)
COMPILE_TEMPLATE_DATUM(WPoseTriangulation)
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")