Skip to content

src/processing/processing_subtask_queue_channel_pubsub.hpp

Namespaces

Name
sgns
sgns::processing

Classes

Name
class sgns::processing::ProcessingSubTaskQueueChannelPubSub

Source code

#ifndef SUPERGENIUS_PROCESSING_SUBTASK_QUEUE_CHANNEL_PUBSUB_HPP
#define SUPERGENIUS_PROCESSING_SUBTASK_QUEUE_CHANNEL_PUBSUB_HPP

#include <chrono>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <string>
#include <variant>
#include <vector>

#include "outcome/outcome.hpp"

#include "processing/processing_subtask_queue_channel.hpp"

#include <ipfs_pubsub/gossip_pubsub_topic.hpp>
#include "base/logger.hpp"

using namespace sgns::ipfs_pubsub;

namespace sgns::processing
{
    class ProcessingSubTaskQueueChannelPubSub : public ProcessingSubTaskQueueChannel,
                                                public std::enable_shared_from_this<ProcessingSubTaskQueueChannelPubSub>
    {
    public:
        using QueueRequestSink = std::function<bool( const SGProcessing::SubTaskQueueRequest & )>;
        using QueueUpdateSink  = std::function<bool( SGProcessing::SubTaskQueue * )>;

        ProcessingSubTaskQueueChannelPubSub( std::shared_ptr<sgns::ipfs_pubsub::GossipPubSub> gossipPubSub,
                                             const std::string &processingQueueChannelId );

        ~ProcessingSubTaskQueueChannelPubSub() override;

        void RequestQueueOwnership( const std::string &nodeId ) override;
        void PublishQueue( std::shared_ptr<SGProcessing::SubTaskQueue> queue ) override;

        void SetQueueRequestSink( QueueRequestSink queueRequestSink );

        void SetQueueUpdateSink( QueueUpdateSink queueUpdateSink );

        outcome::result<std::variant<std::chrono::milliseconds, std::shared_future<std::shared_ptr<GossipPubSubTopic::Subscription>>>> Listen(
            std::chrono::milliseconds msSubscriptionWaitingDuration = std::chrono::milliseconds(2000));

        size_t GetActiveNodesCount() const override;

        std::vector<libp2p::peer::PeerId> GetActiveNodes() const override;


    private:
        std::shared_ptr<sgns::ipfs_pubsub::GossipPubSubTopic> m_processingQueueChannel;

        void OnProcessingChannelMessage( boost::optional<const sgns::ipfs_pubsub::GossipPubSub::Message &> message );

        void HandleSubTaskQueueRequest( SGProcessing::ProcessingChannelMessage &channelMesssage );
        void HandleSubTaskQueue( SGProcessing::ProcessingChannelMessage &channelMesssage );

        std::shared_ptr<sgns::ipfs_pubsub::GossipPubSub> m_gossipPubSub;
        std::shared_ptr<boost::asio::io_context>         m_context;

        std::function<bool( const SGProcessing::SubTaskQueueRequest & )> m_queueRequestSink;
        std::function<bool( SGProcessing::SubTaskQueue * )>              m_queueUpdateSink;

        base::Logger m_logger = base::createLogger( "ProcessingSubTaskQueueChannelPubSub" );


    };
}
#endif // SUPERGENIUS_PROCESSING_SUBTASK_QUEUE_CHANNEL_PUBSUB_HPP

Updated on 2026-03-04 at 13:10:44 -0800