GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
TAnalysisWriteLoop.cxx
Go to the documentation of this file.
2
3#include <chrono>
4#include <thread>
5
6#include "TFile.h"
7#include "TThread.h"
8#include "TFileMerger.h"
9#include "TFileCacheWrite.h"
10#include "TROOT.h"
11
12#include "GValue.h"
13#include "TChannel.h"
14#include "TRunInfo.h"
15#include "TGRSIOptions.h"
16#include "TTreeFillMutex.h"
17#include "TSortingDiagnostics.h"
18#include "TParsingDiagnostics.h"
19
20TAnalysisWriteLoop* TAnalysisWriteLoop::Get(std::string name, std::string outputFilename)
21{
22 if(name.length() == 0) {
23 name = "write_loop";
24 }
25
26 auto* loop = static_cast<TAnalysisWriteLoop*>(StoppableThread::Get(name));
27 if(loop == nullptr) {
28 if(outputFilename.length() == 0) {
29 outputFilename = "temp.root";
30 }
31 loop = new TAnalysisWriteLoop(name, outputFilename);
32 }
33
34 return loop;
35}
36
37TAnalysisWriteLoop::TAnalysisWriteLoop(std::string name, const std::string& outputFilename)
38 : StoppableThread(std::move(name)),
39 fOutputFile(TFile::Open(outputFilename.c_str(), "recreate")),
40 fInputQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<TUnpackedEvent>>>()),
41 fOutOfOrderQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<const TFragment>>>())
42{
43 if(fOutputFile == nullptr || !fOutputFile->IsOpen()) {
44 std::cerr << "Failed to open '" << outputFilename << "'" << std::endl;
45 throw;
46 }
47
48 fEventTree = new TTree("AnalysisTree", "AnalysisTree");
49 if(TGRSIOptions::Get()->SeparateOutOfOrder()) {
50 fOutOfOrderTree = new TTree("OutOfOrderTree", "OutOfOrderTree");
52 fOutOfOrderTree->Branch("Fragment", &fOutOfOrderFrag);
53 fOutOfOrder = true;
54 }
55}
56
58{
59 for(auto& elem : fDetMap) {
60 delete elem.second;
61 }
62}
63
65{
66 while(fInputQueue->Size() != 0u) {
67 std::shared_ptr<TUnpackedEvent> event;
68 fInputQueue->Pop(event);
69 }
70}
71
73{
74 std::ostringstream str;
75 str << Name() << ":\t" << std::setw(8) << ItemsPopped() << "/" << InputSize() + ItemsPopped() << ", "
76 << "??? good events" << std::endl;
77 return str.str();
78}
79
81{
82 Write();
83}
84
86{
87 std::shared_ptr<TUnpackedEvent> event;
88 InputSize(fInputQueue->Pop(event));
89 if(InputSize() < 0) {
90 InputSize(0);
91 std::this_thread::sleep_for(std::chrono::milliseconds(100));
92 } else {
94 }
95
96 if(fOutOfOrder) {
97 std::shared_ptr<const TFragment> frag;
98 fOutOfOrderQueue->Pop(frag, 0);
99 if(frag != nullptr) {
100 *fOutOfOrderFrag = *frag;
102 std::lock_guard<std::mutex> lock(ttree_fill_mutex);
103 fOutOfOrderTree->Fill();
104 }
105 }
106
107 if(event != nullptr) {
108 WriteEvent(event);
109 return true;
110 }
111
112 return !(fInputQueue->IsFinished());
113}
114
116{
117 if(fOutputFile != nullptr) {
118 gROOT->cd();
119 auto* options = TGRSIOptions::Get();
120 auto* ppg = TPPG::Get();
121 auto* diag = TSortingDiagnostics::Get();
122
123 fOutputFile->cd();
124 if(GValue::Size() != 0) {
125 GValue::Get()->Write("Values", TObject::kOverwrite);
126 }
129 }
132 ppg->Write("PPG");
133
134 if(options->WriteDiagnostics()) {
135 diag->Write("SortingDiagnostics", TObject::kOverwrite);
136 if(!options->WriteFragmentTree()) {
137 auto* parsingDiagnostics = TParsingDiagnostics::Get();
138 parsingDiagnostics->Write("ParsingDiagnostics", TObject::kOverwrite);
139 }
140 }
141
142 fOutputFile->Write();
143 delete fOutputFile;
144 fOutputFile = nullptr;
145 }
146}
147
149{
150 if(fDetMap.count(cls) == 0u) {
151 // This uses the ROOT dictionaries, so we need to lock the threads.
152 TThread::Lock();
153
154 // Make a default detector of that type.
155 auto* det_p = reinterpret_cast<TDetector*>(cls->New());
156 fDefaultDets[cls] = det_p;
157
158 // Add to our local map
159 auto* det_pp = new TDetector*;
160 *det_pp = det_p;
161 fDetMap[cls] = det_pp;
162
163 // Make a new branch.
164 TBranch* newBranch = fEventTree->Branch(cls->GetName(), cls->GetName(), det_pp);
165
166 // Fill the new branch up to the point where the tree is filled.
167 // Explanation:
168 // When TTree::Fill is called, it calls TBranch::Fill for each
169 // branch, then increments the number of entries. We may be
170 // adding branches after other branches have already been filled.
171 // If the branch 'x' has been filled 100 times before the branch
172 // 'y' is created, then the next call to TTree::Fill will fill
173 // entry 101 of 'x', but entry 1 of 'y', rather than entry
174 // 101 of both.
175 // Therefore, we need to fill the new branch as many times as
176 // TTree::Fill has been called before.
177 std::lock_guard<std::mutex> lock(ttree_fill_mutex);
178 for(int i = 0; i < fEventTree->GetEntries(); i++) {
179 newBranch->Fill();
180 }
181
182 std::cout << "\r" << std::string(30, ' ') << "\r" << Name() << ": added \"" << cls->GetName() << R"(" branch, )" << det_pp << ", " << det_p << std::string(30, ' ') << std::endl;
183
184 // Unlock after we are done.
185 TThread::UnLock();
186 }
187}
188
189void TAnalysisWriteLoop::WriteEvent(std::shared_ptr<TUnpackedEvent>& event)
190{
191 if(fEventTree != nullptr) {
192 // Clear pointers from previous writes.
193 // Note that we cannot just set this equal to nullptr,
194 // because ROOT would then construct a new object.
195 // This contradicts the ROOT documentation for TBranchElement::SetAddress,
196 // which suggests that a new object would be constructed only when setting the address,
197 // not when filling the TTree.
198 for(auto& elem : fDetMap) {
199 (*elem.second)->Clear();
200 }
201
202 // Load current events
203 for(const auto& det : event->GetDetectors()) {
204 TClass* cls = det->IsA();
205 // attempt to copy this detector into the detector map
206 // if that fails (because the detector isn't in the map yet), create the branch and then copy this detector
207 try {
208 **fDetMap.at(cls) = *det;
209 } catch(std::out_of_range& e) {
210 AddBranch(cls);
211 **fDetMap.at(cls) = *det;
212 }
213 (*fDetMap.at(cls))->ClearTransients();
214 }
215
216 // Fill
217 std::lock_guard<std::mutex> lock(ttree_fill_mutex);
218 fEventTree->Fill();
219 }
220}
std::mutex ttree_fill_mutex
static int Size()
Definition GValue.h:63
static GValue * Get(const std::string &name="")
Definition GValue.h:39
std::atomic_size_t & ItemsPopped()
std::atomic_long & InputSize()
static StoppableThread * Get(const std::string &name)
std::string Name() const
void IncrementItemsPopped()
bool WriteToFile(const std::string &file)
static TAnalysisWriteLoop * Get(std::string name="", std::string outputFilename="")
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TFragment > > > fOutOfOrderQueue
std::string EndStatus() override
void WriteEvent(std::shared_ptr< TUnpackedEvent > &event)
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< TUnpackedEvent > > > fInputQueue
std::map< TClass *, TDetector ** > fDetMap
bool Iteration() override
void ClearQueue() override
void AddBranch(TClass *cls)
std::map< TClass *, TDetector * > fDefaultDets
TAnalysisWriteLoop(const TAnalysisWriteLoop &)=delete
static int WriteToRoot(TFile *fileptr=nullptr)
static size_t GetNumberOfChannels()
Definition TChannel.h:68
virtual void ClearTransients() const
static TGRSIOptions * Get(int argc=0, char **argv=nullptr)
Do not use!
static TAnalysisOptions * AnalysisOptions()
static bool WriteToRoot(TFile *fileptr=nullptr)
Definition TRunInfo.cxx:309
static TPPG * Get(bool verbose=false)
Definition TSingleton.h:33