Skip to content

src/processing/processing_subtask_queue_accessor_impl.hpp

Namespaces

Name
sgns
sgns::processing

Classes

Name
class sgns::processing::SubTaskQueueAccessorImpl
Subtask queue accessor implementation.

Source code

#ifndef SUPERGENIUS_PROCESSING_SUBTASK_QUEUE_ACCESSOR_IMPL_HPP
#define SUPERGENIUS_PROCESSING_SUBTASK_QUEUE_ACCESSOR_IMPL_HPP

#include "processing/processing_subtask_queue_accessor.hpp"
#include "processing/processing_subtask_queue_manager.hpp"
#include "processing/processing_subtask_result_storage.hpp"
#include "processing/processing_validation_core.hpp"

#include <ipfs_pubsub/gossip_pubsub_topic.hpp>
#include <list>
#include <optional>
#include <thread>
#include <boost/asio.hpp>

namespace sgns::processing
{
    class SubTaskQueueAccessorImpl : public SubTaskQueueAccessor,
                                     public std::enable_shared_from_this<SubTaskQueueAccessorImpl>
    {
    public:
        SubTaskQueueAccessorImpl( std::shared_ptr<ipfs_pubsub::GossipPubSub>              gossipPubSub,
                                  std::shared_ptr<ProcessingSubTaskQueueManager>          subTaskQueueManager,
                                  std::shared_ptr<SubTaskResultStorage>                   subTaskResultStorage,
                                  std::function<void( const SGProcessing::TaskResult & )> taskResultProcessingSink,
                                  std::function<void( const std::string & )>              processingErrorSink );
        ~SubTaskQueueAccessorImpl() override;

        bool ConnectToSubTaskQueue( std::function<void()> onSubTaskQueueConnectedEventSink ) override;
        bool AssignSubTasks( std::list<SGProcessing::SubTask> &subTasks ) override;
        void GrabSubTask( SubTaskGrabbedCallback onSubTaskGrabbedCallback ) override;
        void CompleteSubTask( const std::string &subTaskId, const SGProcessing::SubTaskResult &subTaskResult ) override;
        bool CreateResultsChannel( const std::string &task_id ) override;

        std::vector<std::tuple<std::string, SGProcessing::SubTaskResult>> GetResults() const;

        enum class FinalizationRetVal
        {
            NOT_FINALIZED           = 0,
            FINALIZED               = 1,
            FINALIZED_BUT_NOT_OWNER = 2,
        };

    private:
        bool               OnResultReceived( SGProcessing::SubTaskResult &&subTaskResult );
        void               OnSubTaskQueueAssigned( const std::vector<std::string> &subTaskIds,
                                                   std::function<void()>           onSubTaskQueueConnectedEventSink );
        void               UpdateResultsFromStorage( const std::set<std::string> &subTaskIds );
        FinalizationRetVal FinalizeQueueProcessing( const SGProcessing::SubTaskCollection &subTasks,
                                                    std::set<std::string>                 &invalidSubTaskIds );

        static void OnResultChannelMessage( std::weak_ptr<SubTaskQueueAccessorImpl>                     weakThis,
                                            boost::optional<const ipfs_pubsub::GossipPubSub::Message &> message );
        void        StartPeriodicStateBroadcast();
        void        ScheduleStateBroadcast();
        void        PublishExistingResults();
        // Helper method to find a subtask by ID
        boost::optional<SGProcessing::SubTask> FindSubTaskById( const std::string &subTaskId ) const;

        std::shared_ptr<ipfs_pubsub::GossipPubSub>              m_gossipPubSub;
        std::shared_ptr<ProcessingSubTaskQueueManager>          m_subTaskQueueManager;
        std::shared_ptr<SubTaskResultStorage>                   m_subTaskResultStorage;
        std::function<void( const SGProcessing::TaskResult & )> m_taskResultProcessingSink;
        std::function<void( const std::string & )>              m_processingErrorSink;
        std::shared_ptr<boost::asio::io_context>                m_localContext;
        using WorkGuard = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;
        std::optional<WorkGuard>                   m_localWorkGuard;
        std::thread                                m_localThread;
        std::shared_ptr<boost::asio::steady_timer> m_stateTimer;

        std::shared_ptr<sgns::ipfs_pubsub::GossipPubSubTopic> m_resultChannel;

        mutable std::mutex                                 m_mutexResults;
        std::map<std::string, SGProcessing::SubTaskResult> m_results;
        ProcessingValidationCore                           m_validationCore;

        base::Logger m_logger = base::createLogger( "ProcessingSubTaskQueueAccessorImpl" );
    };
}

#endif

Updated on 2026-03-04 at 13:10:44 -0800