GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
TAnalysisWriteLoop.cxx
Go to the documentation of this file.
2
3#include <chrono>
4#include <thread>
5
6#include "TFile.h"
7#include "TThread.h"
8#include "TFileMerger.h"
9#include "TFileCacheWrite.h"
10#include "TROOT.h"
11
12#include "GValue.h"
13#include "TChannel.h"
14#include "TRunInfo.h"
15#include "TGRSIOptions.h"
16#include "TTreeFillMutex.h"
17#include "TSortingDiagnostics.h"
18#include "TParsingDiagnostics.h"
19#include "TScalerQueue.h"
20
21TAnalysisWriteLoop* TAnalysisWriteLoop::Get(std::string name, std::string outputFilename)
22{
23 if(name.empty()) {
24 name = "write_loop";
25 }
26
27 auto* loop = static_cast<TAnalysisWriteLoop*>(StoppableThread::Get(name));
28 if(loop == nullptr) {
29 if(outputFilename.empty()) {
30 outputFilename = "temp.root";
31 }
32 loop = new TAnalysisWriteLoop(name, outputFilename);
33 }
34
35 return loop;
36}
37
38TAnalysisWriteLoop::TAnalysisWriteLoop(std::string name, const std::string& outputFilename)
39 : StoppableThread(std::move(name)),
40 fOutputFile(TFile::Open(outputFilename.c_str(), "recreate")),
41 fInputQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<TUnpackedEvent>>>()),
42 fOutOfOrderQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<const TFragment>>>())
43{
44 if(fOutputFile == nullptr || !fOutputFile->IsOpen()) {
45 std::cerr << "Failed to open '" << outputFilename << "'" << std::endl;
46 throw;
47 }
48
49 fEventTree = new TTree("AnalysisTree", "AnalysisTree");
50 if(TGRSIOptions::Get()->SeparateOutOfOrder()) {
51 fOutOfOrderTree = new TTree("OutOfOrderTree", "OutOfOrderTree");
53 fOutOfOrderTree->Branch("Fragment", &fOutOfOrderFrag);
54 fOutOfOrder = true;
55 }
56}
57
59{
60 for(auto& elem : fDetMap) {
61 delete elem.second;
62 }
63 for(auto& elem : fDefaultDets) {
64 delete elem.second;
65 }
66 delete fOutOfOrderFrag;
67 // for some reason deleting these creates a seg-fault at the end of sorting
68 // leaving them in for now as a reminder to check why
69 //delete fOutOfOrderTree;
70 //delete fEventTree;
71}
72
74{
75 while(fInputQueue->Size() != 0u) {
76 std::shared_ptr<TUnpackedEvent> event;
77 fInputQueue->Pop(event);
78 }
79}
80
82{
83 std::ostringstream str;
84 str << Name() << ":\t" << std::setw(8) << ItemsPopped() << "/" << InputSize() + ItemsPopped() << ", "
85 << "??? good events" << std::endl;
86 return str.str();
87}
88
90{
91 Write();
92}
93
95{
96 std::shared_ptr<TUnpackedEvent> event;
97 InputSize(fInputQueue->Pop(event));
98 if(InputSize() < 0) {
99 InputSize(0);
100 std::this_thread::sleep_for(std::chrono::milliseconds(100));
101 } else {
103 }
104
105 if(fOutOfOrder) {
106 std::shared_ptr<const TFragment> frag;
107 fOutOfOrderQueue->Pop(frag, 0);
108 if(frag != nullptr) {
109 *fOutOfOrderFrag = *frag;
111 std::lock_guard<std::mutex> lock(ttree_fill_mutex);
112 fOutOfOrderTree->Fill();
113 }
114 }
115
116 if(event != nullptr) {
117 WriteEvent(event);
118 return true;
119 }
120
121 return !(fInputQueue->IsFinished());
122}
123
125{
126 if(fOutputFile != nullptr) {
127 gROOT->cd();
128 auto* options = TGRSIOptions::Get();
129 auto* ppg = TPPG::Get();
130
131 fOutputFile->cd();
132 if(GValue::Size() != 0) {
133 GValue::Get()->Write("Values", TObject::kOverwrite);
134 }
137 }
140 ppg->Write("PPG");
141
142 if(options->WriteDiagnostics()) {
143 auto* diag = TSortingDiagnostics::Get();
144 diag->Write("SortingDiagnostics", TObject::kOverwrite);
145 }
146 // if we do not write a fragment tree we need to write some diagnostics and scalers to the analysis file
147 // that would normally be in the fragment file
148 if(!options->WriteFragmentTree()) {
149 // write the parsing diagnostices
150 auto* parsingDiagnostics = TParsingDiagnostics::Get();
151 parsingDiagnostics->Write("ParsingDiagnostics", TObject::kOverwrite);
152
153 // always get the scaler queues so that we can delete them
154 // but only write them if --ignore-scalers isn't used
155 auto* deadtimeQueue = TDeadtimeScalerQueue::Get();
156 auto* rateQueue = TRateScalerQueue::Get();
157 if(!options->IgnoreScaler()) {
158 std::cout << "Starting to write dead time scalers" << std::endl;
159 auto* scalerTree = new TTree("DeadtimeScaler", "DeadtimeScaler");
160 auto* scalerData = new TScalerData;
161 scalerTree->Branch("ScalerData", &scalerData);
162 delete scalerData;
163 while(deadtimeQueue->Size() > 0) {
164 scalerData = deadtimeQueue->PopScaler();
165 scalerTree->Fill();
166 delete scalerData;
167 }
168 scalerTree->Write();
169 delete scalerTree;
170
171 std::cout << "Starting to write rate scalers" << std::endl;
172 scalerTree = new TTree("RateScaler", "RateScaler");
173 scalerData = new TScalerData;
174 scalerTree->Branch("ScalerData", &scalerData);
175 delete scalerData;
176 while(rateQueue->Size() > 0) {
177 scalerData = rateQueue->PopScaler();
178 scalerTree->Fill();
179 delete scalerData;
180 }
181 scalerTree->Write();
182 delete scalerTree;
183 std::cout << "Done writing scaler trees" << std::endl;
184 }
185 delete deadtimeQueue;
186 delete rateQueue;
187 }
188
189 fOutputFile->Write();
190 delete fOutputFile;
191 fOutputFile = nullptr;
192 }
193}
194
196{
197 if(fDetMap.count(cls) == 0u) {
198 // This uses the ROOT dictionaries, so we need to lock the threads.
199 TThread::Lock();
200
201 // Make a default detector of that type.
202 auto* det_p = reinterpret_cast<TDetector*>(cls->New());
203 fDefaultDets[cls] = det_p;
204
205 // Add to our local map
206 auto* det_pp = new TDetector*;
207 *det_pp = det_p;
208 fDetMap[cls] = det_pp;
209
210 // Make a new branch.
211 TBranch* newBranch = fEventTree->Branch(cls->GetName(), cls->GetName(), det_pp);
212
213 // Fill the new branch up to the point where the tree is filled.
214 // Explanation:
215 // When TTree::Fill is called, it calls TBranch::Fill for each
216 // branch, then increments the number of entries. We may be
217 // adding branches after other branches have already been filled.
218 // If the branch 'x' has been filled 100 times before the branch
219 // 'y' is created, then the next call to TTree::Fill will fill
220 // entry 101 of 'x', but entry 1 of 'y', rather than entry
221 // 101 of both.
222 // Therefore, we need to fill the new branch as many times as
223 // TTree::Fill has been called before.
224 std::lock_guard<std::mutex> lock(ttree_fill_mutex);
225 for(int i = 0; i < fEventTree->GetEntries(); i++) {
226 newBranch->Fill();
227 }
228
229 std::cout << "\r" << std::string(30, ' ') << "\r" << Name() << ": added \"" << cls->GetName() << R"(" branch, )" << det_pp << ", " << det_p << std::string(30, ' ') << std::endl;
230
231 // Unlock after we are done.
232 TThread::UnLock();
233 }
234}
235
236void TAnalysisWriteLoop::WriteEvent(std::shared_ptr<TUnpackedEvent>& event)
237{
238 if(fEventTree != nullptr) {
239 // Clear pointers from previous writes.
240 // Note that we cannot just set this equal to nullptr,
241 // because ROOT would then construct a new object.
242 // This contradicts the ROOT documentation for TBranchElement::SetAddress,
243 // which suggests that a new object would be constructed only when setting the address,
244 // not when filling the TTree.
245 for(auto& elem : fDetMap) {
246 (*elem.second)->Clear("a");
247 }
248
249 // Load current events
250 for(const auto& det : event->GetDetectors()) {
251 TClass* cls = det->IsA();
252 // check if this detector is in the detector map, if not, add it
253 if(fDetMap.find(cls) == fDetMap.end()) {
254 AddBranch(cls);
255 }
256 **fDetMap.at(cls) = *det; // this should call Copy internally
257 (*fDetMap.at(cls))->ClearTransients();
258 }
259
260 // Fill
261 std::lock_guard<std::mutex> lock(ttree_fill_mutex);
262 fEventTree->Fill();
263 }
264}
std::mutex ttree_fill_mutex
static int Size()
Definition GValue.h:63
static GValue * Get(const std::string &name="")
Definition GValue.h:39
std::atomic_size_t & ItemsPopped()
std::atomic_long & InputSize()
static StoppableThread * Get(const std::string &name)
std::string Name() const
void IncrementItemsPopped()
bool WriteToFile(const std::string &file)
static TAnalysisWriteLoop * Get(std::string name="", std::string outputFilename="")
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TFragment > > > fOutOfOrderQueue
std::string EndStatus() override
void WriteEvent(std::shared_ptr< TUnpackedEvent > &event)
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< TUnpackedEvent > > > fInputQueue
std::map< TClass *, TDetector ** > fDetMap
bool Iteration() override
void ClearQueue() override
void AddBranch(TClass *cls)
std::map< TClass *, TDetector * > fDefaultDets
TAnalysisWriteLoop(const TAnalysisWriteLoop &)=delete
static int WriteToRoot(TFile *fileptr=nullptr)
static size_t GetNumberOfChannels()
Definition TChannel.h:63
static TDeadtimeScalerQueue * Get()
virtual void ClearTransients() const
static TGRSIOptions * Get(int argc=0, char **argv=nullptr)
Do not use!
static TAnalysisOptions * AnalysisOptions()
static TRateScalerQueue * Get()
static bool WriteToRoot(TFile *fileptr=nullptr)
Definition TRunInfo.cxx:309
static TPPG * Get(bool verbose=false)
Definition TSingleton.h:33