11 if(name.length() == 0) {
26 fSortingDepth(10000), fBuildWindow(buildWindow), fPreviousSortingDepthError(false), fSkipInputSort(
TGRSIOptions::Get()->SkipInputSort())
31 fOrdered =
decltype(
fOrdered)([](
const std::shared_ptr<const TFragment>& a,
const std::shared_ptr<const TFragment>& b) {
32 return a->GetTime() < b->GetTime();
37 fOrdered =
decltype(
fOrdered)([](
const std::shared_ptr<const TFragment>& a,
const std::shared_ptr<const TFragment>& b) {
38 return a->GetTimeStampNs() < b->GetTimeStampNs();
43 fOrdered =
decltype(
fOrdered)([](
const std::shared_ptr<const TFragment>& a,
const std::shared_ptr<const TFragment>& b) {
44 return a->GetTriggerId() < b->GetTriggerId();
50 fOrdered =
decltype(
fOrdered)([](
const std::shared_ptr<const TFragment>&,
const std::shared_ptr<const TFragment>&) {
57 throw std::runtime_error(
"Error in event building loop, no build mode selected. Maybe because no custom run info was loaded?");
66 std::shared_ptr<const TFragment> single_event;
71 std::vector<std::shared_ptr<const TFragment>> event;
80 std::shared_ptr<const TFragment> input_frag =
nullptr;
86 if(input_frag !=
nullptr) {
99 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
116 std::shared_ptr<const TFragment> next_fragment;
121 next_fragment = input_frag;
143 default:
return false;
150 double time = frag->GetTime();
159 if(time > event_start +
static_cast<double>(
fBuildWindow) || time < event_start -
static_cast<double>(
fBuildWindow)) {
166 if(time < event_start) {
169 std::cerr.precision(12);
170 std::cerr << std::endl
171 <<
"Sorting depth of " <<
fSortingDepth <<
" was insufficient. time: " << std::setw(12) << time
172 <<
" Last: " << std::setw(12) << event_start <<
" \n"
173 <<
"Not all events were built correctly" << std::endl;
174 std::cerr <<
"Please increase sort depth with --sort-depth=N" << std::endl;
188 uint64_t timestamp = frag->GetTimeStampNs();
202 if(timestamp < event_start) {
205 std::cerr << std::endl
206 <<
"Sorting depth of " <<
fSortingDepth <<
" was insufficient. timestamp: " << timestamp
207 <<
" Last: " << event_start <<
" \n"
208 <<
"Not all events were built correctly" << std::endl;
209 std::cerr <<
"Please increase sort depth with --sort-depth=N" << std::endl;
223 int64_t trigger_id = frag->GetTriggerId();
224 int64_t current_trigger_id = (!
fNextEvent.empty() ?
fNextEvent[0]->GetTriggerId() : trigger_id);
231 if(trigger_id != current_trigger_id) {
236 if(trigger_id < current_trigger_id) {
239 std::cerr << std::endl
240 <<
"Sorting depth of " <<
fSortingDepth <<
" was insufficient.\n"
241 <<
"Not all events were built correctly" << std::endl;
242 std::cerr <<
"Trigger id #" << trigger_id <<
" was incorrectly sorted before "
243 <<
"trigger id #" << current_trigger_id << std::endl;
244 std::cerr <<
"Please increase sort depth with --sort-depth=N" << std::endl;
258 std::ostringstream str;
std::atomic_size_t & ItemsPopped()
std::atomic_long & InputSize()
static StoppableThread * Get(const std::string &name)
void IncrementItemsPopped()
TEventBuildingLoop(const TEventBuildingLoop &)=delete
std::string EndStatus() override
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TFragment > > > fOutOfOrderQueue
std::shared_ptr< ThreadsafeQueue< std::vector< std::shared_ptr< const TFragment > > > > fOutputQueue
bool CheckTriggerIdCondition(const std::shared_ptr< const TFragment > &)
unsigned int fSortingDepth
bool CheckTimestampCondition(const std::shared_ptr< const TFragment > &)
bool CheckTimeCondition(const std::shared_ptr< const TFragment > &)
std::vector< std::shared_ptr< const TFragment > > fNextEvent
bool CheckBuildCondition(const std::shared_ptr< const TFragment > &)
static TEventBuildingLoop * Get(std::string name="", EBuildMode mode=EBuildMode::kTimestamp, uint64_t buildWindow=2000)
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TFragment > > > fInputQueue
std::multiset< std::shared_ptr< const TFragment >, std::function< bool(std::shared_ptr< const TFragment >, std::shared_ptr< const TFragment >)> > fOrdered
void ClearQueue() override
bool fPreviousSortingDepthError
bool Iteration() override
static TGRSIOptions * Get(int argc=0, char **argv=nullptr)
Do not use!
static TAnalysisOptions * AnalysisOptions()
static TSortingDiagnostics * Get(bool verbose=false)
void OutOfTimeOrder(double newFragTime, double oldFragTime, int64_t newEntry)
void AddTimeStamp(Long_t val)
void OutOfOrder(int64_t newFragTS, int64_t oldFragTS, int64_t newEntry)