GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
StoppableThread.h
Go to the documentation of this file.
1#ifndef STOPPABLETHREAD_H
2#define STOPPABLETHREAD_H
3
4/** \addtogroup Loops
5 * @{
6 */
7
8////////////////////////////////////////////////////////////////////////////////
9///
10/// \class StoppableThread
11///
12/// Base-class for all loops/threads.
13///
14////////////////////////////////////////////////////////////////////////////////
15
16#ifndef __CINT__
17#include <atomic>
18#include <condition_variable>
19#include <thread>
20#endif
21
22#include <sstream>
23#include <iomanip>
24#include <string>
25#include <map>
26
27#include "TObject.h"
28
30public:
31 explicit StoppableThread(std::string name);
33 StoppableThread(StoppableThread&&) noexcept = delete;
34 StoppableThread& operator=(const StoppableThread&) = delete;
35 StoppableThread& operator=(StoppableThread&&) noexcept = delete;
36 virtual ~StoppableThread();
37
38 static void SendStop();
39 static void StopAll();
40 static bool AnyThreadRunning();
41 static std::string AnyThreadStatus();
42 static std::string AllThreadProgress();
43 static std::string AllThreadHeader();
44 static std::string AllThreadStatus();
45
46 static void PauseAll();
47 static void ResumeAll();
48
49 static StoppableThread* Get(const std::string& name);
50 static std::vector<StoppableThread*> GetAll();
51
52 void Resume();
53 void Pause();
54 void Stop();
55 bool IsPaused();
56 bool IsRunning();
57 void Join();
58
59 virtual void OnEnd() {}
60 virtual std::string Status();
61 virtual std::string Progress();
62 virtual std::string EndStatus() { return {}; }
63 std::string Name() const { return fName; }
64
65 virtual void ClearQueue() {}
66 static void ClearAllQueues();
67
68 // protected:
69 virtual bool Iteration() = 0;
70
71 virtual size_t GetItemsPopped() = 0;
72 virtual size_t GetItemsPushed() = 0;
73 virtual size_t GetItemsCurrent() = 0;
74 virtual size_t GetRate() = 0;
75
76 static int GetNThreads();
77
78 static void Print();
79
80 static void ColumnWidth(size_t val) { fColumnWidth = val; }
81 static void StatusWidth(size_t val) { fStatusWidth = val; }
82 static size_t ColumnWidth() { return fColumnWidth; }
83 static size_t StatusWidth() { return fStatusWidth; }
84
85 static void start_status_thread();
86 static void stop_status_thread();
87 static void join_status_thread();
88 static void status_out_loop();
89 static void status_out();
90
91protected:
92#ifndef __CINT__
93 void ItemsPopped(size_t val) { fItemsPopped = val; }
94 void InputSize(int64_t val) { fInputSize = val; }
95 std::atomic_size_t& ItemsPopped() { return fItemsPopped; }
96 std::atomic_long& InputSize() { return fInputSize; }
98#endif
99private:
100#ifndef __CINT__
101 std::atomic_size_t fItemsPopped{0}; ///< number of items popped from input queue
102 std::atomic_long fInputSize{0}; ///< number of items in the input (queue), only updated within Iteration(), so not
103 ///< always fully up-to-date (signed to hold error from queue::pop)
104#endif
105
106 std::string fName;
107
108 static size_t fColumnWidth;
109 static size_t fStatusWidth;
110
111 void Loop();
112
113 static std::map<std::string, StoppableThread*> fThreadMap;
114
115 static bool fStatusThreadOn;
116#ifndef __CINT__
117 static std::thread fStatusThread;
118 std::thread fThread;
119 std::atomic_bool fRunning{false};
120 std::atomic_bool fForceStop{false};
121 std::atomic_bool fPaused{false};
122 std::condition_variable fPausedWait;
123 std::mutex fPauseMutex;
124#endif
125
126 /// \cond CLASSIMP
127 ClassDef(StoppableThread, 0) // NOLINT(readability-else-after-return)
128 /// \endcond
129};
130
131/*! @} */
132#endif /* _STOPPABLETHREAD_H_ */
virtual std::string Status()
virtual size_t GetRate()=0
static void start_status_thread()
static void StopAll()
StoppableThread(StoppableThread &&) noexcept=delete
static std::string AnyThreadStatus()
StoppableThread(std::string name)
static void ClearAllQueues()
virtual std::string EndStatus()
std::atomic_size_t & ItemsPopped()
std::atomic_long & InputSize()
virtual size_t GetItemsPushed()=0
static std::vector< StoppableThread * > GetAll()
static std::map< std::string, StoppableThread * > fThreadMap
StoppableThread(const StoppableThread &)=delete
static void join_status_thread()
void ItemsPopped(size_t val)
static std::string AllThreadStatus()
static size_t fColumnWidth
static bool fStatusThreadOn
static void status_out_loop()
virtual void OnEnd()
virtual std::string Progress()
static size_t fStatusWidth
static size_t StatusWidth()
std::condition_variable fPausedWait
static int GetNThreads()
static std::string AllThreadProgress()
static void StatusWidth(size_t val)
std::mutex fPauseMutex
virtual size_t GetItemsPopped()=0
static std::string AllThreadHeader()
static StoppableThread * Get(const std::string &name)
void InputSize(int64_t val)
std::thread fThread
static std::thread fStatusThread
std::atomic_bool fPaused
static void PauseAll()
static void stop_status_thread()
static void ColumnWidth(size_t val)
std::atomic_size_t fItemsPopped
number of items popped from input queue
static void Print()
static bool AnyThreadRunning()
virtual void ClearQueue()
std::atomic_bool fForceStop
std::string Name() const
static void SendStop()
static void ResumeAll()
void IncrementItemsPopped()
static void status_out()
static size_t ColumnWidth()
std::atomic_long fInputSize
std::atomic_bool fRunning
virtual size_t GetItemsCurrent()=0
virtual bool Iteration()=0