# src/processing/processing_subtask_queue_channel_pubsub.cpp

## Namespaces

| Name |
|---|
| sgns |
| sgns::processing |

## Source code
#include "processing_subtask_queue_channel_pubsub.hpp"
#include <base/util.hpp>
#include "base/sgns_version.hpp"
namespace sgns::processing
{
/// @brief Builds the channel wrapper and creates the pubsub topic used for queue messages.
/// @param gossipPubSub pubsub service shared with this object.
/// @param processingQueueChannelId base channel id; the string returned by
///        sgns::version::GetNetAndVersionAppendix() is appended to form the topic name.
ProcessingSubTaskQueueChannelPubSub::ProcessingSubTaskQueueChannelPubSub(
    std::shared_ptr<GossipPubSub> gossipPubSub,
    const std::string            &processingQueueChannelId ) :
    m_gossipPubSub( std::move( gossipPubSub ) )
{
    // Topic name = channel id + network/version appendix.
    auto topicName = processingQueueChannelId + sgns::version::GetNetAndVersionAppendix();

    // Use the member here: the constructor parameter has already been moved from.
    m_processingQueueChannel = std::make_shared<GossipPubSubTopic>( m_gossipPubSub, topicName );
}
/// @brief Emits a debug log entry carrying the address of the instance being destroyed.
ProcessingSubTaskQueueChannelPubSub::~ProcessingSubTaskQueueChannelPubSub()
{
    // The address is logged as an integer so destruction can be matched against other log lines.
    const auto selfAddress = reinterpret_cast<size_t>( this );
    m_logger->debug( "[RELEASED] this: {}", selfAddress );
}
/// @brief Subscribes to the processing queue channel and forwards incoming messages to
///        OnProcessingChannelMessage.
///
/// The subscription callback holds only a weak reference to this object, so a message that
/// arrives after the object is gone is silently dropped.
///
/// @param msSubscriptionWaitingDuration
///        - `== 0`: `true` is passed as the second argument of Subscribe ("subscribe now") and
///          the subscription future is returned without waiting.
///        - `> 0`:  `false` is passed to Subscribe and the call polls the future through
///          waitForCondition for at most this long.
///        - `< 0`:  `false` is passed to Subscribe and the future is returned without waiting.
/// @return the time reported by waitForCondition when waiting succeeded, the subscription
///         future when no waiting was requested, or `boost::system::errc::timed_out` when the
///         subscription was not ready in time.
outcome::result<
    std::variant<std::chrono::milliseconds, std::shared_future<std::shared_ptr<GossipPubSubTopic::Subscription>>>>
ProcessingSubTaskQueueChannelPubSub::Listen( std::chrono::milliseconds msSubscriptionWaitingDuration )
{
    using SubscriptionFuture = std::shared_future<std::shared_ptr<GossipPubSubTopic::Subscription>>;
    using ListenResult       = std::variant<std::chrono::milliseconds, SubscriptionFuture>;

    // A zero waiting duration means the caller wants the subscription issued right away.
    const bool subscribeNow = msSubscriptionWaitingDuration.count() == 0;

    auto subscriptionFuture = m_processingQueueChannel->Subscribe(
        [weakSelf = weak_from_this()]( boost::optional<const GossipPubSub::Message &> message )
        {
            auto self = weakSelf.lock();
            if ( self )
            {
                self->OnProcessingChannelMessage( message );
            }
        },
        subscribeNow );

    // No waiting requested: hand the future to the caller.
    if ( msSubscriptionWaitingDuration.count() <= 0 )
    {
        return ListenResult( std::move( subscriptionFuture ) );
    }

    // Waiting requested: poll the future (non-blocking check) until it is ready or time runs out.
    std::chrono::milliseconds elapsed{};
    auto isReady = [&subscriptionFuture]()
    { return subscriptionFuture.wait_for( std::chrono::seconds( 0 ) ) == std::future_status::ready; };

    if ( !waitForCondition( isReady, msSubscriptionWaitingDuration, &elapsed ) )
    {
        m_logger->error( "Subscription not established within the specified time ({} ms)",
                         msSubscriptionWaitingDuration.count() );
        return outcome::failure( boost::system::errc::timed_out );
    }

    m_logger->debug( "Subscription established after {} ms", elapsed.count() );
    return ListenResult( elapsed );
}
/// @brief Publishes a subtask-queue request on the processing channel.
/// @param nodeId identifier written into the request's `node_id` field.
void ProcessingSubTaskQueueChannelPubSub::RequestQueueOwnership( const std::string &nodeId )
{
    SGProcessing::ProcessingChannelMessage message;

    // Accessing the mutable sub-message marks the request field as present.
    auto *request = message.mutable_subtask_queue_request();
    request->set_node_id( nodeId );

    auto payload = message.SerializeAsString();
    m_processingQueueChannel->Publish( std::move( payload ) );
}
/// @brief Serializes the given queue into a channel message and publishes it.
///
/// The message only borrows the queue for the duration of serialization: the pointer is handed
/// to the message with set_allocated_subtask_queue() and taken back with
/// release_subtask_queue(), so the caller's shared_ptr stays the sole owner.
///
/// @param queue queue to publish; a null pointer is logged and ignored.
void ProcessingSubTaskQueueChannelPubSub::PublishQueue( std::shared_ptr<SGProcessing::SubTaskQueue> queue )
{
    if ( !queue )
    {
        m_logger->error( "PublishQueue called with a null queue; nothing published" );
        return;
    }

    // Takes the borrowed pointer back from the message on every exit path. Without this, an
    // exception thrown while serializing or publishing would let the message destructor delete
    // an object that is still owned by the shared_ptr.
    struct QueueLoan
    {
        SGProcessing::ProcessingChannelMessage &borrower;

        ~QueueLoan()
        {
            static_cast<void>( borrower.release_subtask_queue() );
        }
    };

    SGProcessing::ProcessingChannelMessage message;
    message.set_allocated_subtask_queue( queue.get() );

    // Declared after `message`, so it is destroyed first and the release happens before the
    // message destructor runs.
    const QueueLoan loan{ message };

    m_processingQueueChannel->Publish( message.SerializeAsString() );
}
/// @brief Installs the callback used for incoming subtask-queue requests.
///
/// HandleSubTaskQueueRequest invokes the stored sink with the request extracted from a channel
/// message; a falsy return is logged as "pending", a truthy one as "immediately fulfilled".
/// Any previously stored sink is replaced.
///
/// @param queueRequestSink callback to store; taken by value and moved into the member.
void ProcessingSubTaskQueueChannelPubSub::SetQueueRequestSink( QueueRequestSink queueRequestSink )
{
m_queueRequestSink = std::move( queueRequestSink );
}
/// @brief Installs the callback used for incoming subtask-queue updates.
///
/// HandleSubTaskQueue invokes the stored sink with the queue pointer obtained from the channel
/// message via release_subtask_queue() and logs the value it returns.
/// Any previously stored sink is replaced.
///
/// @param queueUpdateSink callback to store; taken by value and moved into the member.
void ProcessingSubTaskQueueChannelPubSub::SetQueueUpdateSink( QueueUpdateSink queueUpdateSink )
{
m_queueUpdateSink = std::move( queueUpdateSink );
}
/// @brief Decodes a raw pubsub message and routes it to the matching handler.
///
/// Empty optionals and payloads that fail to parse are ignored. A parsed message is routed to
/// HandleSubTaskQueueRequest when it carries a queue request, otherwise to HandleSubTaskQueue
/// when it carries a queue; messages with neither field are ignored.
///
/// @param message pubsub message as delivered by the subscription callback; may be empty.
void ProcessingSubTaskQueueChannelPubSub::OnProcessingChannelMessage(
    boost::optional<const GossipPubSub::Message &> message )
{
    if ( !message )
    {
        return;
    }

    SGProcessing::ProcessingChannelMessage channelMessage;
    const bool parsed = channelMessage.ParseFromArray( message->data.data(),
                                                       static_cast<int>( message->data.size() ) );
    if ( !parsed )
    {
        return;
    }

    // The request field takes precedence when both fields are present.
    if ( channelMessage.has_subtask_queue_request() )
    {
        HandleSubTaskQueueRequest( channelMessage );
        return;
    }

    if ( channelMessage.has_subtask_queue() )
    {
        HandleSubTaskQueue( channelMessage );
    }
}
/// @brief Passes a subtask-queue request to the registered request sink and logs the outcome.
///
/// Does nothing when no request sink has been installed.
///
/// @param channelMesssage parsed channel message whose `subtask_queue_request` field is read.
void ProcessingSubTaskQueueChannelPubSub::HandleSubTaskQueueRequest(
    SGProcessing::ProcessingChannelMessage &channelMesssage )
{
    if ( !m_queueRequestSink )
    {
        return;
    }

    // Work on a copy of the request, independent of the channel message.
    auto request = channelMesssage.subtask_queue_request();

    if ( m_queueRequestSink( request ) )
    {
        m_logger->debug( "Queue request was immediately fulfilled for node {}", request.node_id() );
    }
    else
    {
        m_logger->debug( "Queue request is pending for node {}", request.node_id() );
    }
}
/// @brief Hands a received subtask queue to the registered update sink and logs the result.
///
/// The queue is detached from the message with release_subtask_queue() only when a sink is
/// installed to receive it. With no sink the queue stays owned by the message and is freed
/// with it, instead of being released into a raw pointer that nobody deletes.
///
/// NOTE(review): nothing in this function deletes the released pointer, so the sink is
/// presumed to take ownership of it - confirm against the sink implementation.
///
/// @param channelMesssage parsed channel message carrying the `subtask_queue` field.
void ProcessingSubTaskQueueChannelPubSub::HandleSubTaskQueue(
    SGProcessing::ProcessingChannelMessage &channelMesssage )
{
    if ( !m_queueUpdateSink )
    {
        // No consumer: leave ownership with the message so the queue is not leaked.
        return;
    }

    auto queueChanged = m_queueUpdateSink( channelMesssage.release_subtask_queue() );
    m_logger->debug( "Queue changed = {} during release for node", queueChanged );
}
/// @brief Returns the number of nodes on the processing channel, counting the local node.
/// @return the topic's peer count plus one for this node.
size_t ProcessingSubTaskQueueChannelPubSub::GetActiveNodesCount() const
{
    const auto remotePeers = m_processingQueueChannel->getPeerCount();

    // +1 accounts for the local node.
    return remotePeers + 1;
}
/// @brief Returns the peers reported by the processing queue topic.
///
/// Unlike GetActiveNodesCount, nothing is added for the local node here; whether getAllPeers()
/// itself includes it is not visible from this file.
///
/// @return the peer ids returned by the topic's getAllPeers().
std::vector<libp2p::peer::PeerId> ProcessingSubTaskQueueChannelPubSub::GetActiveNodes() const
{
    auto peers = m_processingQueueChannel->getAllPeers();
    return peers;
}
}
Updated on 2026-03-04 at 13:10:44 -0800