Skip to content

src/processing/processing_node.hpp

Namespaces

Name
sgns
sgns::processing

Classes

Name
class sgns::processing::ProcessingNode
Node for distributed computation.

Source code

#ifndef GRPC_FOR_SUPERGENIUS_PROCESSING_NODE
#define GRPC_FOR_SUPERGENIUS_PROCESSING_NODE

#include <chrono>
#include <thread>
#include <optional>
#include <ipfs_pubsub/gossip_pubsub_topic.hpp>

#include "processing/processing_engine.hpp"
#include "processing/processing_subtask_queue_manager.hpp"
#include "processing/processing_subtask_queue_accessor.hpp"
#include "processing/processing_subtask_result_storage.hpp"

namespace sgns::processing
{
    /**
     * @brief Node for distributed computation.
     *
     * Instances are created through the static New() factory; the constructor is
     * private. The class derives from std::enable_shared_from_this, so an instance
     * is expected to be owned by a std::shared_ptr.
     *
     * NOTE(review): this header names boost::asio types, base::Logger,
     * std::function, std::list, std::string and std::shared_ptr without including
     * their headers directly; presumably they arrive transitively through the
     * project includes above - confirm.
     */
    class ProcessingNode : public std::enable_shared_from_this<ProcessingNode>
    {
    public:
        /**
         * @brief Factory: creates a ProcessingNode and returns shared ownership of it.
         *
         * The channel id, sub-task list and subscription wait are accepted here but
         * are not parameters of the private constructor; presumably they are passed
         * on to Initialize() / AttachTo() / CreateSubTaskQueue() - confirm in the
         * implementation file.
         *
         * @param gossipPubSub                  Pub/sub instance shared with the node.
         * @param subTaskResultStorage          Storage object for sub-task results.
         * @param processingCore                Processing core shared with the node.
         * @param taskResultProcessingSink      Callback receiving a SGProcessing::TaskResult.
         * @param processingErrorSink           Callback receiving an error string.
         * @param processingDoneSink            Callback taking no arguments.
         * @param node_id                       Identifier of this node.
         * @param processingQueueChannelId      Identifier of the processing queue channel.
         * @param subTasks                      Sub-tasks for the node; defaults to an empty list.
         * @param msSubscriptionWaitingDuration Subscription waiting duration; defaults to 2000 ms.
         * @param ttl                           Time-to-live in seconds; defaults to 2 minutes (120 s).
         * @return Shared pointer to the node. NOTE(review): whether it can be null
         *         on failure is not visible in this header - confirm.
         */
        static std::shared_ptr<ProcessingNode> New(
            std::shared_ptr<ipfs_pubsub::GossipPubSub>              gossipPubSub,
            std::shared_ptr<SubTaskResultStorage>                   subTaskResultStorage,
            std::shared_ptr<ProcessingCore>                         processingCore,
            std::function<void( const SGProcessing::TaskResult & )> taskResultProcessingSink,
            std::function<void( const std::string & )>              processingErrorSink,
            std::function<void( void )>                             processingDoneSink,
            std::string                                             node_id,
            const std::string                                      &processingQueueChannelId,
            std::list<SGProcessing::SubTask>                        subTasks = {},
            std::chrono::milliseconds msSubscriptionWaitingDuration          = std::chrono::milliseconds( 2000 ),
            std::chrono::seconds      ttl                                    = std::chrono::minutes( 2 ) );

        /// @brief Destructor; defined out of line (not visible in this header).
        ~ProcessingNode();

        /// @brief Queue-ownership query.
        /// @return bool; presumably true when this node owns the sub-task queue - confirm.
        bool HasQueueOwnership() const;

        /// @brief Progress query.
        /// @return float; NOTE(review): range/units (fraction vs. percent) are not visible here - confirm.
        float GetProgress() const;

