GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
TFragDiagnosticsLoop.cxx
Go to the documentation of this file.
2
3#include <sstream>
4#include <iomanip>
5#include <chrono>
6#include <thread>
7
8#include "TFile.h"
9#include "TThread.h"
10#include "TROOT.h"
11#include "TH2.h"
12
13#include "GValue.h"
14#include "TChannel.h"
15#include "TRunInfo.h"
16#include "TGRSIOptions.h"
17#include "TTreeFillMutex.h"
18#include "TParsingDiagnostics.h"
19
20#include "TBadFragment.h"
21#include "TScalerQueue.h"
22
23TFragDiagnosticsLoop* TFragDiagnosticsLoop::Get(std::string name, std::string fOutputFilename)
24{
25 if(name.empty()) {
26 name = "diag_loop";
27 }
28
29 auto* loop = static_cast<TFragDiagnosticsLoop*>(StoppableThread::Get(name));
30 if(loop == nullptr) {
31 if(fOutputFilename.empty()) {
32 fOutputFilename = "temp.root";
33 }
34 loop = new TFragDiagnosticsLoop(name, fOutputFilename);
35 }
36 return loop;
37}
38
39TFragDiagnosticsLoop::TFragDiagnosticsLoop(std::string name, const std::string& fOutputFilename)
40 : StoppableThread(std::move(name)),
41 fInputQueue(std::make_shared<ThreadsafeQueue<std::shared_ptr<const TFragment>>>())
42{
43 if(fOutputFilename != "/dev/null") {
44 TThread::Lock();
45
46 fOutputFile = new TFile(fOutputFilename.c_str(), "RECREATE");
47 if(fOutputFile == nullptr || !fOutputFile->IsOpen()) {
48 throw std::runtime_error(Form("Failed to open \"%s\"\n", fOutputFilename.c_str()));
49 }
50
51 // we delay the creating of histograms until we need them so that all the information needed is present
52 TThread::UnLock();
53 }
54}
55
60
// NOTE(review): the signature line of this definition is missing from this listing (the
// embedded source numbering jumps from 60 to 62); presumably this is the member that clears
// the input queue - confirm against the repository.
// Empties fInputQueue by popping one fragment at a time until Size() reports zero.
62{
63 while(fInputQueue->Size() != 0u) {
 // the popped fragment is only held by this local, which ends with each loop iteration
64 std::shared_ptr<const TFragment> event;
65 fInputQueue->Pop(event);
66 }
67}
68
// NOTE(review): the signature line of this definition is missing from this listing;
// presumably this is `std::string EndStatus()` - confirm against the repository.
// Returns a status string of the form "\n<Name()>: <ItemsPopped()>/<ItemsPopped() + InputSize()>\n",
// with the first counter printed using a field width of 8.
70{
71 std::ostringstream str;
72 str << std::endl
73 << Name() << ": " << std::setw(8) << ItemsPopped() << "/" << ItemsPopped() + InputSize() << std::endl;
74 return str.str();
75}
76
// NOTE(review): the signature line of this definition is missing from this listing, and the
// embedded source numbering skips line 87 (between Process(event) and the first `return true`),
// so one statement of the original body is not visible here - confirm against the repository.
// One pass of the loop: tries to pop a fragment, processes it if one was obtained, and returns
// false only when nothing was obtained and the input queue reports IsFinished().
78{
79 std::shared_ptr<const TFragment> event;
 // the value returned by Pop is stored as the input size; negative values are replaced by 0
80 InputSize(fInputQueue->Pop(event, 0));
81 if(InputSize() < 0) {
82 InputSize(0);
83 }
84
85 if(event != nullptr) {
86 Process(event);
88 return true;
89 }
90
 // nothing was popped: stop if the queue is finished, otherwise wait 100 ms and keep going
91 if(fInputQueue->IsFinished()) {
92 return false;
93 }
94 std::this_thread::sleep_for(std::chrono::milliseconds(100));
95 return true;
96}
97
// NOTE(review): the signature line of this definition is missing from this listing, and the
// embedded source numbering skips lines 109, 128-129 and 132-133. The `if` that opens the
// branch closed at embedded line 111 and the statements around the closing brace at embedded
// line 130 are therefore not visible; braces do not balance as shown. Confirm against the
// repository before editing.
// Does nothing unless fOutputFile is non-null. Otherwise fetches the singletons, reports a
// run-length mismatch, writes the six diagnostics histograms plus GValue, PPG and (optionally)
// the parsing diagnostics into the output file, and closes the file.
99{
100 if(fOutputFile != nullptr) {
101 // get all singletons before switching to the output file
102 gROOT->cd();
103 TGRSIOptions* options = TGRSIOptions::Get();
104 TPPG* ppg = TPPG::Get();
105 TParsingDiagnostics* parsingDiagnostics = TParsingDiagnostics::Get();
106 GValue* gValues = GValue::Get();
107
108 // check our final run length and compare to the binning we chose
 // NOTE(review): the condition guarding this message (embedded line 109) is missing from the listing
110 std::cerr << Name() << ": got a final run length of " << TRunInfo::RunLength() << " s, instead of the assumed run length of " << fRunLength << " s, some diagnostics information will be missing!" << std::endl;
111 } else if(TRunInfo::RunLength() < fRunLength / 2) {
112 // TODO: if we have less than half the range, resize the histograms (maybe via a general function?)
113 }
114 // switch to the output file
115 fOutputFile->cd();
116
 // NOTE(review): the histogram pointers are dereferenced without a null check in the visible code
117 fAccepted->Write();
118 fLostNetworkPackets->Write();
119 fLostChannelIds->Write();
120 fLostAcceptedIds->Write();
121 fLostChannelIdsTime->Write();
122 fLostAcceptedIdsTime->Write();
123
 // the GValue object is only written when GValue::Size() is non-zero
124 if(GValue::Size() != 0) {
125 gValues->Write("Values", TObject::kOverwrite);
126 }
127
130 }
131
134 ppg->Write("PPG");
135
136 if(options->WriteDiagnostics()) {
137 parsingDiagnostics->ReadPPG(ppg); // this sets the cycle length from the PPG information
138 parsingDiagnostics->Write("ParsingDiagnostics", TObject::kOverwrite);
139 }
140
 // NOTE(review): Delete() is a member-function call, not C++ `delete`, and fOutputFile is not
 // reset to nullptr in the visible code - confirm this is the intended cleanup
