sgns::processing::ProcessingSubTaskQueueChannelPubSub

#include <processing_subtask_queue_channel_pubsub.hpp>

Inherits from sgns::processing::ProcessingSubTaskQueueChannel, std::enable_shared_from_this< ProcessingSubTaskQueueChannelPubSub >

Public Types

Name
using std::function< bool(const SGProcessing::SubTaskQueueRequest &)> QueueRequestSink
using std::function< bool(SGProcessing::SubTaskQueue *)> QueueUpdateSink

Public Functions

Name
ProcessingSubTaskQueueChannelPubSub(std::shared_ptr< sgns::ipfs_pubsub::GossipPubSub > gossipPubSub, const std::string & processingQueueChannelId)
~ProcessingSubTaskQueueChannelPubSub() override
virtual void RequestQueueOwnership(const std::string & nodeId) override
virtual void PublishQueue(std::shared_ptr< SGProcessing::SubTaskQueue > queue) override
void SetQueueRequestSink(QueueRequestSink queueRequestSink)
void SetQueueUpdateSink(QueueUpdateSink queueUpdateSink)
outcome::result< std::variant< std::chrono::milliseconds, std::shared_future< std::shared_ptr< GossipPubSubTopic::Subscription > > > > Listen(std::chrono::milliseconds msSubscriptionWaitingDuration =std::chrono::milliseconds(2000))
virtual size_t GetActiveNodesCount() const override
virtual std::vector< libp2p::peer::PeerId > GetActiveNodes() const override

Additional inherited members

Public Functions inherited from sgns::processing::ProcessingSubTaskQueueChannel

Name
virtual ~ProcessingSubTaskQueueChannel() =default

Detailed Description

class sgns::processing::ProcessingSubTaskQueueChannelPubSub;

Subtask queue channel implementation that uses pubsub as a data transport protocol

Public Types Documentation

using QueueRequestSink

using sgns::processing::ProcessingSubTaskQueueChannelPubSub::QueueRequestSink = std::function<bool( const SGProcessing::SubTaskQueueRequest & )>;

using QueueUpdateSink

using sgns::processing::ProcessingSubTaskQueueChannelPubSub::QueueUpdateSink = std::function<bool( SGProcessing::SubTaskQueue * )>;

Public Functions Documentation

function ProcessingSubTaskQueueChannelPubSub

ProcessingSubTaskQueueChannelPubSub(
    std::shared_ptr< sgns::ipfs_pubsub::GossipPubSub > gossipPubSub,
    const std::string & processingQueueChannelId
)

Parameters:

  • gossipPubSub - shared GossipPubSub instance used as the transport
  • processingQueueChannelId - unique ID of the queue data channel

Constructs subtask queue channel object

function ~ProcessingSubTaskQueueChannelPubSub

~ProcessingSubTaskQueueChannelPubSub() override

function RequestQueueOwnership

virtual void RequestQueueOwnership(
    const std::string & nodeId
) override

Reimplements: sgns::processing::ProcessingSubTaskQueueChannel::RequestQueueOwnership

Overrides the ProcessingSubTaskQueueChannel interface method. Requests ownership of the queue for the node identified by nodeId.

function PublishQueue

virtual void PublishQueue(
    std::shared_ptr< SGProcessing::SubTaskQueue > queue
) override

Parameters:

  • queue - subtask queue

Reimplements: sgns::processing::ProcessingSubTaskQueueChannel::PublishQueue

Publishes the queue to all queue consumers

function SetQueueRequestSink

void SetQueueRequestSink(
    QueueRequestSink queueRequestSink
)

Parameters:

  • queueRequestSink - request handler

Sets the handler that processes remote queue requests

function SetQueueUpdateSink

void SetQueueUpdateSink(
    QueueUpdateSink queueUpdateSink
)

Sets the handler that processes remote queue updates
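
Both sink setters accept any callable that matches the aliases listed under Public Types. The following is a minimal sketch of how such handlers might be registered and invoked. It does not use the real library: `SubTaskQueueRequest`, `SubTaskQueue`, their fields, and `FakeChannel` (including `DeliverRequest` and `DeliverUpdate`) are hypothetical stand-ins introduced only for illustration, and the meaning of the `bool` return value is not documented here, so the sketch simply passes it through.

```cpp
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-ins for the SGProcessing messages; field names are assumptions.
struct SubTaskQueueRequest { std::string node_id; };
struct SubTaskQueue { int subtask_count = 0; };

// Same shapes as the aliases documented under Public Types.
using QueueRequestSink = std::function<bool(const SubTaskQueueRequest &)>;
using QueueUpdateSink  = std::function<bool(SubTaskQueue *)>;

// Hypothetical minimal channel that only stores and invokes the sinks.
class FakeChannel
{
public:
    void SetQueueRequestSink(QueueRequestSink sink) { requestSink_ = std::move(sink); }
    void SetQueueUpdateSink(QueueUpdateSink sink) { updateSink_ = std::move(sink); }

    // Simulate an incoming request; returns false when no sink is installed.
    bool DeliverRequest(const SubTaskQueueRequest &request) const
    {
        return requestSink_ ? requestSink_(request) : false;
    }

    // Simulate an incoming queue update; returns false when no sink is installed.
    bool DeliverUpdate(SubTaskQueue *queue) const
    {
        return updateSink_ ? updateSink_(queue) : false;
    }

private:
    QueueRequestSink requestSink_;
    QueueUpdateSink  updateSink_;
};
```

With the real class, the same lambdas would be passed to SetQueueRequestSink and SetQueueUpdateSink before calling Listen, so that no incoming message arrives while a sink is still unset.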

function Listen

outcome::result< std::variant< std::chrono::milliseconds, std::shared_future< std::shared_ptr< GossipPubSubTopic::Subscription > > > > Listen(
    std::chrono::milliseconds msSubscriptionWaitingDuration =std::chrono::milliseconds(2000)
)

Parameters:

  • msSubscriptionWaitingDuration - Duration to wait for subscription, 0 means no waiting

Return:

  • If msSubscriptionWaitingDuration > 0: outcome with success/failure and the actual wait time
  • If msSubscriptionWaitingDuration = 0: outcome with a future that completes when the subscription is established

Starts listening to the pubsub channel
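
Because the success value of Listen is a `std::variant`, callers need to branch on which alternative they received. The sketch below shows one way to do that with `std::visit`. It is self-contained and does not call the real API: `Subscription` is a hypothetical stand-in for GossipPubSubTopic::Subscription, `ListenValue` and `DescribeListenValue` are illustrative names, and the surrounding `outcome::result` wrapper is left out for brevity.

```cpp
#include <chrono>
#include <future>
#include <memory>
#include <string>
#include <type_traits>
#include <variant>

// Hypothetical stand-in for GossipPubSubTopic::Subscription.
struct Subscription {};

// Mirrors the variant carried inside the result that Listen returns.
using ListenValue = std::variant<std::chrono::milliseconds,
                                 std::shared_future<std::shared_ptr<Subscription>>>;

// Hypothetical helper: reports which alternative the caller received.
std::string DescribeListenValue(const ListenValue &value)
{
    return std::visit(
        [](const auto &alt) -> std::string
        {
            using T = std::decay_t<decltype(alt)>;
            if constexpr (std::is_same_v<T, std::chrono::milliseconds>)
            {
                // Waiting mode: the alternative carries the time actually spent waiting.
                return "waited " + std::to_string(alt.count()) + " ms";
            }
            else
            {
                // Non-waiting mode: poll the future without blocking.
                const bool ready =
                    alt.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
                return ready ? "subscription ready" : "subscription pending";
            }
        },
        value);
}
```

In real code the caller would first check the `outcome::result` for failure and only then inspect the variant. In the future case, calling `get()` instead of polling blocks until the subscription is established.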

function GetActiveNodesCount

virtual size_t GetActiveNodesCount() const override

Return: The number of active nodes currently participating in the channel.

Reimplements: sgns::processing::ProcessingSubTaskQueueChannel::GetActiveNodesCount

Retrieves the count of active nodes in the subtask queue channel.

function GetActiveNodes

virtual std::vector< libp2p::peer::PeerId > GetActiveNodes() const override

Return: A vector of libp2p::peer::PeerId values identifying the active nodes in the channel.

Reimplements: sgns::processing::ProcessingSubTaskQueueChannel::GetActiveNodes

Retrieves the list of active node IDs currently participating in the subtask queue channel.


Updated on 2026-04-15 at 11:00:39 -0700