GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
TUnpackingLoop.cxx
Go to the documentation of this file.
1#include "TUnpackingLoop.h"
2
3#include <chrono>
4#include <thread>
5#include <sstream>
6#include <memory>
7
8#include "TGRSIOptions.h"
9#include "TParserLibrary.h"
10
12{
13 if(name.empty()) {
14 name = "unpacking_loop";
15 }
16
17 auto* loop = static_cast<TUnpackingLoop*>(StoppableThread::Get(name));
18 if(loop == nullptr) {
19 loop = new TUnpackingLoop(name);
20 }
21 return loop;
22}
23
25 : StoppableThread(std::move(name)), fInputQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<TRawEvent>>>()),
26 fFragsReadFromRaw(0), fGoodFragsRead(0)
27{
28 // try and open dynamic library
29 if(TGRSIOptions::Get()->ParserLibrary().empty()) {
30 throw std::runtime_error("No data parser library supplied, can't open parser!");
31 }
32
33 // create new data parser
35}
36
41
43{
44 std::shared_ptr<TRawEvent> singleEvent;
45 while(fInputQueue->Size() != 0u) {
46 fInputQueue->Pop(singleEvent);
47 }
48
50}
51
53{
54 std::shared_ptr<TRawEvent> event;
55 int error = fInputQueue->Pop(event);
56 if(error < 0) {
57 InputSize(0);
58 if(fInputQueue->IsFinished()) {
59 // Source is dead, push the last event and stop.
61 BadOutputQueue()->SetFinished();
62 ScalerOutputQueue()->SetFinished();
63 return false;
64 }
65 // Wait for the source to give more data.
66 std::this_thread::sleep_for(std::chrono::milliseconds(10));
67 return true;
68 }
70 InputSize(error); //"error" is the return value of popping an event from the input queue (which returns the number of events left)
72
74 fGoodFragsRead += event->GoodFrags();
75
76 return true;
77}
78
80{
81 std::ostringstream status;
82 if(fFragsReadFromRaw > 0) {
83 status << "\r" << Name() << ":\t" << fGoodFragsRead << " good fragments out of " << fFragsReadFromRaw
84 << " fragments => " << 100. * static_cast<double>(fGoodFragsRead) / static_cast<double>(fFragsReadFromRaw) << "% passed" << std::endl;
85 } else {
86 status << "\rno fragments read from midas => none parsed!" << std::endl;
87 }
88 status << fParser->OutputQueueStatus();
89 return status.str();
90}
std::atomic_size_t & ItemsPopped()
std::atomic_long & InputSize()
static StoppableThread * Get(const std::string &name)
std::string Name() const
void IncrementItemsPopped()
virtual std::string OutputQueueStatus()
virtual void SetFinished()
virtual int Process(std::shared_ptr< TRawEvent >)=0
virtual void ClearQueue()
virtual void SetStatusVariables(std::atomic_size_t *itemsPopped, std::atomic_long *inputSize)
Definition TDataParser.h:75
static TGRSIOptions * Get(int argc=0, char **argv=nullptr)
Do not use!
TDataParser * CreateDataParser()
RAW event.
Definition TRawEvent.h:22
static TParserLibrary * Get(bool verbose=false)
Definition TSingleton.h:33
static TUnpackingLoop * Get(std::string name="")
TUnpackingLoop(const TUnpackingLoop &)=delete
void ClearQueue() override
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< TRawEvent > > > fInputQueue
TDataParser * fParser
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< TEpicsFrag > > > & ScalerOutputQueue()
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TBadFragment > > > & BadOutputQueue()
int64_t fGoodFragsRead
int64_t fFragsReadFromRaw
std::string EndStatus() override
bool Iteration() override