GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
TEventBuildingLoop.cxx
Go to the documentation of this file.
2
3#include "TGRSIOptions.h"
5
6#include <chrono>
7#include <thread>
8
9TEventBuildingLoop* TEventBuildingLoop::Get(std::string name, EBuildMode mode, uint64_t buildWindow)
10{
11 if(name.length() == 0) {
12 name = "build_loop";
13 }
14
15 auto* loop = static_cast<TEventBuildingLoop*>(StoppableThread::Get(name));
16 if(loop == nullptr) {
17 loop = new TEventBuildingLoop(name, mode, buildWindow);
18 }
19 return loop;
20}
21
// Constructor: forwards the name to StoppableThread, creates the input, output, and
// out-of-order queues, stores the build mode and build window, sets the sorting depth
// to 10000, and reads the "skip input sort" flag from TGRSIOptions.
// The body then installs the comparator used by fOrdered according to fBuildMode and
// prints which ordering is used. The last switch branch throws std::runtime_error
// ("no build mode selected").
// NOTE(review): the case labels of the switch are not visible in this listing (the
// embedded line numbers skip 30, 36, 42, 48, and 55) - confirm against the original
// file which build mode each branch belongs to.
22TEventBuildingLoop::TEventBuildingLoop(std::string name, EBuildMode mode, uint64_t buildWindow)
23 : StoppableThread(std::move(name)), fInputQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<const TFragment>>>()),
24 fOutputQueue(std::make_shared<ThreadsafeQueue<std::vector<std::shared_ptr<const TFragment>>>>()),
25 fOutOfOrderQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<const TFragment>>>()), fBuildMode(mode),
26 fSortingDepth(10000), fBuildWindow(buildWindow), fPreviousSortingDepthError(false), fSkipInputSort(TGRSIOptions::Get()->SkipInputSort())
27{
28 std::cout << DYELLOW << (fSkipInputSort ? "Not sorting " : "Sorting ") << "input by time: ";
29 switch(fBuildMode) {
// branch ordering fragments by GetTime()
31 fOrdered = decltype(fOrdered)([](const std::shared_ptr<const TFragment>& a, const std::shared_ptr<const TFragment>& b) {
32 return a->GetTime() < b->GetTime();
33 });
34 std::cout << DYELLOW << "sorting by time, using build window of " << fBuildWindow << "!" << RESET_COLOR << std::endl;
35 break;
// branch ordering fragments by GetTimeStampNs()
37 fOrdered = decltype(fOrdered)([](const std::shared_ptr<const TFragment>& a, const std::shared_ptr<const TFragment>& b) {
38 return a->GetTimeStampNs() < b->GetTimeStampNs();
39 });
40 std::cout << DYELLOW << "sorting by timestamp, using build window of " << fBuildWindow << "!" << RESET_COLOR << std::endl;
41 break;
// branch ordering fragments by GetTriggerId()
43 fOrdered = decltype(fOrdered)([](const std::shared_ptr<const TFragment>& a, const std::shared_ptr<const TFragment>& b) {
44 return a->GetTriggerId() < b->GetTriggerId();
45 });
46 std::cout << DYELLOW << "sorting by trigger ID!" << RESET_COLOR << std::endl;
47 break;
// NOTE(review): a comparator that always returns true is not a strict weak ordering,
// which std::multiset requires of its Compare - confirm that nothing is ever inserted
// into fOrdered when this branch is selected.
49 // no need for ordering, always return true
50 fOrdered = decltype(fOrdered)([](const std::shared_ptr<const TFragment>&, const std::shared_ptr<const TFragment>&) {
51 return true;
52 });
53 std::cout << DYELLOW << "not sorting!" << RESET_COLOR << std::endl;
54 break;
// error branch: report the numeric build mode and abort construction by throwing
// (the break after the throw is never reached)
56 std::cout << "build mode was " << static_cast<int>(fBuildMode) << ", not " << static_cast<int>(EBuildMode::kTimestamp) << ", or " << static_cast<int>(EBuildMode::kTriggerId) << std::endl;
57 throw std::runtime_error("Error in event building loop, no build mode selected. Maybe because no custom run info was loaded?");
58 break;
59 }
60}
61
63
// Body of a member function whose signature line is not part of this listing
// (presumably ClearQueue() - TODO confirm against the original file).
// Empties both the input and the output queue by popping items until Size()
// reports zero; the popped items are discarded.
65{
// drain the queue of single fragments
66 std::shared_ptr<const TFragment> single_event;
67 while(fInputQueue->Size() != 0u) {
68 fInputQueue->Pop(single_event);
69 }
70
// drain the queue of built events (vectors of fragments)
71 std::vector<std::shared_ptr<const TFragment>> event;
72 while(fOutputQueue->Size() != 0u) {
73 fOutputQueue->Pop(event);
74 }
75}
76
// Body of a member function whose signature line is not part of this listing
// (presumably Iteration() - TODO confirm against the original file).
// One pass of the loop: tries to pop a fragment from the input queue, optionally
// inserts it into the ordered set fOrdered, and passes the next fragment to
// CheckBuildCondition; if that returns true the fragment is appended to fNextEvent.
// Returns true in every case except one: false is returned, after marking the output
// queue as finished, once no fragment was popped, the input queue is finished, and
// fOrdered is empty.
// NOTE(review): the embedded line numbers skip 87 and 106, so statements are missing
// from this listing at those places - confirm against the original file.
78{
79 // Pull something off of the input queue.
80 std::shared_ptr<const TFragment> input_frag = nullptr;
// the value returned by Pop is stored via InputSize(); negative values are reset to 0
81 InputSize(fInputQueue->Pop(input_frag, 0));
82 if(InputSize() < 0) {
83 InputSize(0);
84 }
85
86 if(input_frag != nullptr) {
88 if(!fSkipInputSort) {
89 fOrdered.insert(input_frag);
90 if(fOrdered.size() < fSortingDepth) {
91 // Got a new event, but we want to have more to sort
92 return true;
93 }
94 // Got a new event, and we have enough to sort.
95 }
96 } else {
97 if(!fInputQueue->IsFinished()) {
98 // If the parent is live, wait for it
99 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
100 return true;
101 }
102 if(fOrdered.empty()) {
103 // Parent is dead, and we have passed on all events
104 // check if last event needs to be pushed
// NOTE(review): the body of this if (listing line 106) is missing here
105 if(!fNextEvent.empty()) {
107 }
108 fOutputQueue->SetFinished();
109 return false;
110 }
111 // Parent is dead, but we still have items.
112 // Continue through the function to process them.
113 }
114
115 // We have data, and we want to add it to the next fragment;
116 std::shared_ptr<const TFragment> next_fragment;
// when sorting, take (and remove) the first element of the ordered set;
// otherwise use the fragment that was just popped
117 if(!fSkipInputSort) {
118 next_fragment = *fOrdered.begin();
119 fOrdered.erase(fOrdered.begin());
120 } else {
121 next_fragment = input_frag;
122 }
123
// only fragments accepted by the build condition become part of the current event
124 if(CheckBuildCondition(next_fragment)) {
125 fNextEvent.push_back(next_fragment);
126 }
127
128 return true;
129}
130
// Dispatches the fragment to the check matching fBuildMode (time, timestamp, or
// trigger ID) and returns that check's result. One further branch clears fNextEvent
// and returns true; any other build mode returns false.
// NOTE(review): the embedded line numbers skip 137 and 139, so the case label of the
// fourth branch and one statement before fNextEvent.clear() are missing from this
// listing - confirm against the original file.
131bool TEventBuildingLoop::CheckBuildCondition(const std::shared_ptr<const TFragment>& frag)
132{
133 switch(fBuildMode) {
// the break statements after return are never reached
134 case EBuildMode::kTime: return CheckTimeCondition(frag); break;
135 case EBuildMode::kTimestamp: return CheckTimestampCondition(frag); break;
136 case EBuildMode::kTriggerId: return CheckTriggerIdCondition(frag); break;
138 // always push the current "event" (single fragment) on and clear it
140 fNextEvent.clear();
141 return true;
142 break;
143 default: return false;
144 }
145 return false; // we should never reach this statement!
146}
147
// Checks the fragment's time (GetTime()) against the event currently being built.
// The reference time event_start is the time of the first fragment in fNextEvent if
// the StaticWindow option is set, the time of the last fragment otherwise, and the
// fragment's own time if fNextEvent is empty.
// If the fragment lies more than fBuildWindow away from event_start, fNextEvent is
// cleared. A fragment earlier than event_start is reported as out of time order; if
// the SeparateOutOfOrder option is set it is pushed to fOutOfOrderQueue and false is
// returned. In all other cases true is returned.
// NOTE(review): the embedded line numbers skip 162, 168, and 175, so statements are
// missing from this listing (one before fNextEvent.clear(), and the opening of the
// block closed on listing line 176) - confirm against the original file.
148bool TEventBuildingLoop::CheckTimeCondition(const std::shared_ptr<const TFragment>& frag)
149{
150 double time = frag->GetTime();
151 double event_start = (!fNextEvent.empty() ? (TGRSIOptions::AnalysisOptions()->StaticWindow() ? fNextEvent[0]->GetTime()
152 : fNextEvent.back()->GetTime())
153 : time);
154
155 // record event_start whenever the fragment's entry number is a multiple of the sort depth
156 if(frag->GetEntryNumber() % (TGRSIOptions::Get()->SortDepth()) == 0) {
157 TSortingDiagnostics::Get()->AddTime(event_start);
158 }
// fragment is outside the build window (on either side) of the reference time
159 if(time > event_start + static_cast<double>(fBuildWindow) || time < event_start - static_cast<double>(fBuildWindow)) {
160 // std::cout.precision(12);
161 // std::cout<<std::setw(12)<<time<<", "<<std::setw(12)<<event_start<<", "<<std::setw(12)<<fBuildWindow<<"; "<<std::setw(12)<<fabs(time - event_start)<<", "<<std::setw(12)<<event_start + fBuildWindow<<", "<<std::setw(12)<<event_start - fBuildWindow<<std::endl;
163 fNextEvent.clear();
164 }
165
// fragment is earlier than the reference time: record it and warn the user
166 if(time < event_start) {
167 TSortingDiagnostics::Get()->OutOfTimeOrder(time, event_start, frag->GetEntryNumber());
169 std::cerr.precision(12);
170 std::cerr << std::endl
171 << "Sorting depth of " << fSortingDepth << " was insufficient. time: " << std::setw(12) << time
172 << " Last: " << std::setw(12) << event_start << " \n"
173 << "Not all events were built correctly" << std::endl;
174 std::cerr << "Please increase sort depth with --sort-depth=N" << std::endl;
176 }
// optionally divert the out-of-order fragment instead of adding it to an event
177 if(TGRSIOptions::Get()->SeparateOutOfOrder()) {
178 fOutOfOrderQueue->Push(frag);
179 return false;
180 }
181 }
182
183 return true;
184}
185
// Checks the fragment's timestamp (GetTimeStampNs()) against the event currently
// being built. The reference timestamp event_start is the timestamp of the first
// fragment in fNextEvent if the StaticWindow option is set, the timestamp of the last
// fragment otherwise, and the fragment's own timestamp if fNextEvent is empty.
// If the fragment lies more than fBuildWindow away from event_start, fNextEvent is
// cleared. A fragment earlier than event_start is reported as out of order; if the
// SeparateOutOfOrder option is set it is pushed to fOutOfOrderQueue and false is
// returned. In all other cases true is returned.
// NOTE(review): the embedded line numbers skip 195, 198, 204, and 210, so statements
// are missing from this listing (the body of the sort-depth check, one statement
// before fNextEvent.clear(), and the opening of the block closed on listing line 211)
// - confirm against the original file.
186bool TEventBuildingLoop::CheckTimestampCondition(const std::shared_ptr<const TFragment>& frag)
187{
188 uint64_t timestamp = frag->GetTimeStampNs();
189 uint64_t event_start = (!fNextEvent.empty() ? (TGRSIOptions::AnalysisOptions()->StaticWindow() ? fNextEvent[0]->GetTimeStampNs()
190 : fNextEvent.back()->GetTimeStampNs())
191 : timestamp);
192
193 // executed whenever the fragment's entry number is a multiple of the sort depth
194 if(frag->GetEntryNumber() % (TGRSIOptions::Get()->SortDepth()) == 0) {
196 }
// NOTE(review): event_start is uint64_t; if fBuildWindow is unsigned as well (the
// constructor takes it as uint64_t), event_start - fBuildWindow wraps around to a very
// large value whenever event_start < fBuildWindow - confirm this is intended.
197 if(timestamp > event_start + fBuildWindow || timestamp < event_start - fBuildWindow) {
199 fNextEvent.clear();
200 }
201
// fragment is earlier than the reference timestamp: record it and warn the user
202 if(timestamp < event_start) {
203 TSortingDiagnostics::Get()->OutOfOrder(timestamp, event_start, frag->GetEntryNumber());
205 std::cerr << std::endl
206 << "Sorting depth of " << fSortingDepth << " was insufficient. timestamp: " << timestamp
207 << " Last: " << event_start << " \n"
208 << "Not all events were built correctly" << std::endl;
209 std::cerr << "Please increase sort depth with --sort-depth=N" << std::endl;
211 }
// optionally divert the out-of-order fragment instead of adding it to an event
212 if(TGRSIOptions::Get()->SeparateOutOfOrder()) {
213 fOutOfOrderQueue->Push(frag);
214 return false;
215 }
216 }
217
218 return true;
219}
220
// Checks the fragment's trigger ID against the event currently being built. The
// reference ID is the trigger ID of the first fragment in fNextEvent, or the
// fragment's own trigger ID if fNextEvent is empty.
// If the IDs differ, fNextEvent is cleared. A fragment with a smaller trigger ID than
// the reference is reported as out of order; if the SeparateOutOfOrder option is set
// it is pushed to fOutOfOrderQueue and false is returned. In all other cases true is
// returned.
// NOTE(review): the embedded line numbers skip 232, 238, and 245, so statements are
// missing from this listing (one before fNextEvent.clear(), and the opening of the
// block closed on listing line 246) - confirm against the original file.
221bool TEventBuildingLoop::CheckTriggerIdCondition(const std::shared_ptr<const TFragment>& frag)
222{
223 int64_t trigger_id = frag->GetTriggerId();
224 int64_t current_trigger_id = (!fNextEvent.empty() ? fNextEvent[0]->GetTriggerId() : trigger_id);
225
226 // record the current trigger id whenever the fragment's entry number is a multiple of the sort depth
227 if(frag->GetEntryNumber() % (TGRSIOptions::Get()->SortDepth()) == 0) {
228 TSortingDiagnostics::Get()->AddTimeStamp(current_trigger_id);
229 }
230
// a different trigger ID ends the current event
231 if(trigger_id != current_trigger_id) {
233 fNextEvent.clear();
234 }
235
// fragment has a smaller trigger ID than the reference: record it and warn the user
236 if(trigger_id < current_trigger_id) {
237 TSortingDiagnostics::Get()->OutOfOrder(trigger_id, current_trigger_id, frag->GetEntryNumber());
239 std::cerr << std::endl
240 << "Sorting depth of " << fSortingDepth << " was insufficient.\n"
241 << "Not all events were built correctly" << std::endl;
242 std::cerr << "Trigger id #" << trigger_id << " was incorrectly sorted before "
243 << "trigger id #" << current_trigger_id << std::endl;
244 std::cerr << "Please increase sort depth with --sort-depth=N" << std::endl;
246 }
// optionally divert the out-of-order fragment instead of adding it to an event
247 if(TGRSIOptions::Get()->SeparateOutOfOrder()) {
248 fOutOfOrderQueue->Push(frag);
249 return false;
250 }
251 }
252
253 return true;
254}
255
// Body of a member function whose signature line is not part of this listing
// (presumably EndStatus() - TODO confirm against the original file).
// Builds and returns a one-line status string of the form
// "<input queue name>: <ItemsPopped()>/<fInputQueue->ItemsPopped()> items popped",
// terminated by a newline.
257{
258 std::ostringstream str;
259 str << fInputQueue->Name() << ": " << ItemsPopped() << "/" << fInputQueue->ItemsPopped() << " items popped"
260 << std::endl;
261
262 return str.str();
263}
#define DYELLOW
Definition Globals.h:16
#define RESET_COLOR
Definition Globals.h:5
std::atomic_size_t & ItemsPopped()
std::atomic_long & InputSize()
static StoppableThread * Get(const std::string &name)
void IncrementItemsPopped()
TEventBuildingLoop(const TEventBuildingLoop &)=delete
std::string EndStatus() override
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TFragment > > > fOutOfOrderQueue
std::shared_ptr< ThreadsafeQueue< std::vector< std::shared_ptr< const TFragment > > > > fOutputQueue
bool CheckTriggerIdCondition(const std::shared_ptr< const TFragment > &)
bool CheckTimestampCondition(const std::shared_ptr< const TFragment > &)
bool CheckTimeCondition(const std::shared_ptr< const TFragment > &)
std::vector< std::shared_ptr< const TFragment > > fNextEvent
bool CheckBuildCondition(const std::shared_ptr< const TFragment > &)
static TEventBuildingLoop * Get(std::string name="", EBuildMode mode=EBuildMode::kTimestamp, uint64_t buildWindow=2000)
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TFragment > > > fInputQueue
std::multiset< std::shared_ptr< const TFragment >, std::function< bool(std::shared_ptr< const TFragment >, std::shared_ptr< const TFragment >)> > fOrdered
void ClearQueue() override
bool Iteration() override
static TGRSIOptions * Get(int argc=0, char **argv=nullptr)
Do not use!
static TAnalysisOptions * AnalysisOptions()
int SortDepth() const
static TSortingDiagnostics * Get(bool verbose=false)
Definition TSingleton.h:33
void OutOfTimeOrder(double newFragTime, double oldFragTime, int64_t newEntry)
void AddTime(double val)
void AddTimeStamp(Long_t val)
void OutOfOrder(int64_t newFragTS, int64_t oldFragTS, int64_t newEntry)