GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
ThreadsafeQueue.h
Go to the documentation of this file.
1#ifndef THREADSAFEQUEUE_H
2#define THREADSAFEQUEUE_H
3
4////////////////////////////////////////////////////////////////////////////////
5///
6/// \class ThreadsafeQueue
7///
8/// Template for all queues used to send data from one thread/loop to the next.
9///
10////////////////////////////////////////////////////////////////////////////////
11
12#include <cassert>
13#include <string>
14
15#ifndef __CINT__
16#include <atomic>
17#include <chrono>
18#include <condition_variable>
19#include <mutex>
20#include <queue>
21#include <utility>
22#endif
23
24class TDetector;
25
/// Template for all queues used to send data from one thread/loop to the next.
///
/// Objects are stored in a std::queue<T>; the queue and the item counters are
/// accessed while holding an internal std::mutex. Two condition variables are
/// used: Push() waits on can_push and notifies can_pop, Pop() waits on can_pop
/// and notifies can_push.
template <typename T>
class ThreadsafeQueue {
public:
   /// \param name     label of this queue, returned by Name()
   /// \param maxSize  fill level that Push() compares the queue size against
   explicit ThreadsafeQueue(std::string name = "default", size_t maxSize = 100000);

   // NOTE: the class has std::mutex, std::condition_variable and std::atomic
   // members, none of which can be copied or moved, so these defaulted copy/move
   // operations are defined as deleted by the compiler.
   ThreadsafeQueue(const ThreadsafeQueue&)                = default;
   ThreadsafeQueue(ThreadsafeQueue&&) noexcept            = default;
   ThreadsafeQueue& operator=(const ThreadsafeQueue&)     = default;
   ThreadsafeQueue& operator=(ThreadsafeQueue&&) noexcept = default;
   ~ThreadsafeQueue()                                     = default;
#ifndef __CINT__
   /// Adds obj to the back of the queue; returns 1.
   int Push(T obj);
   /// Takes the front element into output, waiting up to millisecond_wait
   /// milliseconds if the queue is empty. Returns the number of elements left in
   /// the queue, or -1 converted to size_t if no element was available.
   size_t Pop(T& output, int millisecond_wait = 1000);

   size_t ItemsPushed() const;   ///< total number of objects pushed so far
   size_t ItemsPopped() const;   ///< total number of objects popped so far
   size_t Size() const;          ///< number of objects currently in the queue

   /// Returns a copy of the queue's name.
   std::string Name() const { return fName; }

   // int ObjectSize(T&) const;

   bool IsFinished() const;                  ///< current value of the finished flag
   void SetFinished(bool finished = true);   ///< sets the finished flag

private:
   std::string             fName;      ///< label given at construction
   mutable std::mutex      mutex;      ///< mutable so that const accessors can lock it
   std::queue<T>           queue;      ///< the stored objects
   std::condition_variable can_push;   ///< notified by Pop() after an element was removed
   std::condition_variable can_pop;    ///< notified by Push() after an element was added

   std::atomic_int num_writers{0};   ///< not used by any member function in this header

   size_t max_queue_size{100000};   ///< Push() waits while queue.size() exceeds this

   size_t items_in_queue{0};   ///< incremented by Push(), decremented by Pop()
   size_t items_pushed{0};     ///< incremented by Push()
   size_t items_popped{0};     ///< incremented by Pop()

   std::atomic_bool is_finished{false};   ///< flag read/written by IsFinished()/SetFinished()
#endif
};
68
69#ifndef __CINT__
70template <typename T>
71ThreadsafeQueue<T>::ThreadsafeQueue(std::string name, size_t maxSize)
72 : fName(std::move(name)), max_queue_size(maxSize), is_finished(false)
73{
74}
75
76template <typename T>
78{
79 std::unique_lock<std::mutex> lock(mutex);
80 if(queue.size() > max_queue_size) {
81 can_push.wait(lock);
82 }
83
84 items_pushed++;
85 items_in_queue++;
86
87 queue.push(obj);
88 can_pop.notify_one();
89 return 1;
90}
91
92template <typename T>
93size_t ThreadsafeQueue<T>::Pop(T& output, int millisecond_wait)
94{
95 std::unique_lock<std::mutex> lock(mutex);
96 if(!queue.size() && millisecond_wait) {
97 can_pop.wait_for(lock, std::chrono::milliseconds(millisecond_wait));
98 }
99
100 if(!queue.size()) {
101 return -1;
102 }
103
104 output = queue.front();
105 queue.pop();
106
107 items_popped++;
108 items_in_queue--;
109
110 can_push.notify_one();
111 return queue.size();
112 // return ObjectSize(output);
113}
114
115template <typename T>
117{
118 std::unique_lock<std::mutex> lock(mutex);
119 return items_in_queue;
120}
121
122template <typename T>
124{
125 std::unique_lock<std::mutex> lock(mutex);
126 return items_pushed;
127}
128
129template <typename T>
131{
132 std::unique_lock<std::mutex> lock(mutex);
133 return items_popped;
134}
135
136template <typename T>
138{
139 return is_finished;
140}
141
142template <typename T>
144{
145 // std::cout<<std::endl<<fName<<": finished = "<<finished<<std::endl;
146 is_finished = finished;
147}
148#endif /* __CINT__ */
149
150#endif /* THREADSAFEQUEUE_H */
std::atomic_int num_writers
void SetFinished(bool finished=true)
std::string fName
ThreadsafeQueue(const ThreadsafeQueue &)=default
std::atomic_bool is_finished
size_t ItemsPushed() const
bool IsFinished() const
size_t ItemsPopped() const
std::queue< T > queue
std::condition_variable can_pop
ThreadsafeQueue(std::string name="default", size_t maxSize=100000)
size_t Size() const
std::string Name()
ThreadsafeQueue(ThreadsafeQueue &&) noexcept=default
std::condition_variable can_push
size_t Pop(T &output, int millisecond_wait=1000)