GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
TFragmentChainLoop.cxx
Go to the documentation of this file.
2
3#include <chrono>
4#include <thread>
5
6#include "TClass.h"
7#include "TFile.h"
8#include "TThread.h"
9
10#include "TDetector.h"
11#include "TGRSIint.h"
12#include "TFragment.h"
13
14TFragmentChainLoop* TFragmentChainLoop::Get(std::string name, TChain* chain)
15{
16 if(name.length() == 0) {
17 name = "chain_loop";
18 }
19
20 auto* loop = static_cast<TFragmentChainLoop*>(StoppableThread::Get(name));
21 if(loop == nullptr) {
22 if((chain == nullptr) && (gFragment == nullptr)) {
23 return nullptr;
24 }
25 if(chain == nullptr) {
26 chain = gFragment;
27 }
28 loop = new TFragmentChainLoop(name, chain);
29 }
30 return loop;
31}
32
33TFragmentChainLoop::TFragmentChainLoop(std::string name, TChain* chain)
34 : StoppableThread(std::move(name)), fEntriesTotal(chain->GetEntries()),
35 fInputChain(chain), fFragment(nullptr), fSelfStopping(true)
36{
37 SetupChain();
38}
39
41
// NOTE(review): the signature line of this function is missing from this listing;
// presumably this is TFragmentChainLoop::ClearQueue() (the symbol index of this
// page lists "void ClearQueue() override") - confirm against the original source.
//
// Empties every output queue by popping elements until the queue reports size zero.
43{
44 for(const auto& outQueue : fOutputQueues) {
45 while(outQueue->Size() != 0u) {
// the popped fragment is only a throw-away target for Pop()
46 std::shared_ptr<const TFragment> event;
47 outQueue->Pop(event);
48 }
49 }
50}
51
// NOTE(review): the signature line of this function is missing from this listing;
// presumably this is TFragmentChainLoop::SetupChain(), which the constructor
// calls - confirm against the original source.
//
// Points the "TFragment" branch of the input chain at fFragment.
// Returns 0 in every case, including when there is no input chain.
53{
54 if(fInputChain == nullptr) {
55 return 0;
56 }
57
// the result of SetBranchAddress is not checked here
58 fInputChain->SetBranchAddress("TFragment", &fFragment);
59 return 0;
60}
61
66
// NOTE(review): the signature line of this function is missing from this listing,
// so its name cannot be read from here; presumably the end-of-loop hook
// (OnEnd?) - confirm against the original source.
//
// Calls SetFinished() on every output queue.
68{
69 for(const auto& outQueue : fOutputQueues) {
70 outQueue->SetFinished();
71 }
72}
73
// NOTE(review): the signature line of this function is missing from this listing;
// presumably this is TFragmentChainLoop::Iteration() (the symbol index of this
// page lists "bool Iteration() override") - confirm against the original source.
//
// Reads the entry with index ItemsPopped() from the input chain, copies it into a
// newly allocated TFragment and pushes that fragment to every output queue.
// Returns false only when all entries have been processed and fSelfStopping is set;
// returns true otherwise (presumably "keep iterating" - confirm in StoppableThread).
75{
// all entries of the chain have been handed out already
76 if(static_cast<int64_t>(ItemsPopped()) >= fEntriesTotal) {
77 if(fSelfStopping) {
78 return false;
79 }
// not self-stopping: wait for one second and report true
80 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
81 return true;
82 }
83
84 std::shared_ptr<TFragment> frag = std::make_shared<TFragment>();
85 fInputChain->GetEntry(ItemsPopped());
// NOTE(review): the listing skips a line here (its numbering jumps from 85 to 87) and
// nothing visible advances ItemsPopped(); the symbol index references
// IncrementItemsPopped(), so that call presumably belongs here - confirm.
// copy the fragment the chain filled into fFragment, so the queues do not share the read buffer
87 *frag = *fFragment;
88 frag->SetEntryNumber();
// every output queue receives the same shared fragment
89 for(const auto& outQueue : fOutputQueues) {
90 outQueue->Push(frag);
91 }
92 InputSize(fEntriesTotal - ItemsPopped()); // this way fInputSize+fItemsPopped gives the total number of entries
93
94 return true;
95}
std::atomic_size_t & ItemsPopped()
std::atomic_long & InputSize()
static StoppableThread * Get(const std::string &name)
void IncrementItemsPopped()
std::vector< std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TFragment > > > > fOutputQueues
void ClearQueue() override
static TFragmentChainLoop * Get(std::string name="", TChain *chain=nullptr)
TFragmentChainLoop(const TFragmentChainLoop &)=delete
bool Iteration() override
TChain * gFragment
Definition TGRSIint.cxx:50