GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
TFragWriteLoop.cxx
Go to the documentation of this file.
1#include "TFragWriteLoop.h"
2
3#include <sstream>
4#include <iomanip>
5#include <chrono>
6#include <thread>
7
8#include "TFile.h"
9#include "TThread.h"
10#include "TROOT.h"
11
12#include "GValue.h"
13#include "TChannel.h"
14#include "TRunInfo.h"
15#include "TGRSIOptions.h"
16#include "TTreeFillMutex.h"
17#include "TParsingDiagnostics.h"
18
19#include "TBadFragment.h"
20#include "TScalerQueue.h"
21
22TFragWriteLoop* TFragWriteLoop::Get(std::string name, std::string fOutputFilename)
23{
24 if(name.length() == 0) {
25 name = "write_loop";
26 }
27
28 auto* loop = static_cast<TFragWriteLoop*>(StoppableThread::Get(name));
29 if(loop == nullptr) {
30 if(fOutputFilename.length() == 0) {
31 fOutputFilename = "temp.root";
32 }
33 loop = new TFragWriteLoop(name, fOutputFilename);
34 }
35 return loop;
36}
37
38TFragWriteLoop::TFragWriteLoop(std::string name, const std::string& fOutputFilename)
39 : StoppableThread(std::move(name)), fOutputFile(nullptr), fEventTree(nullptr), fBadEventTree(nullptr), fScalerTree(nullptr),
40 fInputQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<const TFragment>>>()),
41 fBadInputQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<const TBadFragment>>>()),
42 fScalerInputQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<TEpicsFrag>>>())
43{
44 if(fOutputFilename != "/dev/null") {
45 TThread::Lock();
46
47 fOutputFile = new TFile(fOutputFilename.c_str(), "RECREATE");
48 if(fOutputFile == nullptr || !fOutputFile->IsOpen()) {
49 throw std::runtime_error(Form("Failed to open \"%s\"\n", fOutputFilename.c_str()));
50 }
51
52 fEventTree = new TTree("FragmentTree", "FragmentTree");
54 fEventTree->Branch("TFragment", &fEventAddress);
55
56 fBadEventTree = new TTree("BadFragmentTree", "BadFragmentTree");
58 fBadEventTree->Branch("TBadFragment", &fBadEventAddress);
59
60 fScalerTree = new TTree("EpicsTree", "EpicsTree");
61 fScalerAddress = nullptr;
62 fScalerTree->Branch("TEpicsFrag", &fScalerAddress);
63
64 TThread::UnLock();
65 }
66}
67
72
74{
75 while(fInputQueue->Size() != 0u) {
76 std::shared_ptr<const TFragment> event;
77 fInputQueue->Pop(event);
78 }
79}
80
82{
83 std::ostringstream str;
84 str << std::endl
85 << Name() << ": " << std::setw(8) << ItemsPopped() << "/" << ItemsPopped() + InputSize() << ", "
86 << fEventTree->GetEntries() << " good fragments, " << fBadEventTree->GetEntries() << " bad fragments"
87 << std::endl;
88 return str.str();
89}
90
92{
93 std::shared_ptr<const TFragment> event;
94 InputSize(fInputQueue->Pop(event, 0));
95 if(InputSize() < 0) {
96 InputSize(0);
97 }
98
99 std::shared_ptr<const TBadFragment> badEvent;
100 fBadInputQueue->Pop(badEvent, 0);
101
102 std::shared_ptr<TEpicsFrag> scaler;
103 fScalerInputQueue->Pop(scaler, 0);
104
105 bool hasAnything = event || badEvent || scaler;
106 bool allParentsDead = (fInputQueue->IsFinished() && fBadInputQueue->IsFinished() && fScalerInputQueue->IsFinished());
107
108 if(event != nullptr) {
109 WriteEvent(event);
111 }
112
113 if(badEvent != nullptr) {
114 WriteBadEvent(badEvent);
115 }
116
117 if(scaler != nullptr) {
118 WriteScaler(scaler);
119 }
120
121 if(hasAnything) {
122 return true;
123 }
124 if(allParentsDead) {
125 return false;
126 }
127 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
128 return true;
129}
130
132{
133 if(fOutputFile != nullptr) {
134 // get all singletons before switching to the output file
135 gROOT->cd();
136 TGRSIOptions* options = TGRSIOptions::Get();
137 TPPG* ppg = TPPG::Get();
138 TParsingDiagnostics* parsingDiagnostics = TParsingDiagnostics::Get();
139 GValue* gValues = GValue::Get();
140
141 fOutputFile->cd();
142 fEventTree->Write(fEventTree->GetName(), TObject::kOverwrite);
143 fBadEventTree->Write(fBadEventTree->GetName(), TObject::kOverwrite);
144 fScalerTree->Write(fScalerTree->GetName(), TObject::kOverwrite);
145 if(GValue::Size() != 0) {
146 gValues->Write("Values", TObject::kOverwrite);
147 }
148
151 }
152
155 ppg->Write("PPG");
156
157 if(options->WriteDiagnostics()) {
158 parsingDiagnostics->ReadPPG(ppg); // this set's the cycle length from the PPG information
159 parsingDiagnostics->Write("ParsingDiagnostics", TObject::kOverwrite);
160 }
161
162 if(!options->IgnoreScaler()) {
163 std::cout << "Starting to write dead time scalers" << std::endl;
164 auto* deadtimeQueue = TDeadtimeScalerQueue::Get();
165 auto* scalerTree = new TTree("DeadtimeScaler", "DeadtimeScaler");
166 auto* scalerData = new TScalerData;
167 scalerTree->Branch("ScalerData", &scalerData);
168 while(deadtimeQueue->Size() > 0) {
169 scalerData = deadtimeQueue->PopScaler();
170 scalerTree->Fill();
171 }
172 scalerTree->Write();
173
174 std::cout << "Starting to write rate scalers" << std::endl;
175 auto* rateQueue = TRateScalerQueue::Get();
176 scalerTree = new TTree("RateScaler", "RateScaler");
177 scalerData = new TScalerData;
178 scalerTree->Branch("ScalerData", &scalerData);
179 while(rateQueue->Size() > 0) {
180 scalerData = rateQueue->PopScaler();
181 scalerTree->Fill();
182 }
183 scalerTree->Write();
184 std::cout << "Done writing scaler trees" << std::endl;
185 }
186
187 fOutputFile->Close();
188 fOutputFile->Delete();
189 gROOT->cd();
190 }
191}
192
193void TFragWriteLoop::WriteEvent(const std::shared_ptr<const TFragment>& event)
194{
195 if(fEventTree != nullptr) {
196 *fEventAddress = *event;
198 std::lock_guard<std::mutex> lock(ttree_fill_mutex);
199 fEventTree->Fill();
200 // fEventAddress = nullptr;
201 } else {
202 std::cout << __PRETTY_FUNCTION__ << ": no fragment tree!" << std::endl; // NOLINT(cppcoreguidelines-pro-type-const-cast, cppcoreguidelines-pro-bounds-array-to-pointer-decay)
203 }
204}
205
206void TFragWriteLoop::WriteBadEvent(const std::shared_ptr<const TBadFragment>& event)
207{
208 if(fBadEventTree != nullptr) {
209 *fBadEventAddress = *static_cast<const TBadFragment*>(event.get());
210 std::lock_guard<std::mutex> lock(ttree_fill_mutex);
211 fBadEventTree->Fill();
212 }
213}
214
215void TFragWriteLoop::WriteScaler(const std::shared_ptr<TEpicsFrag>& scaler)
216{
217 if(fScalerTree != nullptr) {
218 fScalerAddress = scaler.get();
219 std::lock_guard<std::mutex> lock(ttree_fill_mutex);
220 fScalerTree->Fill();
221 fScalerAddress = nullptr;
222 }
223}
std::mutex ttree_fill_mutex
static int Size()
Definition GValue.h:63
static GValue * Get(const std::string &name="")
Definition GValue.h:39
std::atomic_size_t & ItemsPopped()
std::atomic_long & InputSize()
static StoppableThread * Get(const std::string &name)
std::string Name() const
void IncrementItemsPopped()
static int WriteToRoot(TFile *fileptr=nullptr)
static size_t GetNumberOfChannels()
Definition TChannel.h:68
static TDeadtimeScalerQueue * Get()
virtual void ClearTransients() const
void ClearQueue() override
std::string EndStatus() override
TTree * fBadEventTree
bool Iteration() override
TEpicsFrag * fScalerAddress
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TBadFragment > > > fBadInputQueue
void WriteBadEvent(const std::shared_ptr< const TBadFragment > &event)
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TFragment > > > fInputQueue
void WriteScaler(const std::shared_ptr< TEpicsFrag > &scaler)
TBadFragment * fBadEventAddress
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< TEpicsFrag > > > fScalerInputQueue
static TFragWriteLoop * Get(std::string name="", std::string fOutputFilename="")
TFragment * fEventAddress
void WriteEvent(const std::shared_ptr< const TFragment > &event)
TFragWriteLoop(const TFragWriteLoop &)=delete
bool WriteDiagnostics() const
static TGRSIOptions * Get(int argc=0, char **argv=nullptr)
Do not use!
static bool WriteToFile(TFile *file=nullptr)
bool IgnoreScaler() const
Definition TPPG.h:132
Int_t Write(const char *name=nullptr, Int_t option=0, Int_t bufsize=0) override
Definition TPPG.h:149
static TRateScalerQueue * Get()
static bool WriteToRoot(TFile *fileptr=nullptr)
Definition TRunInfo.cxx:309
static TPPG * Get(bool verbose=false)
Definition TSingleton.h:33