26 fSortingDepth(10000), fBuildWindow(buildWindow), fPreviousSortingDepthError(false), fSkipInputSort(
TGRSIOptions::Get()->SkipInputSort())
31 fOrdered =
decltype(
fOrdered)([](
const std::shared_ptr<const TFragment>& a,
const std::shared_ptr<const TFragment>& b) {
32 return a->GetTime() < b->GetTime();
37 fOrdered =
decltype(
fOrdered)([](
const std::shared_ptr<const TFragment>& a,
const std::shared_ptr<const TFragment>& b) {
38 return a->GetTimeStampNs() < b->GetTimeStampNs();
43 fOrdered =
decltype(
fOrdered)([](
const std::shared_ptr<const TFragment>& a,
const std::shared_ptr<const TFragment>& b) {
44 return a->GetTriggerId() < b->GetTriggerId();
50 fOrdered =
decltype(
fOrdered)([](
const std::shared_ptr<const TFragment>&,
const std::shared_ptr<const TFragment>&) {
57 throw std::runtime_error(
"Error in event building loop, no build mode selected. Maybe because no custom run info was loaded?");
66 std::shared_ptr<const TFragment> singleEvent;
71 std::vector<std::shared_ptr<const TFragment>> event;
80 std::shared_ptr<const TFragment> inputFragment =
nullptr;
86 if(inputFragment !=
nullptr) {
99 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
116 std::shared_ptr<const TFragment> nextFragment;
121 nextFragment = inputFragment;
143 default:
return false;
150 double time = frag->GetTime();
159 if(time > eventStart +
static_cast<double>(
fBuildWindow) || time < eventStart -
static_cast<double>(
fBuildWindow)) {
164 if(time < eventStart) {
167 std::cerr.precision(12);
168 std::cerr << std::endl
169 <<
"Sorting depth of " <<
fSortingDepth <<
" was insufficient. time: " << std::setw(16) << time
170 <<
" Last: " << std::setw(16) << eventStart <<
" \n"
171 <<
"Not all events were built correctly" << std::endl;
172 std::cerr <<
"Please increase sort depth with --sort-depth=N, if needed" << std::endl;
186 uint64_t timestamp = frag->GetTimeStampNs();
200 if(timestamp < eventStart) {
203 std::cerr << std::endl
204 <<
"Sorting depth of " <<
fSortingDepth <<
" was insufficient. timestamp: " << std::setw(16) << timestamp
205 <<
" Last: " << std::setw(16) << eventStart <<
" \n"
206 <<
"Not all events were built correctly" << std::endl;
207 std::cerr <<
"Please increase sort depth with --sort-depth=N, if needed" << std::endl;
221 int64_t triggerId = frag->GetTriggerId();
229 if(triggerId != currentTriggerId) {
234 if(triggerId < currentTriggerId) {
237 std::cerr << std::endl
238 <<
"Sorting depth of " <<
fSortingDepth <<
" was insufficient.\n"
239 <<
"Not all events were built correctly" << std::endl;
240 std::cerr <<
"Trigger id #" << triggerId <<
" was incorrectly sorted before "
241 <<
"trigger id #" << currentTriggerId << std::endl;
242 std::cerr <<
"Please increase sort depth with --sort-depth=N, if needed" << std::endl;
256 std::ostringstream str;
std::atomic_size_t & ItemsPopped()
std::atomic_long & InputSize()
static StoppableThread * Get(const std::string &name)
void IncrementItemsPopped()
TEventBuildingLoop(const TEventBuildingLoop &)=delete
std::string EndStatus() override
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TFragment > > > fOutOfOrderQueue
std::shared_ptr< ThreadsafeQueue< std::vector< std::shared_ptr< const TFragment > > > > fOutputQueue
bool CheckTriggerIdCondition(const std::shared_ptr< const TFragment > &)
unsigned int fSortingDepth
bool CheckTimestampCondition(const std::shared_ptr< const TFragment > &)
bool CheckTimeCondition(const std::shared_ptr< const TFragment > &)
std::vector< std::shared_ptr< const TFragment > > fNextEvent
bool CheckBuildCondition(const std::shared_ptr< const TFragment > &)
static TEventBuildingLoop * Get(std::string name="", EBuildMode mode=EBuildMode::kTimestamp, uint64_t buildWindow=2000)
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TFragment > > > fInputQueue
std::multiset< std::shared_ptr< const TFragment >, std::function< bool(std::shared_ptr< const TFragment >, std::shared_ptr< const TFragment >)> > fOrdered
void ClearQueue() override
bool fPreviousSortingDepthError
bool Iteration() override
static TGRSIOptions * Get(int argc=0, char **argv=nullptr)
Do not use!
static TAnalysisOptions * AnalysisOptions()
static TSortingDiagnostics * Get(bool verbose=false)
void OutOfTimeOrder(double newFragTime, double oldFragTime, int64_t newEntry)
void AddTimeStamp(Long_t val)
void OutOfOrder(int64_t newFragTS, int64_t oldFragTS, int64_t newEntry)