Skip to content

src/processing/processing_service.hpp

Namespaces

Name
sgns
sgns::processing

Classes

Name
class sgns::processing::ProcessingServiceImpl
struct sgns::processing::ProcessingServiceImpl::ProcessingStatus

Source code

#ifndef GRPC_FOR_SUPERGENIUS_PROCESSING_SERVICE
#define GRPC_FOR_SUPERGENIUS_PROCESSING_SERVICE

#include <unordered_map>

#include "processing/processing_node.hpp"
#include "processing/processing_subtask_enqueuer.hpp"

namespace sgns::processing
{
    class ProcessingServiceImpl : public std::enable_shared_from_this<ProcessingServiceImpl>
    {
    public:
        enum class Status {
            DISABLED,
            IDLE,
            PROCESSING,
        };

        struct ProcessingStatus {
            Status status;
            float percentage; // 0.0 to 100.0

            ProcessingStatus(Status s = Status::DISABLED, float p = 0.0f) 
                : status(s), percentage(p) {}
        };

        ProcessingServiceImpl( std::shared_ptr<ipfs_pubsub::GossipPubSub> gossipPubSub,
                               size_t                                     maximalNodesCount,
                               std::shared_ptr<SubTaskEnqueuer>           subTaskEnqueuer,
                               std::shared_ptr<SubTaskResultStorage>      subTaskResultStorage,
                               std::shared_ptr<ProcessingCore>            processingCore );

        ProcessingServiceImpl( std::shared_ptr<ipfs_pubsub::GossipPubSub>                        gossipPubSub,
                               size_t                                                            maximalNodesCount,
                               std::shared_ptr<SubTaskEnqueuer>                                  subTaskEnqueuer,
                               std::shared_ptr<SubTaskResultStorage>                             subTaskResultStorage,
                               std::shared_ptr<ProcessingCore>                                   processingCore,
                               std::function<void( const std::string              &subTaskQueueId,
                                                   const SGProcessing::TaskResult &taskresult )> userCallbackSuccess,
                               std::function<void( const std::string &subTaskQueueId )>          userCallbackError,
                               std::string                                                       node_address );

        ~ProcessingServiceImpl();

        void StartProcessing( const std::string &processingGridChannelId );

        void StopProcessing();

        size_t GetProcessingNodesCount() const;

        void SetChannelListRequestTimeout( boost::posix_time::time_duration channelListRequestTimeout );

        [[nodiscard]] ProcessingStatus GetProcessingStatus() const;

    private:
        void Listen( const std::string &processingGridChannelId );

        void SendChannelListRequest();

        void OnMessage( boost::optional<const sgns::ipfs_pubsub::GossipPubSub::Message &> message );
        void OnQueueProcessingCompleted( const std::string              &subTaskQueueId,
                                         const SGProcessing::TaskResult &taskResult );
        void OnProcessingError( const std::string &subTaskQueueId, const std::string &errorMessage );
        void OnProcessingDone( const std::string &taskId );

        void AcceptProcessingChannel( const std::string &channelId );

        void PublishLocalChannelList();

        void HandleRequestTimeout();

        void BroadcastNodeCreationIntent( const std::string &subTaskQueueId );
        void HandleNodeCreationTimeout();
        void OnNodeCreationIntent( const std::string &peerAddress, const std::string &subTaskQueueId );
        bool HasLowestAddress() const;
        bool IsPendingCreationStale() const;
        void CancelPendingCreation( const std::string &reason );

        std::shared_ptr<sgns::ipfs_pubsub::GossipPubSub>       m_gossipPubSub;
        std::shared_ptr<boost::asio::io_context>               m_context;
        std::unique_ptr<boost::asio::io_context::work>         m_context_work; // Keep context alive
        std::thread                                            io_thread;
        size_t                                                 m_maximalNodesCount;
        std::shared_ptr<SubTaskEnqueuer>                       m_subTaskEnqueuer;
        std::shared_ptr<SubTaskResultStorage>                  m_subTaskResultStorage;
        std::shared_ptr<ProcessingCore>                        m_processingCore;
        std::unique_ptr<sgns::ipfs_pubsub::GossipPubSubTopic>  m_gridChannel;
        std::unordered_map<std::string, std::shared_ptr<ProcessingNode>> m_processingNodes;
        boost::asio::deadline_timer                            m_timerChannelListRequestTimeout;
        boost::posix_time::time_duration                       m_channelListRequestTimeout;
        bool                                                   m_waitingChannelRequest = false;
        std::atomic<bool>                                      m_isStopped;
        mutable std::recursive_mutex                           m_mutexNodes;
        std::function<void( const std::string &subTaskQueueId, const SGProcessing::TaskResult &taskresult )>
                                                                 userCallbackSuccess_;
        std::function<void( const std::string &subTaskQueueId )> userCallbackError_;
        std::string                                              node_address_;

        std::set<std::string>                 m_competingPeers;
        std::chrono::steady_clock::time_point m_pendingCreationTimestamp;
        std::chrono::seconds                  m_pendingCreationTimeout{ 10 };

        boost::asio::deadline_timer         m_nodeCreationTimer;
        boost::posix_time::time_duration    m_nodeCreationTimeout;
        std::string                         m_pendingSubTaskQueueId;
        std::list<SGProcessing::SubTask>    m_pendingSubTasks;
        boost::optional<SGProcessing::Task> m_pendingTask;
        std::mutex                          m_mutexPendingCreation;

        // Blacklist for failed tasks/channels to prevent repeated processing attempts
        std::set<std::string> m_blacklistedChannels;
        std::mutex            m_mutexBlacklist;

        base::Logger m_logger = base::createLogger( "ProcessingService" );
    };
}

#endif // GRPC_FOR_SUPERGENIUS_PROCESSING_SERVICE

Updated on 2026-03-04 at 13:10:44 -0800