GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
StoppableThread.h
Go to the documentation of this file.
1#ifndef STOPPABLETHREAD_H
2#define STOPPABLETHREAD_H
3
4/** \addtogroup Loops
5 * @{
6 */
7
8////////////////////////////////////////////////////////////////////////////////
9///
10/// \class StoppableThread
11///
12/// Base-class for all loops/threads.
13///
14////////////////////////////////////////////////////////////////////////////////
15
16#ifndef __CINT__
17#include <atomic>
18#include <condition_variable>
19#include <thread>
20#endif
21
22#include <string>
23#include <vector>
24#include <map>
25
26#include "Rtypes.h"
27
29public:
30 explicit StoppableThread(std::string name);
32 StoppableThread(StoppableThread&&) noexcept = delete;
33 StoppableThread& operator=(const StoppableThread&) = delete;
34 StoppableThread& operator=(StoppableThread&&) noexcept = delete;
35 virtual ~StoppableThread();
36
37 static void SendStop();
38 static void StopAll();
39 static bool AnyThreadRunning();
40 static std::string AnyThreadStatus();
41 static std::string AllThreadProgress();
42 static std::string AllThreadHeader();
43 static std::string AllThreadStatus();
44
45 static void PauseAll();
46 static void ResumeAll();
47
48 static StoppableThread* Get(const std::string& name);
49 static std::vector<StoppableThread*> GetAll();
50
51 void Resume();
52 void Pause();
53 void Stop();
54 bool IsPaused();
55 bool IsRunning();
56 void Join();
57
58 virtual void OnEnd() {}
59 virtual std::string Status();
60 virtual std::string Progress();
61 virtual std::string EndStatus() { return {}; }
62 std::string Name() const { return fName; }
63
64 virtual void ClearQueue() {}
65 static void ClearAllQueues();
66
67 // protected:
68 virtual bool Iteration() = 0;
69
70 virtual size_t GetItemsPopped() = 0;
71 virtual size_t GetItemsPushed() = 0;
72 virtual size_t GetItemsCurrent() = 0;
73 virtual size_t GetRate() = 0;
74
75 static int GetNThreads();
76
77 static void Print();
78
79 static void ColumnWidth(size_t val) { fColumnWidth = val; }
80 static void StatusWidth(size_t val) { fStatusWidth = val; }
81 static size_t ColumnWidth() { return fColumnWidth; }
82 static size_t StatusWidth() { return fStatusWidth; }
83
84 static void start_status_thread();
85 static void stop_status_thread();
86 static void join_status_thread();
87 static void status_out_loop();
88 static void status_out();
89
90protected:
91#ifndef __CINT__
92 void ItemsPopped(size_t val) { fItemsPopped = val; }
93 void InputSize(int64_t val) { fInputSize = val; }
94 std::atomic_size_t& ItemsPopped() { return fItemsPopped; }
95 std::atomic_long& InputSize() { return fInputSize; }
97#endif
98private:
99#ifndef __CINT__
100 std::atomic_size_t fItemsPopped{0}; ///< number of items popped from input queue
101 std::atomic_long fInputSize{0}; ///< number of items in the input (queue), only updated within Iteration(), so not
102 ///< always fully up-to-date (signed to hold error from queue::pop)
103#endif
104
105 std::string fName;
106
107 static size_t fColumnWidth;
108 static size_t fStatusWidth;
109
110 void Loop();
111
112 static std::map<std::string, StoppableThread*> fThreadMap;
113
114 static bool fStatusThreadOn;
115#ifndef __CINT__
116 static std::thread fStatusThread;
117 std::thread fThread;
118 std::atomic_bool fRunning{false};
119 std::atomic_bool fForceStop{false};
120 std::atomic_bool fPaused{false};
121 std::condition_variable fPausedWait;
122 std::mutex fPauseMutex;
123#endif
124
125 /// \cond CLASSIMP
126 ClassDef(StoppableThread, 0) // NOLINT(readability-else-after-return)
127 /// \endcond
128};
129
130/*! @} */
131#endif /* _STOPPABLETHREAD_H_ */
virtual std::string Status()
virtual size_t GetRate()=0
static void start_status_thread()
static void StopAll()
StoppableThread(StoppableThread &&) noexcept=delete
static std::string AnyThreadStatus()
StoppableThread(std::string name)
static void ClearAllQueues()
virtual std::string EndStatus()
std::atomic_size_t & ItemsPopped()
std::atomic_long & InputSize()
virtual size_t GetItemsPushed()=0
static std::vector< StoppableThread * > GetAll()
static std::map< std::string, StoppableThread * > fThreadMap
StoppableThread(const StoppableThread &)=delete
static void join_status_thread()
void ItemsPopped(size_t val)
static std::string AllThreadStatus()
static size_t fColumnWidth
static bool fStatusThreadOn
static void status_out_loop()
virtual void OnEnd()
virtual std::string Progress()
static size_t fStatusWidth
static size_t StatusWidth()
std::condition_variable fPausedWait
static int GetNThreads()
static std::string AllThreadProgress()
static void StatusWidth(size_t val)
std::mutex fPauseMutex
virtual size_t GetItemsPopped()=0
static std::string AllThreadHeader()
static StoppableThread * Get(const std::string &name)
void InputSize(int64_t val)
std::thread fThread
static std::thread fStatusThread
std::atomic_bool fPaused
static void PauseAll()
static void stop_status_thread()
static void ColumnWidth(size_t val)
std::atomic_size_t fItemsPopped
number of items popped from input queue
static void Print()
static bool AnyThreadRunning()
virtual void ClearQueue()
std::atomic_bool fForceStop
std::string Name() const
static void SendStop()
static void ResumeAll()
void IncrementItemsPopped()
static void status_out()
static size_t ColumnWidth()
std::atomic_long fInputSize
std::atomic_bool fRunning
virtual size_t GetItemsCurrent()=0
virtual bool Iteration()=0