EICrecon
JANA based reconstruction for the EPIC detector
Loading...
Searching...
No Matches
JEventProcessorManagedPODIO.h
Go to the documentation of this file.
1#pragma once
2
3#include <nlohmann/json.hpp>
4#include <nlohmann/json_fwd.hpp>
5#include <zmq.hpp>
6#include <atomic>
7#include <condition_variable>
8#include <cstddef>
9#include <memory>
10#include <mutex>
11#include <optional>
12#include <string>
13#include <thread>
14
16
18
19public:
22
23 void Init() override;
24 void Process(const std::shared_ptr<const JEvent>& event) override;
25 void Finish() override;
26
27private:
28 void ListenForMessages();
29 void ProcessFileRequest(const nlohmann::json& request);
30 void SendResponse(const nlohmann::json& response); // listener thread only
31 void QueueResponse(const nlohmann::json& response); // any thread; wakes listener
32 void OpenOutputFile(const std::string& output_file);
33 nlohmann::json CloseOutputFile();
34 void NotifySourceNewFile(const std::string& input_file);
35 bool IsCurrentFileComplete();
36 std::size_t GetNeventsInCurrentFile();
37
38 // ZeroMQ components
39 std::unique_ptr<zmq::context_t> m_zmq_context;
40 std::unique_ptr<zmq::socket_t> m_zmq_socket;
41 std::string m_socket_path = "/tmp/eicrecon_managed.sock";
42
43 std::unique_ptr<std::thread> m_listener_thread;
44 std::atomic<bool> m_should_stop{false};
45
46 // File management (protected by m_file_mutex)
47 std::string m_current_input_file;
48 std::string m_current_output_file;
49 bool m_file_processing_active = false;
50 std::mutex m_file_mutex;
51 // Event counting for current file
52 std::atomic<std::size_t> m_events_processed{0};
53
54 // Response queue (protected by m_file_mutex; m_response_cv wakes the listener)
55 std::optional<nlohmann::json> m_queued_response;
56 std::condition_variable m_response_cv;
57
58 // True between the listener's recv() and its matching send() (ZMQ_REP protocol)
59 bool m_awaiting_reply = false;
60};
Definition JEventProcessorManagedPODIO.h:17
virtual ~JEventProcessorManagedPODIO()
Definition JEventProcessorManagedPODIO.cc:40
void Init() override
Definition JEventProcessorManagedPODIO.cc:47
void Process(const std::shared_ptr< const JEvent > &event) override
Definition JEventProcessorManagedPODIO.cc:306
void Finish() override
Definition JEventProcessorManagedPODIO.cc:336
JEventProcessorManagedPODIO()
Definition JEventProcessorManagedPODIO.cc:33
Definition JEventProcessorPODIO.h:14