GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
TEventBuildingLoop.cxx
Go to the documentation of this file.
2
3#include "TGRSIOptions.h"
5
6#include <chrono>
7#include <thread>
8
9TEventBuildingLoop* TEventBuildingLoop::Get(std::string name, EBuildMode mode, uint64_t buildWindow)
10{
11 if(name.empty()) {
12 name = "build_loop";
13 }
14
15 auto* loop = static_cast<TEventBuildingLoop*>(StoppableThread::Get(name));
16 if(loop == nullptr) {
17 loop = new TEventBuildingLoop(name, mode, buildWindow);
18 }
19 return loop;
20}
21
22TEventBuildingLoop::TEventBuildingLoop(std::string name, EBuildMode mode, uint64_t buildWindow)
23 : StoppableThread(std::move(name)), fInputQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<const TFragment>>>()),
24 fOutputQueue(std::make_shared<ThreadsafeQueue<std::vector<std::shared_ptr<const TFragment>>>>()),
25 fOutOfOrderQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<const TFragment>>>()), fBuildMode(mode),
26 fSortingDepth(10000), fBuildWindow(buildWindow), fPreviousSortingDepthError(false), fSkipInputSort(TGRSIOptions::Get()->SkipInputSort())
27{
28 std::cout << DYELLOW << (fSkipInputSort ? "Not sorting " : "Sorting ") << "input by time: ";
29 switch(fBuildMode) {
31 fOrdered = decltype(fOrdered)([](const std::shared_ptr<const TFragment>& a, const std::shared_ptr<const TFragment>& b) {
32 return a->GetTime() < b->GetTime();
33 });
34 std::cout << DYELLOW << "sorting by time, using build window of " << fBuildWindow << "!" << RESET_COLOR << std::endl;
35 break;
37 fOrdered = decltype(fOrdered)([](const std::shared_ptr<const TFragment>& a, const std::shared_ptr<const TFragment>& b) {
38 return a->GetTimeStampNs() < b->GetTimeStampNs();
39 });
40 std::cout << DYELLOW << "sorting by timestamp, using build window of " << fBuildWindow << "!" << RESET_COLOR << std::endl;
41 break;
43 fOrdered = decltype(fOrdered)([](const std::shared_ptr<const TFragment>& a, const std::shared_ptr<const TFragment>& b) {
44 return a->GetTriggerId() < b->GetTriggerId();
45 });
46 std::cout << DYELLOW << "sorting by trigger ID!" << RESET_COLOR << std::endl;
47 break;
49 // no need for ordering, always return true
50 fOrdered = decltype(fOrdered)([](const std::shared_ptr<const TFragment>&, const std::shared_ptr<const TFragment>&) {
51 return true;
52 });
53 std::cout << DYELLOW << "not sorting!" << RESET_COLOR << std::endl;
54 break;
56 std::cout << "build mode was " << static_cast<int>(fBuildMode) << ", not " << static_cast<int>(EBuildMode::kTimestamp) << ", or " << static_cast<int>(EBuildMode::kTriggerId) << std::endl;
57 throw std::runtime_error("Error in event building loop, no build mode selected. Maybe because no custom run info was loaded?");
58 break;
59 }
60}
61
63
65{
66 std::shared_ptr<const TFragment> singleEvent;
67 while(fInputQueue->Size() != 0u) {
68 fInputQueue->Pop(singleEvent);
69 }
70
71 std::vector<std::shared_ptr<const TFragment>> event;
72 while(fOutputQueue->Size() != 0u) {
73 fOutputQueue->Pop(event);
74 }
75}
76
78{
79 // Pull something off of the input queue.
80 std::shared_ptr<const TFragment> inputFragment = nullptr;
81 InputSize(fInputQueue->Pop(inputFragment, 0));
82 if(InputSize() < 0) {
83 InputSize(0);
84 }
85
86 if(inputFragment != nullptr) {
88 if(!fSkipInputSort) {
89 fOrdered.insert(inputFragment);
90 if(fOrdered.size() < fSortingDepth) {
91 // Got a new event, but we want to have more to sort
92 return true;
93 }
94 // Got a new event, and we have enough to sort.
95 }
96 } else {
97 if(!fInputQueue->IsFinished()) {
98 // If the parent is live, wait for it
99 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
100 return true;
101 }
102 if(fOrdered.empty()) {
103 // Parent is dead, and we have passed on all events
104 // check if last event needs to be pushed
105 if(!fNextEvent.empty()) {
107 }
108 fOutputQueue->SetFinished();
109 return false;
110 }
111 // Parent is dead, but we still have items.
112 // Continue through the function to process them.
113 }
114
115 // We have data, and we want to add it to the next fragment;
116 std::shared_ptr<const TFragment> nextFragment;
117 if(!fSkipInputSort) {
118 nextFragment = *fOrdered.begin();
119 fOrdered.erase(fOrdered.begin());
120 } else {
121 nextFragment = inputFragment;
122 }
123
124 if(CheckBuildCondition(nextFragment)) {
125 fNextEvent.push_back(nextFragment);
126 }
127
128 return true;
129}
130
131bool TEventBuildingLoop::CheckBuildCondition(const std::shared_ptr<const TFragment>& frag)
132{
133 switch(fBuildMode) {
134 case EBuildMode::kTime: return CheckTimeCondition(frag); break;
135 case EBuildMode::kTimestamp: return CheckTimestampCondition(frag); break;
136 case EBuildMode::kTriggerId: return CheckTriggerIdCondition(frag); break;
138 // always push the current "event" (single fragment) on and clear it
140 fNextEvent.clear();
141 return true;
142 break;
143 default: return false;
144 }
145 return false; // we should never reach this statement!
146}
147
148bool TEventBuildingLoop::CheckTimeCondition(const std::shared_ptr<const TFragment>& frag)
149{
150 double time = frag->GetTime();
151 double eventStart = (!fNextEvent.empty() ? (TGRSIOptions::AnalysisOptions()->StaticWindow() ? fNextEvent[0]->GetTime()
152 : fNextEvent.back()->GetTime())
153 : time);
154
155 // save time every <BuildWindow> fragments
156 if(frag->GetEntryNumber() % (TGRSIOptions::Get()->SortDepth()) == 0) {
157 TSortingDiagnostics::Get()->AddTime(eventStart);
158 }
159 if(time > eventStart + static_cast<double>(fBuildWindow) || time < eventStart - static_cast<double>(fBuildWindow)) {
161 fNextEvent.clear();
162 }
163
164 if(time < eventStart) {
165 TSortingDiagnostics::Get()->OutOfTimeOrder(time, eventStart, frag->GetEntryNumber());
167 std::cerr.precision(12);
168 std::cerr << std::endl
169 << "Sorting depth of " << fSortingDepth << " was insufficient. time: " << std::setw(16) << time
170 << " Last: " << std::setw(16) << eventStart << " \n"
171 << "Not all events were built correctly" << std::endl;
172 std::cerr << "Please increase sort depth with --sort-depth=N, if needed" << std::endl;
174 }
175 if(TGRSIOptions::Get()->SeparateOutOfOrder()) {
176 fOutOfOrderQueue->Push(frag);
177 return false;
178 }
179 }
180
181 return true;
182}
183
184bool TEventBuildingLoop::CheckTimestampCondition(const std::shared_ptr<const TFragment>& frag)
185{
186 uint64_t timestamp = frag->GetTimeStampNs();
187 uint64_t eventStart = (!fNextEvent.empty() ? (TGRSIOptions::AnalysisOptions()->StaticWindow() ? fNextEvent[0]->GetTimeStampNs()
188 : fNextEvent.back()->GetTimeStampNs())
189 : timestamp);
190
191 // save timestamp every <BuildWindow> fragments
192 if(frag->GetEntryNumber() % (TGRSIOptions::Get()->SortDepth()) == 0) {
194 }
195 if(timestamp > eventStart + fBuildWindow || timestamp < eventStart - fBuildWindow) {
197 fNextEvent.clear();
198 }
199
200 if(timestamp < eventStart) {
201 TSortingDiagnostics::Get()->OutOfOrder(timestamp, eventStart, frag->GetEntryNumber());
203 std::cerr << std::endl
204 << "Sorting depth of " << fSortingDepth << " was insufficient. timestamp: " << std::setw(16) << timestamp
205 << " Last: " << std::setw(16) << eventStart << " \n"
206 << "Not all events were built correctly" << std::endl;
207 std::cerr << "Please increase sort depth with --sort-depth=N, if needed" << std::endl;
209 }
210 if(TGRSIOptions::Get()->SeparateOutOfOrder()) {
211 fOutOfOrderQueue->Push(frag);
212 return false;
213 }
214 }
215
216 return true;
217}
218
219bool TEventBuildingLoop::CheckTriggerIdCondition(const std::shared_ptr<const TFragment>& frag)
220{
221 int64_t triggerId = frag->GetTriggerId();
222 int64_t currentTriggerId = (!fNextEvent.empty() ? fNextEvent[0]->GetTriggerId() : triggerId);
223
224 // save trigger id every <BuildWindow> fragments
225 if(frag->GetEntryNumber() % (TGRSIOptions::Get()->SortDepth()) == 0) {
226 TSortingDiagnostics::Get()->AddTimeStamp(currentTriggerId);
227 }
228
229 if(triggerId != currentTriggerId) {
231 fNextEvent.clear();
232 }
233
234 if(triggerId < currentTriggerId) {
235 TSortingDiagnostics::Get()->OutOfOrder(triggerId, currentTriggerId, frag->GetEntryNumber());
237 std::cerr << std::endl
238 << "Sorting depth of " << fSortingDepth << " was insufficient.\n"
239 << "Not all events were built correctly" << std::endl;
240 std::cerr << "Trigger id #" << triggerId << " was incorrectly sorted before "
241 << "trigger id #" << currentTriggerId << std::endl;
242 std::cerr << "Please increase sort depth with --sort-depth=N, if needed" << std::endl;
244 }
245 if(TGRSIOptions::Get()->SeparateOutOfOrder()) {
246 fOutOfOrderQueue->Push(frag);
247 return false;
248 }
249 }
250
251 return true;
252}
253
255{
256 std::ostringstream str;
257 str << fInputQueue->Name() << ": " << ItemsPopped() << "/" << fInputQueue->ItemsPopped() << " items popped"
258 << std::endl;
259
260 return str.str();
261}
#define DYELLOW
Definition Globals.h:16
#define RESET_COLOR
Definition Globals.h:5
std::atomic_size_t & ItemsPopped()
std::atomic_long & InputSize()
static StoppableThread * Get(const std::string &name)
void IncrementItemsPopped()
TEventBuildingLoop(const TEventBuildingLoop &)=delete
std::string EndStatus() override
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TFragment > > > fOutOfOrderQueue
std::shared_ptr< ThreadsafeQueue< std::vector< std::shared_ptr< const TFragment > > > > fOutputQueue
bool CheckTriggerIdCondition(const std::shared_ptr< const TFragment > &)
bool CheckTimestampCondition(const std::shared_ptr< const TFragment > &)
bool CheckTimeCondition(const std::shared_ptr< const TFragment > &)
std::vector< std::shared_ptr< const TFragment > > fNextEvent
bool CheckBuildCondition(const std::shared_ptr< const TFragment > &)
static TEventBuildingLoop * Get(std::string name="", EBuildMode mode=EBuildMode::kTimestamp, uint64_t buildWindow=2000)
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TFragment > > > fInputQueue
std::multiset< std::shared_ptr< const TFragment >, std::function< bool(std::shared_ptr< const TFragment >, std::shared_ptr< const TFragment >)> > fOrdered
void ClearQueue() override
bool Iteration() override
static TGRSIOptions * Get(int argc=0, char **argv=nullptr)
Do not use!
static TAnalysisOptions * AnalysisOptions()
int SortDepth() const
static TSortingDiagnostics * Get(bool verbose=false)
Definition TSingleton.h:33
void OutOfTimeOrder(double newFragTime, double oldFragTime, int64_t newEntry)
void AddTime(double val)
void AddTimeStamp(Long_t val)
void OutOfOrder(int64_t newFragTS, int64_t oldFragTS, int64_t newEntry)