141 fOutputFile->Close();
142 fOutputFile->Delete();
143 gROOT->cd();
144 }
145}
146
// NOTE(review): the signature line of this definition is missing from this listing; presumably
// this is the CreateHistograms() member called from Process() - confirm against the repository.
// Allocates the six diagnostics histograms inside the output file and labels their x-axis bins
// with the channel addresses. Returns false if fOutputFile is null or not open, true otherwise.
148{
 // fOutputFile is a pointer, so this prints its address (before the null check below)
149 std::cout << Name() << ": creating histograms using output file " << fOutputFile << std::endl;
150
151 if(fOutputFile == nullptr || !fOutputFile->IsOpen()) {
152 return false;
153 }
154 fOutputFile->cd();
155
156 // at this point we have read the first midas event and have a run start, but not a run stop yet (we will only find that one out at the end of the run)
157 // so for now we assume a run length of 3600 seconds
158 fRunLength = (TRunInfo::RunLength() > 0. ? static_cast<int>(TRunInfo::RunLength()) : 3600);
159 // start with 1 s binning
160 int nofTimeBins = fRunLength;
161 if(nofTimeBins < 1000) {
162 // if we can, increase the binning to 100 ms binning
163 nofTimeBins = fRunLength * 10;
164 } else if(nofTimeBins > 10000) {
165 // if we have to, decrease the binning to 10 s binning
166 nofTimeBins = fRunLength / 10;
167 std::cout << "Warning, run is more than 10000 s long (run length " << TRunInfo::RunLength() << " s), reducing binning to 10 s (number of bins " << nofTimeBins << ")!" << std::endl;
168 }
169
170 std::cout << Name() << ": creating histograms with " << nofTimeBins << " time bins from 0 to " << fRunLength << std::endl;
171
 // all 2D histograms use one x bin per channel (TChannel::GetNumberOfChannels()); the ID
 // histograms use 10000 y bins from 0 to 1e6, the time histograms use nofTimeBins from 0 to fRunLength
172 fAccepted = new TH2D("accepted", "Accepted Channel ID vs. Channel Address;Channel Address;Accepted Channel ID",
173 TChannel::GetNumberOfChannels(), 0, static_cast<Double_t>(TChannel::GetNumberOfChannels()), 10000, 0, 1e6);
174 fLostNetworkPackets = new TH1D("lostNetworkPackets", "lost network packets;time [s];lost network packets",
175 nofTimeBins, 0., fRunLength);
176 fLostChannelIds = new TH2D("lostChannelIds", "Lost Channel Id vs. Channel Number;Channel Number;Lost Channel Id",
177 TChannel::GetNumberOfChannels(), 0, static_cast<Double_t>(TChannel::GetNumberOfChannels()), 10000, 0, 1e6);
178 fLostAcceptedIds = new TH2D("lostAcceptedIds", "Lost Accepted Channel Id vs. Channel Number;Channel Number;Lost Accepted Channel Id",
179 TChannel::GetNumberOfChannels(), 0, static_cast<Double_t>(TChannel::GetNumberOfChannels()), 10000, 0, 1e6);
180 fLostChannelIdsTime = new TH2D("lostChannelIdsTime", "Lost Channel Id time vs. Channel Number;Channel Number;time [s]",
181 TChannel::GetNumberOfChannels(), 0, static_cast<Double_t>(TChannel::GetNumberOfChannels()), nofTimeBins, 0., fRunLength);
182 fLostAcceptedIdsTime = new TH2D("lostAcceptedIdsTime", "Lost Accepted Channel Id time vs. Channel Number;Channel Number;time [s]",
183 TChannel::GetNumberOfChannels(), 0, static_cast<Double_t>(TChannel::GetNumberOfChannels()), nofTimeBins, 0., fRunLength);
184
 // label x bin N with the address of the N-th entry of the channel map, formatted as "0x%04x";
 // Process() fills these histograms using the same label format
185 int bin = 1;
186 for(const auto& channel : *TChannel::GetChannelMap()) {
187 fAccepted->GetXaxis()->SetBinLabel(bin, Form("0x%04x", channel.first));
188 fLostChannelIds->GetXaxis()->SetBinLabel(bin, Form("0x%04x", channel.first));
189 fLostAcceptedIds->GetXaxis()->SetBinLabel(bin, Form("0x%04x", channel.first));
190 fLostChannelIdsTime->GetXaxis()->SetBinLabel(bin, Form("0x%04x", channel.first));
191 fLostAcceptedIdsTime->GetXaxis()->SetBinLabel(bin, Form("0x%04x", channel.first));
192 ++bin;
193 }
194
195 return true;
196}
197
// Updates the diagnostics histograms with one fragment.
//
// Steps, in order: create the histograms on first use; unwrap the accepted channel ID using the
// per-address rollover bookkeeping; record lost network packet numbers; record lost channel IDs
// per address; fill the accepted-ID histogram; record lost accepted channel IDs per address.
// Throws std::runtime_error if the histograms are missing and CreateHistograms() returns false.
//
// NOTE(review): the embedded source numbering skips lines 255, 257, 271 and 273 inside the
// network-packet section, so statements of the original body are not visible in this listing.
// In the visible code fNetworkPacketNumber[0] and fNetworkPacketTimeStamp[0] are never assigned;
// presumably the missing lines copy index [1] to index [0] - confirm against the repository.
198void TFragDiagnosticsLoop::Process(const std::shared_ptr<const TFragment>& event)
199{
 // histograms are created lazily, the first time any of the six pointers is still null
200 if(fAccepted == nullptr || fLostNetworkPackets == nullptr || fLostChannelIds == nullptr ||
201 fLostAcceptedIds == nullptr || fLostChannelIdsTime == nullptr || fLostAcceptedIdsTime == nullptr) {
202 if(!CreateHistograms()) {
203 throw std::runtime_error(Name() + ": trying to create histograms failed, can't produce diagnostics!");
204 }
205 }
206 auto timeStamp = event->GetTimeStampNs();
207 auto address = event->GetAddress();
208 auto acceptedId = event->GetAcceptedChannelId();
209 auto channelId = event->GetChannelId();
210 auto networkPacket = event->GetNetworkPacketNumber();
211
212 //---------------- this section deals with the rolling over of the AcceptedChannelId. -------------------//
213 // if we do not have an entry in the fRolling map, it will be created as false
 // while not rolling, count IDs within fRollingThreshold of fAcceptedMax;
 // while rolling, count IDs above fRollingThreshold but below half of fAcceptedMax
214 if(!fRolling[address] && acceptedId > (fAcceptedMax - fRollingThreshold)) {
215 ++fRollnum[address];
216 }
217 if(fRolling[address] && acceptedId > fRollingThreshold && acceptedId < (fAcceptedMax / 2)) {
218 ++fRollnum[address];
219 }
 // more than fRollnumThreshold counts toggles the rolling state; leaving it counts one rollover
220 if(!fRolling[address] && fRollnum[address] > fRollnumThreshold) {
221 fRolling[address] = true;
222 fRollnum[address] = 0;
223 }
224 if(fRolling[address] && fRollnum[address] > fRollnumThreshold) {
225 fRolling[address] = false;
226 fRollnum[address] = 0;
227 ++fNofRollovers[address];
228 }
229
 // small IDs seen while rolling are shifted up by fAcceptedMax, then every ID is shifted
 // by the number of completed rollovers
230 if(fRolling[address] && acceptedId < fRollingThreshold * 2) {
231 acceptedId += fAcceptedMax;
232 }
233 acceptedId += fNofRollovers[address] * fAcceptedMax;
234
235 // check whether we had this channel before and if not, initialize all arrays
236 if(fChannelIds.find(address) == fChannelIds.end()) {
237 fChannelIds[address] = {0, 0};
238 fAcceptedChannelIds[address] = {0, 0};
239 fTimeStamps[address] = {0, 0};
240 }
241
242 // if we have a new network packet number
243 if(networkPacket != 0 && fNetworkPacketNumber[0] != 0) {
244 // we get here once we get the 3rd network packet number, so now
245 // [1] is the last, [0] the second to last
246
247 // we don't need to check that [0] is smaller than [1]
248 if(fNetworkPacketNumber[1] < networkPacket) {
249 // we get a new highest network packet number, so count all numbers between the second to last and the last one
250 // before we update things
 // one entry per skipped packet number, all filled at the time (in s) stored in index [1]
251 for(int packet = fNetworkPacketNumber[0] + 1; packet < fNetworkPacketNumber[1]; ++packet) {
252 fLostNetworkPackets->Fill(static_cast<double>(fNetworkPacketTimeStamp[1]) / 1e9);
253 }
254 // things look fine, so prepare for next time
256 fNetworkPacketNumber[1] = networkPacket;
258 fNetworkPacketTimeStamp[1] = timeStamp;
259 // [1] is now the current packet number, [0] is the last packet number
260 } else if(networkPacket < fNetworkPacketNumber[1]) {
261 //std::cout << "found wrong network packet number " << hex(networkPacket, 8)
262 // << " (between " << hex(fNetworkPacketNumber[0], 8) << " and " << hex(fNetworkPacketNumber[1]) << ", 8)" << std::endl;
263 // we update [1] to the current packet number here, not sure if that is the best way to deal with this situation?
264 fNetworkPacketNumber[1] = networkPacket;
265 fNetworkPacketTimeStamp[1] = timeStamp;
266 }
267 // It seems that we can get the same packet number multiple times? But if that happens there's nothing to do so no need to check.
268 } else if(networkPacket != 0) {
269 // if [0] is 0, we haven't got 2 network packets numbers yet, so we just copy them through
270 // that means after this [1] is the current, and [0] the last
272 fNetworkPacketNumber[1] = networkPacket;
274 fNetworkPacketTimeStamp[1] = timeStamp;
275 }
276
277 // check if the "middle" channel ID is reasonable and fill all IDs we've missed between the first and middle ID
278 if(fChannelIds[address][0] != 0) {
279 if(fChannelIds[address][0] < fChannelIds[address][1] && fChannelIds[address][1] < channelId) {
 // NOTE(review): the stored IDs are int64_t (see the member declarations) but the loop
 // variable is int - confirm the IDs always fit
280 for(int id = fChannelIds[address][0] + 1; id < fChannelIds[address][1]; ++id) {
281 fLostChannelIds->Fill(Form("0x%04x", address), id, 1.);
282 fLostChannelIdsTime->Fill(Form("0x%04x", address), static_cast<double>(fTimeStamps[address][1]) / 1e9, 1.);
283 }
284 fTimeStamps[address][0] = fTimeStamps[address][1];
285 fChannelIds[address][0] = fChannelIds[address][1];
286 }
287 // we either got a good new timestamp and channel ID and have copied the information from [1] to [0]
288 // or we had a bad timestamp that we overwrite now
289 fTimeStamps[address][1] = timeStamp;
290 fChannelIds[address][1] = channelId;
291 } else {
 // index [0] still zero: shift [1] into [0] and store the current values in [1]
292 fTimeStamps[address][0] = fTimeStamps[address][1];
293 fChannelIds[address][0] = fChannelIds[address][1];
294 fTimeStamps[address][1] = timeStamp;
295 fChannelIds[address][1] = channelId;
296 }
297
 // the x value is the address label assigned in CreateHistograms(), the y value the unwrapped ID
298 fAccepted->Fill(Form("0x%04x", address), static_cast<double>(acceptedId), 1.);
299
300 // check if the "middle" accepted channel ID is reasonable and fill all IDs we've missed between the first and
301 // middle ID
 // NOTE(review): this section is gated on fTimeStamps[address][0], which the channel-ID section
 // above has already updated for this fragment; fAcceptedChannelIds is only touched inside this
 // branch - confirm that is intended
302 if(fTimeStamps[address][0] != 0) {
303 if(fAcceptedChannelIds[address][0] < fAcceptedChannelIds[address][1] && fAcceptedChannelIds[address][1] < acceptedId) {
304 for(int id = fAcceptedChannelIds[address][0] + 1; id < fAcceptedChannelIds[address][1]; ++id) {
305 fLostAcceptedIds->Fill(Form("0x%04x", address), id, 1.);
306 fLostAcceptedIdsTime->Fill(Form("0x%04x", address), static_cast<double>(fTimeStamps[address][1]) / 1e9, 1.);
307 }
308 fAcceptedChannelIds[address][0] = fAcceptedChannelIds[address][1];
309 }
310 fAcceptedChannelIds[address][1] = acceptedId;
311 }
312}
static int Size()
Definition GValue.h:63
static GValue * Get(const std::string &name="")
Definition GValue.h:39
std::atomic_size_t & ItemsPopped()
std::atomic_long & InputSize()
static StoppableThread * Get(const std::string &name)
std::string Name() const
void IncrementItemsPopped()
static int WriteToRoot(TFile *fileptr=nullptr)
static std::unordered_map< unsigned int, TChannel * > * GetChannelMap()
Definition TChannel.h:67
static size_t GetNumberOfChannels()
Definition TChannel.h:63
void Process(const std::shared_ptr< const TFragment > &event)
TFragDiagnosticsLoop(const TFragDiagnosticsLoop &)=delete
std::map< unsigned int, std::array< int64_t, 2 > > fTimeStamps
std::map< unsigned int, std::array< int64_t, 2 > > fChannelIds
std::array< int64_t, 2 > fNetworkPacketTimeStamp
static TFragDiagnosticsLoop * Get(std::string name="", std::string fOutputFilename="")
std::map< unsigned int, std::array< int64_t, 2 > > fAcceptedChannelIds
std::map< unsigned int, int > fNofRollovers
std::map< unsigned int, bool > fRolling
std::array< int, 2 > fNetworkPacketNumber
std::string EndStatus() override
std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TFragment > > > fInputQueue
std::map< unsigned int, int > fRollnum
bool WriteDiagnostics() const
static TGRSIOptions * Get(int argc=0, char **argv=nullptr)
Do not use!
static bool WriteToFile(TFile *file=nullptr)
Definition TPPG.h:130
Int_t Write(const char *name=nullptr, Int_t option=0, Int_t bufsize=0) override
Definition TPPG.h:147
static bool WriteToRoot(TFile *fileptr=nullptr)
Definition TRunInfo.cxx:309
static double RunLength()
Definition TRunInfo.h:243
static TPPG * Get(bool verbose=false)
Definition TSingleton.h:33