GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
TDataLoop.cxx
Go to the documentation of this file.
1#include "TDataLoop.h"
2
3#include <chrono>
4#include <thread>
5#include <utility>
6#include <cstdio>
7#include <sstream>
8
9#include "TGRSIOptions.h"
10#include "TString.h"
11#include "TRawFile.h"
12#include "TChannel.h"
13#include "TRunInfo.h"
14
15TDataLoop::TDataLoop(std::string name, TRawFile* source)
16 : StoppableThread(std::move(name)), fSource(source), fSelfStopping(true), fEventsRead(0),
17 fOutputQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<TRawEvent>>>("midas_queue"))
18{
19}
20
21TDataLoop* TDataLoop::Get(std::string name, TRawFile* source)
22{
23 if(name.length() == 0) {
24 name = "input_loop";
25 }
26 auto* loop = static_cast<TDataLoop*>(StoppableThread::Get(name));
27 if((loop == nullptr) && (source != nullptr)) {
28 loop = new TDataLoop(name, source);
29 }
30 return loop;
31}
32
34{
35 std::shared_ptr<TRawEvent> event;
36 while(fOutputQueue->Size() != 0u) {
37 fOutputQueue->Pop(event);
38 }
39}
40
42{
43 std::lock_guard<std::mutex> lock(fSourceMutex);
44 // delete source;
45 fSource = new_source;
46}
47
49{
50 fOutputQueue->SetFinished();
51}
52
54{
55 std::shared_ptr<TRawEvent> evt = fSource->NewEvent();
56 int bytesRead = 0;
57 {
58 std::lock_guard<std::mutex> lock(fSourceMutex);
59 bytesRead = fSource->Read(evt);
60 ItemsPopped(fSource->BytesRead() / 1000); // should this be / 1024 ?
61 InputSize(fSource->FileSize() / 1000 - ItemsPopped()); // this way fInputSize+fItemsPopped give the file size
63 if(TGRSIOptions::Get()->Downscaling() > 1) {
64 // if we use downscaling we skip n-1 events without updating bytesRead
65 // that way all further checks work as usual on the single event we read
66 fSource->Skip(TGRSIOptions::Get()->Downscaling() - 1);
67 ItemsPopped(fSource->BytesRead() / 1000);
68 InputSize(fSource->FileSize() / 1000 - ItemsPopped()); // this way fInputSize+fItemsPopped give the file size
70 }
71 }
72
73 if(bytesRead <= 0 && fSelfStopping) {
74 // Error, and no point in trying again.
75 return false;
76 }
77 if(bytesRead > 0) {
78 // A good event was returned
79 fOutputQueue->Push(evt);
80 return (fEventsRead != TGRSIOptions::Get()->NumberOfEvents()); // false if fEventsRead == number of events, true otherwise
81 }
82 // Nothing returned this time, but I might get something next time.
83 std::this_thread::sleep_for(std::chrono::milliseconds(500));
84 return true;
85}
std::atomic_size_t & ItemsPopped()
std::atomic_long & InputSize()
static StoppableThread * Get(const std::string &name)
TRawFile * fSource
Definition TDataLoop.h:72
void ReplaceSource(TRawFile *new_source)
Definition TDataLoop.cxx:41
bool fSelfStopping
Definition TDataLoop.h:73
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< TRawEvent > > > fOutputQueue
Definition TDataLoop.h:77
static TDataLoop * Get(std::string name="", TRawFile *source=nullptr)
Definition TDataLoop.cxx:21
void OnEnd() override
Definition TDataLoop.cxx:48
size_t fEventsRead
Definition TDataLoop.h:74
void ClearQueue() override
Definition TDataLoop.cxx:33
std::mutex fSourceMutex
Definition TDataLoop.h:78
bool Iteration() override
Definition TDataLoop.cxx:53
static TGRSIOptions * Get(int argc=0, char **argv=nullptr)
Do not use!
int Downscaling() const
RAW event.
Definition TRawEvent.h:23
Reader for raw files.
Definition TRawFile.h:31
virtual int Read(std::shared_ptr< TRawEvent > event)=0
Read one event from the file.
virtual void Skip(size_t nofEvents)=0
Skip nofEvents events in file.
virtual size_t FileSize()
Definition TRawFile.h:63
virtual size_t BytesRead()
Definition TRawFile.h:61
virtual std::shared_ptr< TRawEvent > NewEvent()=0