GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
TUnpackingLoop.cxx
Go to the documentation of this file.
1#include "TUnpackingLoop.h"
2
3#include <chrono>
4#include <thread>
5#include <sstream>
6#include <memory>
7
8#include "TGRSIOptions.h"
9#include "TParserLibrary.h"
10
// TUnpackingLoop::Get: return the unpacking loop registered under `name`,
// creating a new one if StoppableThread::Get finds none.
// NOTE(review): the signature line is missing from this listing; the member index of this
// page gives it as `static TUnpackingLoop* Get(std::string name = "")`.
12{
// An empty name is replaced by the default name "unpacking_loop".
13 if(name.length() == 0) {
14 name = "unpacking_loop";
15 }
16
// NOTE(review): static_cast performs no run-time check; this assumes that any thread
// registered under this name really is a TUnpackingLoop - confirm.
17 auto* loop = static_cast<TUnpackingLoop*>(StoppableThread::Get(name));
18 if(loop == nullptr) {
// NOTE(review): raw `new` with no matching delete visible here; presumably the
// StoppableThread base registers and owns the new object - verify.
19 loop = new TUnpackingLoop(name);
20 }
21 return loop;
22}
23
// Constructor: forwards `name` to the StoppableThread base, creates an empty shared
// input queue of raw events, and zeroes both fragment counters.
// Throws std::runtime_error when the configured parser library string is empty.
// NOTE(review): the signature line is missing from this listing; presumably
// `TUnpackingLoop::TUnpackingLoop(std::string name)`, since `name` is moved from - confirm.
25 : StoppableThread(std::move(name)), fInputQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<TRawEvent>>>()),
26 fFragsReadFromRaw(0), fGoodFragsRead(0)
27{
28 // try and open dynamic library
// NOTE(review): std::runtime_error is declared in <stdexcept>, which is not among the
// includes visible in this file; presumably it arrives transitively - confirm.
29 if(TGRSIOptions::Get()->ParserLibrary().empty()) {
30 throw std::runtime_error("No data parser library supplied, can't open parser!");
31 }
32
33 // create new data parser
// NOTE(review): the statement that follows the comment above (original line 34) is not
// present in this listing; presumably it assigns fParser from
// TParserLibrary::Get()->CreateDataParser() - confirm against the real source.
35}
36
38
// ClearQueue: drain the input queue by popping events until its size reaches zero.
// NOTE(review): the signature line is missing from this listing; the member index of this
// page gives it as `void ClearQueue() override`.
40{
// Scratch holder for each popped event; its contents are never used afterwards.
41 std::shared_ptr<TRawEvent> singleEvent;
42 while(fInputQueue->Size() != 0u) {
43 fInputQueue->Pop(singleEvent);
44 }
45
// NOTE(review): original line 46 is not present in this listing; presumably it also
// clears the parser's queues (TDataParser declares ClearQueue()) - confirm.
47}
48
// Iteration: pop one raw event from the input queue and update the fragment statistics.
// Returns false only when the pop fails and the input queue reports IsFinished();
// returns true in every other case (after a 10 ms wait if the pop failed, or after
// handling the popped event).
// NOTE(review): the signature line is missing from this listing; the member index of this
// page gives it as `bool Iteration() override`. Original lines 57, 66, 68 and 70 are also
// absent, so the comments below describe only the statements that are visible.
50{
51 std::shared_ptr<TRawEvent> event;
// A negative return value from Pop is treated as "no event obtained".
52 int error = fInputQueue->Pop(event);
53 if(error < 0) {
// presumably records the current input-queue size as zero - verify against StoppableThread
54 InputSize(0);
55 if(fInputQueue->IsFinished()) {
56 // Source is dead, push the last event and stop.
// NOTE(review): original line 57 is missing here; presumably it also marks the parser as
// finished (TDataParser declares SetFinished()) - confirm.
// Mark the bad-fragment and scaler output queues as finished.
58 BadOutputQueue()->SetFinished();
59 ScalerOutputQueue()->SetFinished();
60 return false;
61 }
62 // Wait for the source to give more data.
63 std::this_thread::sleep_for(std::chrono::milliseconds(10));
64 return true;
65 }
// NOTE(review): original line 66 is missing from this listing.
67 InputSize(error); //"error" is the return value of popping an event from the input queue (which returns the number of events left)
// NOTE(review): original line 68 is missing from this listing.
69
// NOTE(review): original line 70 is missing; presumably it processes the event through
// fParser and updates fFragsReadFromRaw, which nothing visible in this file increments - confirm.
// Add the number of good fragments reported by this event to the running total.
71 fGoodFragsRead += event->GoodFrags();
72
73 return true;
74}
75
// EndStatus: build the final status text for this loop.
// Reports the number of good fragments, the total read from the raw input, and the
// percentage that passed, then appends the parser's OutputQueueStatus() string.
// NOTE(review): the signature line is missing from this listing; the member index of this
// page gives it as `std::string EndStatus() override`.
77{
78 std::ostringstream status;
// Only compute the percentage when at least one fragment was read; this also keeps the
// division below from dividing by zero.
79 if(fFragsReadFromRaw > 0) {
80 status << "\r" << Name() << ":\t" << fGoodFragsRead << " good fragments out of " << fFragsReadFromRaw
81 << " fragments => " << 100. * static_cast<double>(fGoodFragsRead) / static_cast<double>(fFragsReadFromRaw) << "% passed" << std::endl;
82 } else {
83 status << "\rno fragments read from midas => none parsed!" << std::endl;
84 }
// NOTE(review): fParser is dereferenced without a null check; presumably the constructor
// always sets it - confirm.
85 status << fParser->OutputQueueStatus();
86 return status.str();
87}
std::atomic_size_t & ItemsPopped()
std::atomic_long & InputSize()
static StoppableThread * Get(const std::string &name)
std::string Name() const
void IncrementItemsPopped()
virtual std::string OutputQueueStatus()
virtual void SetFinished()
virtual int Process(std::shared_ptr< TRawEvent >)=0
virtual void ClearQueue()
virtual void SetStatusVariables(std::atomic_size_t *itemsPopped, std::atomic_long *inputSize)
Definition TDataParser.h:77
static TGRSIOptions * Get(int argc=0, char **argv=nullptr)
Do not use!
TDataParser * CreateDataParser()
RAW event.
Definition TRawEvent.h:23
static TParserLibrary * Get(bool verbose=false)
Definition TSingleton.h:33
static TUnpackingLoop * Get(std::string name="")
TUnpackingLoop(const TUnpackingLoop &)=delete
void ClearQueue() override
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< TRawEvent > > > fInputQueue
TDataParser * fParser
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< TEpicsFrag > > > & ScalerOutputQueue()
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TBadFragment > > > & BadOutputQueue()
int64_t fGoodFragsRead
int64_t fFragsReadFromRaw
std::string EndStatus() override
bool Iteration() override