GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
TAnalysisWriteLoop.cxx
Go to the documentation of this file.
2
3#include <chrono>
4#include <thread>
5
6#include "TFile.h"
7#include "TThread.h"
8#include "TMessage.h"
9#include "TSocket.h"
10#include "TMemFile.h"
11#include "TFileMerger.h"
12#include "TServerSocket.h"
13#include "TMonitor.h"
14#include "TFileCacheWrite.h"
15#include "TROOT.h"
16#include "THashTable.h"
17
18#include "GValue.h"
19#include "TChannel.h"
20#include "TRunInfo.h"
21#include "TGRSIOptions.h"
22#include "TTreeFillMutex.h"
23#include "TSortingDiagnostics.h"
24
25TAnalysisWriteLoop* TAnalysisWriteLoop::Get(std::string name, std::string outputFilename)
26{
27 if(name.length() == 0) {
28 name = "write_loop";
29 }
30
31 auto* loop = static_cast<TAnalysisWriteLoop*>(StoppableThread::Get(name));
32 if(loop == nullptr) {
33 if(outputFilename.length() == 0) {
34 outputFilename = "temp.root";
35 }
36 loop = new TAnalysisWriteLoop(name, outputFilename);
37 }
38
39 return loop;
40}
41
42TAnalysisWriteLoop::TAnalysisWriteLoop(std::string name, const std::string& outputFilename)
43 : StoppableThread(std::move(name)),
44 fOutputFile(TFile::Open(outputFilename.c_str(), "recreate")),
45 fInputQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<TUnpackedEvent>>>()),
46 fOutOfOrderQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<const TFragment>>>())
47{
48 if(fOutputFile == nullptr || !fOutputFile->IsOpen()) {
49 std::cerr << "Failed to open '" << outputFilename << "'" << std::endl;
50 throw;
51 }
52
53 fEventTree = new TTree("AnalysisTree", "AnalysisTree");
54 if(TGRSIOptions::Get()->SeparateOutOfOrder()) {
55 fOutOfOrderTree = new TTree("OutOfOrderTree", "OutOfOrderTree");
57 fOutOfOrderTree->Branch("Fragment", &fOutOfOrderFrag);
58 fOutOfOrder = true;
59 }
60}
61
63{
64 for(auto& elem : fDetMap) {
65 delete elem.second;
66 }
67}
68
70{
71 while(fInputQueue->Size() != 0u) {
72 std::shared_ptr<TUnpackedEvent> event;
73 fInputQueue->Pop(event);
74 }
75}
76
78{
79 std::ostringstream str;
80 str << Name() << ":\t" << std::setw(8) << ItemsPopped() << "/" << InputSize() + ItemsPopped() << ", "
81 << "??? good events" << std::endl;
82 return str.str();
83}
84
86{
87 Write();
88}
89
91{
92 std::shared_ptr<TUnpackedEvent> event;
93 InputSize(fInputQueue->Pop(event));
94 if(InputSize() < 0) {
95 InputSize(0);
96 std::this_thread::sleep_for(std::chrono::milliseconds(100));
97 } else {
99 }
100
101 if(fOutOfOrder) {
102 std::shared_ptr<const TFragment> frag;
103 fOutOfOrderQueue->Pop(frag, 0);
104 if(frag != nullptr) {
105 *fOutOfOrderFrag = *frag;
107 std::lock_guard<std::mutex> lock(ttree_fill_mutex);
108 fOutOfOrderTree->Fill();
109 }
110 }
111
112 if(event != nullptr) {
113 WriteEvent(event);
114 return true;
115 }
116
117 return !(fInputQueue->IsFinished());
118}
119
121{
122 if(fOutputFile != nullptr) {
123 gROOT->cd();
124 auto* options = TGRSIOptions::Get();
125 auto* ppg = TPPG::Get();
126 auto* diag = TSortingDiagnostics::Get();
127
128 fOutputFile->cd();
129 if(GValue::Size() != 0) {
130 GValue::Get()->Write("Values", TObject::kOverwrite);
131 }
134 }
137 ppg->Write("PPG");
138
139 if(options->WriteDiagnostics()) {
140 diag->Write("SortingDiagnostics", TObject::kOverwrite);
141 }
142
143 fOutputFile->Write();
144 delete fOutputFile;
145 fOutputFile = nullptr;
146 }
147}
148
150{
151 if(fDetMap.count(cls) == 0u) {
152 // This uses the ROOT dictionaries, so we need to lock the threads.
153 TThread::Lock();
154
155 // Make a default detector of that type.
156 auto* det_p = reinterpret_cast<TDetector*>(cls->New());
157 fDefaultDets[cls] = det_p;
158
159 // Add to our local map
160 auto* det_pp = new TDetector*;
161 *det_pp = det_p;
162 fDetMap[cls] = det_pp;
163
164 // Make a new branch.
165 TBranch* newBranch = fEventTree->Branch(cls->GetName(), cls->GetName(), det_pp);
166
167 // Fill the new branch up to the point where the tree is filled.
168 // Explanation:
169 // When TTree::Fill is called, it calls TBranch::Fill for each
170 // branch, then increments the number of entries. We may be
171 // adding branches after other branches have already been filled.
172 // If the branch 'x' has been filled 100 times before the branch
173 // 'y' is created, then the next call to TTree::Fill will fill
174 // entry 101 of 'x', but entry 1 of 'y', rather than entry
175 // 101 of both.
176 // Therefore, we need to fill the new branch as many times as
177 // TTree::Fill has been called before.
178 std::lock_guard<std::mutex> lock(ttree_fill_mutex);
179 for(int i = 0; i < fEventTree->GetEntries(); i++) {
180 newBranch->Fill();
181 }
182
183 std::cout << "\r" << std::string(30, ' ') << "\r" << Name() << ": added \"" << cls->GetName() << R"(" branch, )" << det_pp << ", " << det_p << std::string(30, ' ') << std::endl;
184
185 // Unlock after we are done.
186 TThread::UnLock();
187 }
188}
189
190void TAnalysisWriteLoop::WriteEvent(std::shared_ptr<TUnpackedEvent>& event)
191{
192 if(fEventTree != nullptr) {
193 // Clear pointers from previous writes.
194 // Note that we cannot just set this equal to nullptr,
195 // because ROOT would then construct a new object.
196 // This contradicts the ROOT documentation for TBranchElement::SetAddress,
197 // which suggests that a new object would be constructed only when setting the address,
198 // not when filling the TTree.
199 for(auto& elem : fDetMap) {
200 (*elem.second)->Clear();
201 }
202
203 // Load current events
204 for(const auto& det : event->GetDetectors()) {
205 TClass* cls = det->IsA();
206 // attempt to copy this detector into the detector map
207 // if that fails (because the detector isn't in the map yet), create the branch and then copy this detector
208 try {
209 **fDetMap.at(cls) = *det;
210 } catch(std::out_of_range& e) {
211 AddBranch(cls);
212 **fDetMap.at(cls) = *det;
213 }
214 (*fDetMap.at(cls))->ClearTransients();
215 }
216
217 // Fill
218 std::lock_guard<std::mutex> lock(ttree_fill_mutex);
219 fEventTree->Fill();
220 }
221}
std::mutex ttree_fill_mutex
static int Size()
Definition GValue.h:63
static GValue * Get(const std::string &name="")
Definition GValue.h:39
std::atomic_size_t & ItemsPopped()
std::atomic_long & InputSize()
static StoppableThread * Get(const std::string &name)
std::string Name() const
void IncrementItemsPopped()
bool WriteToFile(const std::string &file)
static TAnalysisWriteLoop * Get(std::string name="", std::string outputFilename="")
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TFragment > > > fOutOfOrderQueue
std::string EndStatus() override
void WriteEvent(std::shared_ptr< TUnpackedEvent > &event)
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< TUnpackedEvent > > > fInputQueue
std::map< TClass *, TDetector ** > fDetMap
bool Iteration() override
void ClearQueue() override
void AddBranch(TClass *cls)
std::map< TClass *, TDetector * > fDefaultDets
TAnalysisWriteLoop(const TAnalysisWriteLoop &)=delete
static int WriteToRoot(TFile *fileptr=nullptr)
static size_t GetNumberOfChannels()
Definition TChannel.h:68
virtual void ClearTransients() const
static TGRSIOptions * Get(int argc=0, char **argv=nullptr)
Do not use!
static TAnalysisOptions * AnalysisOptions()
static bool WriteToRoot(TFile *fileptr=nullptr)
Definition TRunInfo.cxx:309
static TPPG * Get(bool verbose=false)
Definition TSingleton.h:33