1 #ifndef OPENPOSE_THREAD_QUEUE_BASE_HPP
2 #define OPENPOSE_THREAD_QUEUE_BASE_HPP
4 #include <condition_variable>
// Fragment of the QueueBase class template declaration. The `class` line and most members
// are not visible in this excerpt, so only the declarations shown below are documented.
// TDatums is the element type passed to emplace/push/pop; TQueue is the type of the
// underlying container (mTQueue in the definitions further down).
11 template<
typename TDatums,
typename TQueue>
// Pure virtual accessor: returns a TDatums by value and is const-qualified.
57 virtual TDatums
front()
const = 0;
// Pure virtual pop overload taking a TDatums reference and returning bool.
// NOTE(review): presumably tDatums receives the popped element -- confirm in the subclass.
69 virtual bool pop(TDatums& tDatums) = 0;
// Configured size limit. getMaxSize() uses it only when it is > 0.
74 const long long mMaxSize;
// Helper that forwards tDatums to mTQueue.emplace() and notifies the condition variable.
76 bool emplace(TDatums& tDatums);
// Helper that forwards tDatums to mTQueue.push() and notifies the condition variable.
78 bool push(
const TDatums& tDatums);
// Recomputes mMaxPoppersPushers as the max of mPoppers and mPushers.
82 void updateMaxPoppersPushers();
// NOTE(review): signature not visible in this excerpt -- presumably the QueueBase constructor.
// Visible part of the member-initializer list: both stop flags start out false.
97 template<
typename TDatums,
typename TQueue>
101 mPopIsStopped{false},
102 mPushIsStopped{false},
// NOTE(review): signature and body are not visible in this excerpt -- presumably the
// destructor; confirm against the full file.
108 template<
typename TDatums,
typename TQueue>
// Any std::exception is reported through error() with the current line, function and file.
117 catch (
const std::exception& e)
119 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature not visible in this excerpt -- presumably forceEmplace(TDatums&).
// Under mMutex, checks whether the queue has reached getMaxSize() and then delegates to emplace().
123 template<
typename TDatums,
typename TQueue>
128 const std::lock_guard<std::mutex> lock{mMutex};
// The statement guarded by this `if` (original line 130) is elided from this excerpt;
// the `return` below is NOT its body.
129 if (mTQueue.size() >= getMaxSize())
131 return emplace(tDatums);
// Any std::exception is reported through error() with the current line, function and file.
133 catch (
const std::exception& e)
135 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature not visible in this excerpt -- presumably tryEmplace(TDatums&).
// Under mMutex, checks whether the queue has reached getMaxSize() and then delegates to emplace().
140 template<
typename TDatums,
typename TQueue>
145 const std::lock_guard<std::mutex> lock{mMutex};
// The statement guarded by this `if` (original line 147) is elided from this excerpt;
// the `return` below is NOT its body.
146 if (mTQueue.size() >= getMaxSize())
148 return emplace(tDatums);
// Any std::exception is reported through error() with the current line, function and file.
150 catch (
const std::exception& e)
152 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature not visible in this excerpt -- presumably waitAndEmplace(TDatums&).
// Blocks on the condition variable until there is room in the queue or pushing has been
// stopped, then delegates to emplace() and returns its result.
157 template<
typename TDatums,
typename TQueue>
// unique_lock (not lock_guard) because condition_variable::wait must release/reacquire it.
162 std::unique_lock<std::mutex> lock{mMutex};
// Wake-up predicate: size below getMaxSize(), or mPushIsStopped set.
163 mConditionVariable.wait(lock, [
this]{
return mTQueue.size() < getMaxSize() || mPushIsStopped; });
164 return emplace(tDatums);
// Any std::exception is reported through error() with the current line, function and file.
166 catch (
const std::exception& e)
168 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature not visible in this excerpt -- presumably forcePush(const TDatums&).
// Under mMutex, checks whether the queue has reached getMaxSize() and then delegates to push().
173 template<
typename TDatums,
typename TQueue>
178 const std::lock_guard<std::mutex> lock{mMutex};
// The statement guarded by this `if` (original line 180) is elided from this excerpt;
// the `return` below is NOT its body.
179 if (mTQueue.size() >= getMaxSize())
181 return push(tDatums);
// Any std::exception is reported through error() with the current line, function and file.
183 catch (
const std::exception& e)
185 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature not visible in this excerpt -- presumably tryPush(const TDatums&).
// Under mMutex, checks whether the queue has reached getMaxSize() and then delegates to push().
190 template<
typename TDatums,
typename TQueue>
195 const std::lock_guard<std::mutex> lock{mMutex};
// The statement guarded by this `if` (original line 197) is elided from this excerpt;
// the `return` below is NOT its body.
196 if (mTQueue.size() >= getMaxSize())
198 return push(tDatums);
// Any std::exception is reported through error() with the current line, function and file.
200 catch (
const std::exception& e)
202 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature not visible in this excerpt -- presumably waitAndPush(const TDatums&).
// Blocks on the condition variable until there is room in the queue or pushing has been
// stopped, then delegates to push() and returns its result.
207 template<
typename TDatums,
typename TQueue>
// unique_lock (not lock_guard) because condition_variable::wait must release/reacquire it.
212 std::unique_lock<std::mutex> lock{mMutex};
// Wake-up predicate: size below getMaxSize(), or mPushIsStopped set.
213 mConditionVariable.wait(lock, [
this]{
return mTQueue.size() < getMaxSize() || mPushIsStopped; });
214 return push(tDatums);
// Any std::exception is reported through error() with the current line, function and file.
216 catch (
const std::exception& e)
218 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature and return statement are not visible in this excerpt --
// presumably tryPop(TDatums&). Only the locking and the error handler are shown.
223 template<
typename TDatums,
typename TQueue>
// Holds mMutex for the rest of the scope.
228 const std::lock_guard<std::mutex> lock{mMutex};
// Any std::exception is reported through error() with the current line, function and file.
231 catch (
const std::exception& e)
233 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature and return statement are not visible in this excerpt --
// presumably the argument-less tryPop(). Only the locking and the error handler are shown.
238 template<
typename TDatums,
typename TQueue>
// Holds mMutex for the rest of the scope.
243 const std::lock_guard<std::mutex> lock{mMutex};
// Any std::exception is reported through error() with the current line, function and file.
246 catch (
const std::exception& e)
248 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature and return statement are not visible in this excerpt --
// presumably waitAndPop(TDatums&).
// Blocks on the condition variable until the queue is non-empty or popping has been stopped.
253 template<
typename TDatums,
typename TQueue>
// unique_lock (not lock_guard) because condition_variable::wait must release/reacquire it.
258 std::unique_lock<std::mutex> lock{mMutex};
// Wake-up predicate: queue has at least one element, or mPopIsStopped set.
259 mConditionVariable.wait(lock, [
this]{
return !mTQueue.empty() || mPopIsStopped; });
// Any std::exception is reported through error() with the current line, function and file.
262 catch (
const std::exception& e)
264 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature and return statement are not visible in this excerpt --
// presumably the argument-less waitAndPop().
// Blocks on the condition variable until the queue is non-empty or popping has been stopped.
269 template<
typename TDatums,
typename TQueue>
// unique_lock (not lock_guard) because condition_variable::wait must release/reacquire it.
274 std::unique_lock<std::mutex> lock{mMutex};
// Wake-up predicate: queue has at least one element, or mPopIsStopped set.
275 mConditionVariable.wait(lock, [
this]{
return !mTQueue.empty() || mPopIsStopped; });
// Any std::exception is reported through error() with the current line, function and file.
278 catch (
const std::exception& e)
280 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature not visible in this excerpt -- presumably empty().
// Returns mTQueue.empty(), read while holding mMutex.
285 template<
typename TDatums,
typename TQueue>
290 const std::lock_guard<std::mutex> lock{mMutex};
291 return mTQueue.empty();
// Any std::exception is reported through error() with the current line, function and file.
293 catch (
const std::exception& e)
295 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature not visible in this excerpt -- presumably stop().
// Under mMutex: sets both stop flags, loops while the queue is non-empty, then wakes all waiters.
300 template<
typename TDatums,
typename TQueue>
306 const std::lock_guard<std::mutex> lock{mMutex};
307 mPopIsStopped = {
true};
308 mPushIsStopped = {
true};
// Loop body (original line 310) is elided from this excerpt.
// NOTE(review): presumably it removes one element per iteration -- confirm.
309 while (!mTQueue.empty())
// Wakes every thread blocked in a wait(); their predicates also test the stop flags set above.
311 mConditionVariable.notify_all();
// Any std::exception is reported through error() with the current line, function and file.
313 catch (
const std::exception& e)
315 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature not visible in this excerpt -- presumably stopPusher().
// Under mMutex: sets mPushIsStopped, sets mPopIsStopped, then wakes all waiters.
319 template<
typename TDatums,
typename TQueue>
325 const std::lock_guard<std::mutex> lock{mMutex};
329 mPushIsStopped = {
true};
// Original line 330 is elided from this excerpt.
// NOTE(review): the assignment below may be guarded by a condition on that line -- confirm.
331 mPopIsStopped = {
true};
332 mConditionVariable.notify_all();
// Any std::exception is reported through error() with the current line, function and file.
335 catch (
const std::exception& e)
337 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature not visible in this excerpt -- presumably addPopper().
// The statement on original line 348 is elided; it presumably changes mPoppers -- confirm.
341 template<
typename TDatums,
typename TQueue>
347 const std::lock_guard<std::mutex> lock{mMutex};
// Refreshes mMaxPoppersPushers while still holding mMutex.
349 updateMaxPoppersPushers();
// Any std::exception is reported through error() with the current line, function and file.
351 catch (
const std::exception& e)
353 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature not visible in this excerpt -- presumably addPusher().
// The statement on original line 364 is elided; it presumably changes mPushers -- confirm.
357 template<
typename TDatums,
typename TQueue>
363 const std::lock_guard<std::mutex> lock{mMutex};
// Refreshes mMaxPoppersPushers while still holding mMutex.
365 updateMaxPoppersPushers();
// Any std::exception is reported through error() with the current line, function and file.
367 catch (
const std::exception& e)
369 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature not visible in this excerpt -- presumably isRunning().
// Evaluated under mMutex. Returns false only when pushing is stopped AND (popping is stopped
// OR the queue is empty); returns true otherwise.
373 template<
typename TDatums,
typename TQueue>
378 const std::lock_guard<std::mutex> lock{mMutex};
379 return !(mPushIsStopped && (mPopIsStopped || mTQueue.empty()));
// Any std::exception is reported through error() with the current line, function and file.
381 catch (
const std::exception& e)
383 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature not visible in this excerpt -- presumably isFull().
// Returns true when size() equals getMaxSize(). No lock is taken in the visible lines of
// this function.
388 template<
typename TDatums,
typename TQueue>
394 return size() == getMaxSize();
// Any std::exception is reported through error() with the current line, function and file.
396 catch (
const std::exception& e)
398 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature not visible in this excerpt -- presumably size().
// Returns mTQueue.size(), read while holding mMutex.
403 template<
typename TDatums,
typename TQueue>
408 const std::lock_guard<std::mutex> lock{mMutex};
409 return mTQueue.size();
// Any std::exception is reported through error() with the current line, function and file.
411 catch (
const std::exception& e)
413 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature not visible in this excerpt -- presumably clear().
// Under mMutex, loops for as long as the queue is non-empty.
418 template<
typename TDatums,
typename TQueue>
423 const std::lock_guard<std::mutex> lock{mMutex};
// Loop body (original line 425) is elided from this excerpt.
// NOTE(review): presumably it removes one element per iteration -- confirm.
424 while (!mTQueue.empty())
// Any std::exception is reported through error() with the current line, function and file.
427 catch (
const std::exception& e)
429 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature not visible in this excerpt -- presumably getMaxSize() const.
// Effective capacity: mMaxSize when it is positive; otherwise the larger of 1 and
// mMaxPoppersPushers, so the result is always at least 1.
433 template<
typename TDatums,
typename TQueue>
438 return (mMaxSize > 0 ? mMaxSize :
fastMax(1ll, mMaxPoppersPushers));
// Any std::exception is reported through error() with the current line, function and file.
440 catch (
const std::exception& e)
442 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// NOTE(review): signature and original lines 448-454 are not visible in this excerpt --
// presumably the emplace(TDatums&) helper; any precondition check before the insert is elided.
// No lock is taken in the visible lines; the visible callers acquire mMutex before calling.
447 template<
typename TDatums,
typename TQueue>
455 mTQueue.emplace(tDatums);
// Wakes all threads blocked on the condition variable after the insert.
456 mConditionVariable.notify_all();
// Any std::exception is reported through error() with the current line, function and file.
459 catch (
const std::exception& e)
461 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Helper that inserts tDatums into mTQueue via push() and notifies all waiters.
// Original lines 468-473 and the return statement are elided from this excerpt, so any
// precondition check and the exact bool returned are not visible here.
// No lock is taken in the visible lines; the visible callers acquire mMutex before calling.
466 template<
typename TDatums,
typename TQueue>
467 bool QueueBase<TDatums, TQueue>::push(
const TDatums& tDatums)
474 mTQueue.push(tDatums);
// Wakes all threads blocked on the condition variable after the insert.
475 mConditionVariable.notify_all();
// Any std::exception is reported through error() with the current line, function and file.
478 catch (
const std::exception& e)
480 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Argument-less pop helper returning bool.
// Original lines 491-493 and the return statements are elided from this excerpt, so the
// actual removal and the exact values returned are not visible here.
485 template<
typename TDatums,
typename TQueue>
486 bool QueueBase<TDatums, TQueue>::pop()
// Guard for "popping stopped or nothing to pop"; its body is elided from this excerpt.
490 if (mPopIsStopped || mTQueue.empty())
// Wakes all threads blocked on the condition variable.
494 mConditionVariable.notify_all();
// Any std::exception is reported through error() with the current line, function and file.
497 catch (
const std::exception& e)
499 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Recomputes mMaxPoppersPushers as the larger of mPoppers and mPushers.
// No lock is taken in the visible lines; the visible callers acquire mMutex before calling.
504 template<
typename TDatums,
typename TQueue>
505 void QueueBase<TDatums, TQueue>::updateMaxPoppersPushers()
509 mMaxPoppersPushers =
fastMax(mPoppers, mPushers);
// Any std::exception is reported through error() with the current line, function and file.
511 catch (
const std::exception& e)
513 error(e.what(), __LINE__, __FUNCTION__, __FILE__);
// Explicit instantiation declarations (`extern template`) for QueueBase over BASE_DATUMS_SH:
// one backed by std::queue, one backed by std::priority_queue ordered with std::greater.
517 extern template class QueueBase<BASE_DATUMS_SH, std::queue<BASE_DATUMS_SH>>;
// Original line 519 (the first template argument of this declaration) is elided from this excerpt.
518 extern template class QueueBase<
520 std::priority_queue<BASE_DATUMS_SH, std::vector<BASE_DATUMS_SH>,
521 std::greater<BASE_DATUMS_SH>>>;
long long mMaxPoppersPushers
bool waitAndPop(TDatums &tDatums)
bool tryPush(const TDatums &tDatums)
virtual bool pop(TDatums &tDatums)=0
bool tryEmplace(TDatums &tDatums)
unsigned long long getMaxSize() const
bool tryPop(TDatums &tDatums)
bool waitAndEmplace(TDatums &tDatums)
std::condition_variable mConditionVariable
bool forceEmplace(TDatums &tDatums)
virtual TDatums front() const =0
bool waitAndPush(const TDatums &tDatums)
bool forcePush(const TDatums &tDatums)
QueueBase(const long long maxSize=-1)
OP_API void error(const std::string &message, const int line=-1, const std::string &function="", const std::string &file="")
T fastMax(const T a, const T b)
OP_API void opLog(const std::string &message, const Priority priority=Priority::Max, const int line=-1, const std::string &function="", const std::string &file="")