GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
ThreadsafeQueue.h
Go to the documentation of this file.
1#ifndef THREADSAFEQUEUE_H
2#define THREADSAFEQUEUE_H
3
4////////////////////////////////////////////////////////////////////////////////
5///
6/// \class ThreadsafeQueue
7///
8/// Template for all queues used to send data from one thread/loop to the next.
9///
10////////////////////////////////////////////////////////////////////////////////
11
12#include <cassert>
13#include <iostream>
14
15#ifndef __CINT__
16#include <atomic>
17#include <memory>
18#include <chrono>
19#include <condition_variable>
20#include <mutex>
21#include <queue>
22#include <utility>
23#endif
24
25class TDetector;
26
27template <typename T>
29public:
30 explicit ThreadsafeQueue(std::string name = "default", size_t maxSize = 100000);
32 ThreadsafeQueue(ThreadsafeQueue&&) noexcept = default;
33 ThreadsafeQueue& operator=(const ThreadsafeQueue&) = default;
34 ThreadsafeQueue& operator=(ThreadsafeQueue&&) noexcept = default;
35 ~ThreadsafeQueue() = default;
36#ifndef __CINT__
37 int Push(T obj);
38 size_t Pop(T& output, int millisecond_wait = 1000);
39
40 size_t ItemsPushed() const;
41 size_t ItemsPopped() const;
42 size_t Size() const;
43
44 std::string Name() { return fName; }
45
46 // int ObjectSize(T&) const;
47
48 bool IsFinished() const;
49 void SetFinished(bool finished = true);
50
51private:
52 std::string fName;
53 mutable std::mutex mutex;
54 std::queue<T> queue;
55 std::condition_variable can_push;
56 std::condition_variable can_pop;
57
58 std::atomic_int num_writers{0};
59
60 size_t max_queue_size{100000};
61
62 size_t items_in_queue{0};
63 size_t items_pushed{0};
64 size_t items_popped{0};
65
66 std::atomic_bool is_finished;
67#endif
68};
69
70#ifndef __CINT__
71template <typename T>
72ThreadsafeQueue<T>::ThreadsafeQueue(std::string name, size_t maxSize)
73 : fName(std::move(name)), max_queue_size(maxSize), is_finished(false)
74{
75}
76
77template <typename T>
79{
80 std::unique_lock<std::mutex> lock(mutex);
81 if(queue.size() > max_queue_size) {
82 can_push.wait(lock);
83 }
84
85 items_pushed++;
86 items_in_queue++;
87
88 queue.push(obj);
89 can_pop.notify_one();
90 return 1;
91}
92
93template <typename T>
94size_t ThreadsafeQueue<T>::Pop(T& output, int millisecond_wait)
95{
96 std::unique_lock<std::mutex> lock(mutex);
97 if(!queue.size() && millisecond_wait) {
98 can_pop.wait_for(lock, std::chrono::milliseconds(millisecond_wait));
99 }
100
101 if(!queue.size()) {
102 return -1;
103 }
104
105 output = queue.front();
106 queue.pop();
107
108 items_popped++;
109 items_in_queue--;
110
111 can_push.notify_one();
112 return queue.size();
113 // return ObjectSize(output);
114}
115
116template <typename T>
118{
119 std::unique_lock<std::mutex> lock(mutex);
120 return items_in_queue;
121}
122
123template <typename T>
125{
126 std::unique_lock<std::mutex> lock(mutex);
127 return items_pushed;
128}
129
130template <typename T>
132{
133 std::unique_lock<std::mutex> lock(mutex);
134 return items_popped;
135}
136
137template <typename T>
139{
140 return is_finished;
141}
142
143template <typename T>
145{
146 // std::cout<<std::endl<<fName<<": finished = "<<finished<<std::endl;
147 is_finished = finished;
148}
149#endif /* __CINT__ */
150
151#endif /* _THREADSAFEQUEUE_H_ */
std::atomic_int num_writers
void SetFinished(bool finished=true)
std::string fName
ThreadsafeQueue(const ThreadsafeQueue &)=default
std::atomic_bool is_finished
size_t ItemsPushed() const
bool IsFinished() const
size_t ItemsPopped() const
std::queue< T > queue
std::condition_variable can_pop
ThreadsafeQueue(std::string name="default", size_t maxSize=100000)
size_t Size() const
std::string Name()
ThreadsafeQueue(ThreadsafeQueue &&) noexcept=default
std::condition_variable can_push
size_t Pop(T &output, int millisecond_wait=1000)