GRSISort "v4.0.0.5"
An extension of the ROOT analysis Framework
Loading...
Searching...
No Matches
TFragmentChainLoop.cxx
Go to the documentation of this file.
2
3#include <chrono>
4#include <thread>
5
6#include "TGRSIint.h"
7#include "TFragment.h"
8
9TFragmentChainLoop* TFragmentChainLoop::Get(std::string name, TChain* chain)
10{
11 if(name.length() == 0) {
12 name = "chain_loop";
13 }
14
15 auto* loop = static_cast<TFragmentChainLoop*>(StoppableThread::Get(name));
16 if(loop == nullptr) {
17 if((chain == nullptr) && (gFragment == nullptr)) {
18 return nullptr;
19 }
20 if(chain == nullptr) {
21 chain = gFragment;
22 }
23 loop = new TFragmentChainLoop(name, chain);
24 }
25 return loop;
26}
27
28TFragmentChainLoop::TFragmentChainLoop(std::string name, TChain* chain)
29 : StoppableThread(std::move(name)), fEntriesTotal(chain->GetEntries()),
30 fInputChain(chain), fFragment(nullptr), fSelfStopping(true)
31{
32 SetupChain();
33}
34
36
38{
// Empties every queue in fOutputQueues by popping until Size() reports zero.
// NOTE(review): the signature line is missing from this listing; presumably this is
// TFragmentChainLoop::ClearQueue() - confirm against the original source.
39 for(const auto& outQueue : fOutputQueues) {
40 while(outQueue->Size() != 0u) {
// The popped fragment is only a throwaway local; it is discarded at the end of each pass.
41 std::shared_ptr<const TFragment> event;
42 outQueue->Pop(event);
43 }
44 }
45}
46
48{
// Points the "TFragment" branch of fInputChain at fFragment.
// Returns 0 on every path, including when there is no input chain.
// NOTE(review): the signature line is missing from this listing; presumably this is
// TFragmentChainLoop::SetupChain() as called by the constructor - confirm.
49 if(fInputChain == nullptr) {
50 return 0;
51 }
52
// The result of SetBranchAddress() is not inspected here.
53 fInputChain->SetBranchAddress("TFragment", &fFragment);
54 return 0;
55}
56
61
63{
// Calls SetFinished() on every queue in fOutputQueues.
// NOTE(review): the signature line is missing from this listing; presumably this is the
// end-of-run hook of the loop - confirm the function name against the original source.
64 for(const auto& outQueue : fOutputQueues) {
65 outQueue->SetFinished();
66 }
67}
68
70{
// One pass of the loop: reads the chain entry at index ItemsPopped(), copies it into a
// freshly allocated TFragment and pushes that fragment to every output queue.
// Returns false only when all entries have been consumed and fSelfStopping is set;
// every other path returns true.
// NOTE(review): the signature line is missing from this listing; presumably this is
// TFragmentChainLoop::Iteration() - confirm.
71 if(static_cast<int64_t>(ItemsPopped()) >= fEntriesTotal) {
72 if(fSelfStopping) {
73 return false;
74 }
// Not self-stopping: wait one second, then report true so the caller keeps going.
75 std::this_thread::sleep_for(std::chrono::milliseconds(1000));
76 return true;
77 }
78
79 std::shared_ptr<TFragment> frag = std::make_shared<TFragment>();
// GetEntry() fills fFragment through the branch address set up elsewhere in this file.
80 fInputChain->GetEntry(ItemsPopped());
// NOTE(review): listing line 81 is absent from this extract. Nothing visible here advances
// ItemsPopped(); presumably the dropped line does so (this file references
// IncrementItemsPopped()) - confirm against the original source.
82 *frag = *fFragment;
83 frag->SetEntryNumber();
// The same shared fragment object is handed to every output queue.
84 for(const auto& outQueue : fOutputQueues) {
85 outQueue->Push(frag);
86 }
87 InputSize(fEntriesTotal - ItemsPopped()); // this way fInputSize+fItemsPopped gives the total number of entries
88
89 return true;
90}
std::atomic_size_t & ItemsPopped()
std::atomic_long & InputSize()
static StoppableThread * Get(const std::string &name)
void IncrementItemsPopped()
std::vector< std::shared_ptr< ThreadsafeQueue< std::shared_ptr< const TFragment > > > > fOutputQueues
void ClearQueue() override
static TFragmentChainLoop * Get(std::string name="", TChain *chain=nullptr)
TFragmentChainLoop(const TFragmentChainLoop &)=delete
bool Iteration() override
TChain * gFragment
Definition TGRSIint.cxx:50