GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
StoppableThread.cxx
Go to the documentation of this file.
1#include "StoppableThread.h"
2
3#include <iostream>
4#include <fstream>
5#include <sstream>
6#include <iomanip>
7#include <utility>
8
9#include "TString.h"
10
11#include "TDataLoop.h"
12#include "TFragmentChainLoop.h"
13
14std::map<std::string, StoppableThread*> StoppableThread::fThreadMap;
17
20
22{
23 return static_cast<int>(fThreadMap.size());
24}
25
27 : fItemsPopped(0), fInputSize(0), fName(std::move(name)), fRunning(true), fPaused(true)
28{
29 // TODO: check if a thread already exists and delete?
30 fThreadMap.insert(std::make_pair(fName, this));
31 fThread = std::thread(&StoppableThread::Loop, this);
32 if(!fStatusThreadOn) {
34 }
35}
36
38{
39 for(auto& elem : fThreadMap) {
40 if(elem.second->IsRunning()) {
41 return true;
42 }
43 }
44 return false;
45}
46
48{
49 for(auto& elem : fThreadMap) {
50 if(elem.second->IsRunning()) {
51 std::ostringstream status;
52 status << elem.first << ":\t " << elem.second->Status();
53 return status.str();
54 }
55 }
56 return "";
57}
58
60{
61 std::ostringstream progress;
62 for(auto& elem : fThreadMap) {
63 if(elem.second->IsRunning()) {
64 progress << std::left << std::setw(static_cast<int>(fColumnWidth - 1)) << elem.second->Progress().substr(0, fColumnWidth - 1) << "|";
65 } else {
66 std::string prog = "not running";
67 progress << std::left << std::setw(static_cast<int>(fColumnWidth - 1)) << prog.substr(0, fColumnWidth - 1) << "|";
68 }
69 }
70 return progress.str().substr(0, fStatusWidth);
71}
72
74{
75 std::ostringstream str;
76 for(auto& elem : fThreadMap) {
77 // left align, fill with spaces
78 str << std::left << std::setw(static_cast<int>(fColumnWidth - 1)) << elem.first.substr(0, fColumnWidth - 1) << "|";
79 }
80 return str.str().substr(0, fStatusWidth);
81}
82
84{
85 std::ostringstream str;
86 for(auto& elem : fThreadMap) {
87 if(elem.second->IsRunning()) {
88 str << std::left << std::setw(static_cast<int>(fColumnWidth - 1)) << elem.second->Status().substr(0, fColumnWidth - 1) << "|";
89 } else {
90 std::string prog = "not running";
91 str << std::left << std::setw(static_cast<int>(fColumnWidth - 1)) << prog.substr(0, fColumnWidth - 1) << "|";
92 }
93 }
94 return str.str().substr(0, fStatusWidth);
95}
96
98{
99 for(auto& elem : fThreadMap) {
100 elem.second->Pause();
101 }
102}
103
105{
106 for(auto& elem : fThreadMap) {
107 elem.second->Resume();
108 }
109}
110
112{
113 std::ostringstream str;
114 str << std::setw(static_cast<int>(fColumnWidth / 2 - 1)) << fItemsPopped << "/" << std::setw(static_cast<int>(fColumnWidth / 2 - 1))
116 return str.str();
117}
118
120{
121 std::ostringstream str;
122 float percentDone = 100.f * static_cast<float>(fItemsPopped);
123 if(fItemsPopped + fInputSize > 0) {
124 percentDone /= static_cast<float>(fItemsPopped + fInputSize);
125 while(percentDone > 100.f / static_cast<float>(fColumnWidth - 1)) {
126 str << "*";
127 percentDone -= 100.f / static_cast<float>(fColumnWidth - 1);
128 }
129 } else {
130 str << "N/A: " << percentDone;
131 }
132 return str.str();
133}
134
136{
137 for(auto& elem : fThreadMap) {
138 auto* data_loop = dynamic_cast<TDataLoop*>(elem.second);
139 auto* chain_loop = dynamic_cast<TFragmentChainLoop*>(elem.second);
140 if(data_loop != nullptr || chain_loop != nullptr) {
141 std::cout << "Stopping thread " << elem.first << std::endl;
142 elem.second->Stop();
143 }
144 }
145}
146
148{
149 SendStop();
150
151 for(auto& elem : fThreadMap) {
152 std::cout << "Joining thread " << elem.first << std::endl;
153 StoppableThread* thread = elem.second;
154 thread->Join();
155 }
156
157 while(!fThreadMap.empty()) {
158 StoppableThread* thread = fThreadMap.begin()->second;
159 std::cout << "Deleting thread " << fThreadMap.begin()->first << std::endl;
160 delete thread;
161 }
162
163 status_out();
164}
165
167{
168 for(auto& elem : fThreadMap) {
169 elem.second->ClearQueue();
170 }
171}
172
173StoppableThread* StoppableThread::Get(const std::string& name)
174{
175 StoppableThread* mythread = nullptr;
176 if(fThreadMap.count(name) != 0u) {
177 mythread = fThreadMap.at(name);
178 }
179 return mythread;
180}
181
183{
184 if(fThreadMap.count(fName) != 0u) {
185 fThreadMap.erase(fName);
186 }
187 if(fThreadMap.empty()) {
188 fStatusThreadOn = false;
189 fStatusThread.join();
190 }
191}
192
194{
195 if(fRunning) {
196 std::unique_lock<std::mutex> lock(fPauseMutex);
197 fPaused = false;
198 fPausedWait.notify_one();
199 }
200}
201
203{
204 if(fRunning) {
205 fPaused = true;
206 }
207}
208
210{
211 std::unique_lock<std::mutex> lock(fPauseMutex);
212 fRunning = false;
213 std::cout << std::endl;
214 fPaused = false;
215 std::cout << EndStatus();
216 fPausedWait.notify_one();
217}
218
220{
221 return fRunning;
222}
223
225{
226 return fPaused;
227}
228
230{
231 if(fThread.joinable()) {
232 std::cout << EndStatus();
233 fThread.join();
234 }
235}
236
238{
239 while(fRunning) {
240 std::unique_lock<std::mutex> lock(fPauseMutex);
241 while(fPaused && fRunning) {
242 fPausedWait.wait_for(lock, std::chrono::milliseconds(100));
243 }
244 bool success = Iteration();
245 if(!success) {
246 fRunning = false;
247 std::cout << std::endl;
248 break;
249 }
250 }
251 OnEnd();
252}
253
255{
256 std::cout << "column width " << fColumnWidth << ", status width " << fStatusWidth << std::endl;
257 std::cout << GetNThreads() << " Threads:" << std::endl;
258 int counter = 0;
259 for(auto& thr : fThreadMap) {
260 std::cout << " " << counter << "\t" << thr.first << " @ " << hex(thr.second, 8) << std::endl;
261 counter++;
262 }
263}
264
272
274{
275 if(fStatusThreadOn) {
276 fStatusThreadOn = false;
277 }
278}
279
285
287{
288 while(fStatusThreadOn) {
289 std::this_thread::sleep_for(std::chrono::seconds(2));
290 status_out();
291 }
292 std::ofstream outfile(Form("%s/.grsi_thread", getenv("GRSISYS")));
293 outfile << "---------------------------------------------------------------\n";
294 outfile << "---------------------------------------------------------------\n";
295}
296
298{
299
300 std::ofstream outfile(Form("%s/.grsi_thread", getenv("GRSISYS")));
301 outfile << "---------------------------------------------------------------\n"; // 64 -.
302 for(auto& thr : fThreadMap) {
303 StoppableThread* thread = thr.second;
304 outfile << "- " << thread->Name() << (thread->IsRunning() ? "[Live]" : "[Stop]")
305 << std::string(64 - 8 - thread->Name().length(), ' ') << "-\n";
306 outfile << "- " << std::string(40, ' ') << "items_pushed: " << thread->GetItemsPushed() << "\n";
307 outfile << "- " << std::string(40, ' ') << "items_popped: " << thread->GetItemsPopped() << "\n";
308 outfile << "- " << std::string(40, ' ') << "items_current: " << thread->GetItemsCurrent() << "\n";
309 outfile << "- " << std::string(40, ' ') << "rate: " << thread->GetRate() << "\n";
310 outfile << "---------------------------------------------------------------\n"; // 64 -.
311 }
312 outfile << "---------------------------------------------------------------\n"; // 64 -.
313}
314
315std::vector<StoppableThread*> StoppableThread::GetAll()
316{
317 std::vector<StoppableThread*> output(fThreadMap.size(), nullptr);
318 int index = 0;
319 for(auto& elem : fThreadMap) {
320 output[index++] = elem.second;
321 }
322 return output;
323}
std::string hex(T val, int width=-1)
Definition Globals.h:129
virtual std::string Status()
virtual size_t GetRate()=0
static void start_status_thread()
static void StopAll()
static std::string AnyThreadStatus()
StoppableThread(std::string name)
static void ClearAllQueues()
virtual std::string EndStatus()
virtual size_t GetItemsPushed()=0
static std::vector< StoppableThread * > GetAll()
static std::map< std::string, StoppableThread * > fThreadMap
static void join_status_thread()
static std::string AllThreadStatus()
static size_t fColumnWidth
static bool fStatusThreadOn
static void status_out_loop()
virtual void OnEnd()
virtual std::string Progress()
static size_t fStatusWidth
std::condition_variable fPausedWait
static int GetNThreads()
static std::string AllThreadProgress()
std::mutex fPauseMutex
virtual size_t GetItemsPopped()=0
static std::string AllThreadHeader()
static StoppableThread * Get(const std::string &name)
std::thread fThread
static std::thread fStatusThread
std::atomic_bool fPaused
static void PauseAll()
static void stop_status_thread()
std::atomic_size_t fItemsPopped
number of items popped from input queue
static void Print()
static bool AnyThreadRunning()
std::string Name() const
static void SendStop()
static void ResumeAll()
static void status_out()
std::atomic_long fInputSize
std::atomic_bool fRunning
virtual size_t GetItemsCurrent()=0
virtual bool Iteration()=0