    private:
        /**
         * @brief Private constructor; use New() to create instances.
         *
         * Takes the same collaborators, callbacks, node id and TTL as New(), without
         * the channel id, sub-task list and subscription wait.
         */
        ProcessingNode( std::shared_ptr<ipfs_pubsub::GossipPubSub>              gossipPubSub,
                        std::shared_ptr<SubTaskResultStorage>                   subTaskResultStorage,
                        std::shared_ptr<ProcessingCore>                         processingCore,
                        std::function<void( const SGProcessing::TaskResult & )> taskResultProcessingSink,
                        std::function<void( const std::string & )>              processingErrorSink,
                        std::function<void( void )>                             processingDoneSink,
                        std::string                                             node_id,
                        std::chrono::seconds                                    ttl );

        /// @brief Attaches to the processing queue channel with the given id.
        /// @return bool; presumably true on success - confirm.
        bool AttachTo( const std::string &processingQueueChannelId );

        /// @brief Creates the sub-task queue from the given sub-tasks (list taken by value).
        /// @return bool; presumably true on success - confirm.
        bool CreateSubTaskQueue( std::list<SGProcessing::SubTask> subTasks );

        /// @brief Initialization step taking the channel id and the subscription waiting duration.
        void Initialize( const std::string        &processingQueueChannelId,
                         std::chrono::milliseconds msSubscriptionWaitingDuration );

        /// @name TTL handling
        /// NOTE(review): CheckTTL's signature resembles a timer completion handler
        /// (presumably for m_ttlTimer) - confirm in the implementation file.
        /// @{
        void InitTTL();
        void StartTTLTimer();
        void CheckTTL( const std::error_code &ec );
        /// @}

        /// Pub/sub instance shared with the creator.
        std::shared_ptr<sgns::ipfs_pubsub::GossipPubSub> m_gossipPubSub;

        std::string                           m_nodeId;   ///< Identifier of this node.
        std::string                           m_escrowId; ///< NOTE(review): nothing in this header sets it - confirm usage.
        std::shared_ptr<ProcessingCore>       m_processingCore;
        std::shared_ptr<SubTaskResultStorage> m_subTaskResultStorage;

        // Processing pipeline objects and the caller-supplied result/error/done callbacks.
        std::shared_ptr<ProcessingEngine>                       m_processingEngine;
        std::shared_ptr<ProcessingSubTaskQueueChannel>          m_queueChannel;
        std::shared_ptr<ProcessingSubTaskQueueManager>          m_subtaskQueueManager;
        std::shared_ptr<SubTaskQueueAccessor>                   m_subTaskQueueAccessor;
        std::function<void( const SGProcessing::TaskResult & )> m_taskResultProcessingSink;
        std::function<void( const std::string & )>              m_processingErrorSink;
        std::function<void( void )>                             m_processingDoneSink;

        // TTL state: a steady-clock creation timestamp, the TTL in seconds and a steady timer.
        std::chrono::steady_clock::time_point                  m_creationTime;
        std::chrono::seconds                                   m_ttl;
        std::unique_ptr<boost::asio::steady_timer>             m_ttlTimer;
        /// Callback taking a shared pointer to a node; no setter is declared in this
        /// header. NOTE(review): presumably invoked on TTL expiry - confirm.
        std::function<void( std::shared_ptr<ProcessingNode> )> m_selfDestructCallback;

        // Node-local io_context, its work guard and the thread object.
        // Members are destroyed in reverse declaration order (thread, then guard,
        // then context), so do not reorder these without checking the destructor.
        // NOTE(review): presumably m_localIoThread runs m_localContext - confirm.
        std::shared_ptr<boost::asio::io_context> m_localContext;
        using WorkGuard = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;
        std::optional<WorkGuard> m_localWorkGuard;
        std::thread              m_localIoThread;

        /// Logger created with the name "ProcessingNode".
        base::Logger m_logger = base::createLogger( "ProcessingNode" );
    };
}

#endif // GRPC_FOR_SUPERGENIUS_PROCESSING_NODE

Updated on 2026-03-04 at 13:10:44 -0800