src/processing/processing_node.hpp¶
Namespaces¶
| Name |
|---|
| sgns |
| sgns::processing |
Classes¶
| Name | |
|---|---|
| class | sgns::processing::ProcessingNode Node for distributed computation. |
Source code¶
#ifndef GRPC_FOR_SUPERGENIUS_PROCESSING_NODE
#define GRPC_FOR_SUPERGENIUS_PROCESSING_NODE
#include <chrono>
#include <thread>
#include <optional>
#include <ipfs_pubsub/gossip_pubsub_topic.hpp>
#include "processing/processing_engine.hpp"
#include "processing/processing_subtask_queue_manager.hpp"
#include "processing/processing_subtask_queue_accessor.hpp"
#include "processing/processing_subtask_result_storage.hpp"
namespace sgns::processing
{
class ProcessingNode : public std::enable_shared_from_this<ProcessingNode>
{
public:
static std::shared_ptr<ProcessingNode> New(
std::shared_ptr<ipfs_pubsub::GossipPubSub> gossipPubSub,
std::shared_ptr<SubTaskResultStorage> subTaskResultStorage,
std::shared_ptr<ProcessingCore> processingCore,
std::function<void( const SGProcessing::TaskResult & )> taskResultProcessingSink,
std::function<void( const std::string & )> processingErrorSink,
std::function<void( void )> processingDoneSink,
std::string node_id,
const std::string &processingQueueChannelId,
std::list<SGProcessing::SubTask> subTasks = {},
std::chrono::milliseconds msSubscriptionWaitingDuration = std::chrono::milliseconds( 2000 ),
std::chrono::seconds ttl = std::chrono::minutes( 2 ) );
~ProcessingNode();
bool HasQueueOwnership() const;
float GetProgress() const;
private:
ProcessingNode( std::shared_ptr<ipfs_pubsub::GossipPubSub> gossipPubSub,
std::shared_ptr<SubTaskResultStorage> subTaskResultStorage,
std::shared_ptr<ProcessingCore> processingCore,
std::function<void( const SGProcessing::TaskResult & )> taskResultProcessingSink,
std::function<void( const std::string & )> processingErrorSink,
std::function<void( void )> processingDoneSink,
std::string node_id,
std::chrono::seconds ttl );
bool AttachTo( const std::string &processingQueueChannelId );
bool CreateSubTaskQueue( std::list<SGProcessing::SubTask> subTasks );
void Initialize( const std::string &processingQueueChannelId,
std::chrono::milliseconds msSubscriptionWaitingDuration );
void InitTTL();
void StartTTLTimer();
void CheckTTL( const std::error_code &ec );
std::shared_ptr<sgns::ipfs_pubsub::GossipPubSub> m_gossipPubSub;
std::string m_nodeId;
std::string m_escrowId;
std::shared_ptr<ProcessingCore> m_processingCore;
std::shared_ptr<SubTaskResultStorage> m_subTaskResultStorage;
std::shared_ptr<ProcessingEngine> m_processingEngine;
std::shared_ptr<ProcessingSubTaskQueueChannel> m_queueChannel;
std::shared_ptr<ProcessingSubTaskQueueManager> m_subtaskQueueManager;
std::shared_ptr<SubTaskQueueAccessor> m_subTaskQueueAccessor;
std::function<void( const SGProcessing::TaskResult & )> m_taskResultProcessingSink;
std::function<void( const std::string & )> m_processingErrorSink;
std::function<void( void )> m_processingDoneSink;
std::chrono::steady_clock::time_point m_creationTime;
std::chrono::seconds m_ttl;
std::unique_ptr<boost::asio::steady_timer> m_ttlTimer;
std::function<void( std::shared_ptr<ProcessingNode> )> m_selfDestructCallback;
std::shared_ptr<boost::asio::io_context> m_localContext;
using WorkGuard = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;
std::optional<WorkGuard> m_localWorkGuard;
std::thread m_localIoThread;
base::Logger m_logger = base::createLogger( "ProcessingNode" );
};
}
#endif // GRPC_FOR_SUPERGENIUS_PROCESSING_NODE
Updated on 2026-03-04 at 13:10:44 -0800