Skip to content

src/processing/processing_service.cpp

Namespaces

Name
sgns
sgns::processing

Source code

#include "processing_service.hpp"
#include "base/sgns_version.hpp"
#include <utility>
#include <thread>

namespace sgns::processing
{
    ProcessingServiceImpl::ProcessingServiceImpl( std::shared_ptr<ipfs_pubsub::GossipPubSub> gossipPubSub,
                                                  size_t                                     maximalNodesCount,
                                                  std::shared_ptr<SubTaskEnqueuer>           subTaskEnqueuer,
                                                  std::shared_ptr<SubTaskResultStorage>      subTaskResultStorage,
                                                  std::shared_ptr<ProcessingCore>            processingCore ) :
        m_gossipPubSub( std::move( gossipPubSub ) ),
        m_context( std::make_shared<boost::asio::io_context>() ),
        m_maximalNodesCount( maximalNodesCount ),
        m_subTaskEnqueuer( std::move( subTaskEnqueuer ) ),
        m_subTaskResultStorage( std::move( subTaskResultStorage ) ),
        m_processingCore( std::move( processingCore ) ),
        m_timerChannelListRequestTimeout( *m_context ),
        m_channelListRequestTimeout( boost::posix_time::seconds( 5 ) ),
        m_isStopped( true ),
        node_address_( m_gossipPubSub->GetLocalAddress() ),
        m_nodeCreationTimer( *m_context ),
        m_nodeCreationTimeout( boost::posix_time::milliseconds( 1000 ) )
    {
    }

    ProcessingServiceImpl::ProcessingServiceImpl(
        std::shared_ptr<ipfs_pubsub::GossipPubSub> gossipPubSub,
        size_t                                     maximalNodesCount,
        std::shared_ptr<SubTaskEnqueuer>           subTaskEnqueuer,
        std::shared_ptr<SubTaskResultStorage>      subTaskResultStorage,
        std::shared_ptr<ProcessingCore>            processingCore,
        std::function<void( const std::string &subTaskQueueId, const SGProcessing::TaskResult &taskresult )>
                                                                 userCallbackSuccess,
        std::function<void( const std::string &subTaskQueueId )> userCallbackError,
        std::string                                              node_address ) :
        m_gossipPubSub( std::move( gossipPubSub ) ),
        m_context( std::make_shared<boost::asio::io_context>() ),
        m_maximalNodesCount( maximalNodesCount ),
        m_subTaskEnqueuer( std::move( subTaskEnqueuer ) ),
        m_subTaskResultStorage( std::move( subTaskResultStorage ) ),
        m_processingCore( std::move( processingCore ) ),
        m_timerChannelListRequestTimeout( *m_context ),
        m_channelListRequestTimeout( boost::posix_time::seconds( 1 ) ),
        m_isStopped( true ),
        userCallbackSuccess_( std::move( userCallbackSuccess ) ),
        userCallbackError_( std::move( userCallbackError ) ),
        node_address_( std::move( node_address ) ),
        m_nodeCreationTimer( *m_context ),
        m_nodeCreationTimeout( boost::posix_time::milliseconds( 1000 ) )
    {
    }

    ProcessingServiceImpl::~ProcessingServiceImpl()
    {
        m_logger->debug( "~ProcessingServiceImpl CALLED" );
        StopProcessing();
    }

    void ProcessingServiceImpl::StartProcessing( const std::string &processingGridChannelId )
    {
        if ( !m_isStopped )
        {
            m_logger->debug( "[{}] [SERVICE_WAS_PREVIOUSLY_STARTED]", node_address_ );
            return;
        }

        m_isStopped = false;

        // Reset the io_context and create the work object
        m_context->reset();
        m_context_work = std::make_unique<boost::asio::io_context::work>( *m_context );

        io_thread = std::thread( [this] { m_context->run(); } );

        Listen( processingGridChannelId );
        SendChannelListRequest();
        m_logger->debug( "[{}] [SERVICE_STARTED]", node_address_ );
    }

    void ProcessingServiceImpl::StopProcessing()
    {
        if ( m_isStopped )
        {
            return;
        }

        m_isStopped = true;

        if ( m_gridChannel )
        {
            m_gridChannel->Unsubscribe();
        }

        // Cancel timers before stopping context!
        boost::system::error_code ec;
        m_timerChannelListRequestTimeout.cancel( ec );
        m_nodeCreationTimer.cancel( ec );

        m_context_work.reset();

        m_context->stop();

        if ( io_thread.joinable() )
        {
            io_thread.join();
        }

        {
            std::scoped_lock lock( m_mutexNodes );
            m_processingNodes.clear();
        }

        m_logger->debug( "[{}] [SERVICE_STOPPED]", node_address_ );
    }

    void ProcessingServiceImpl::Listen( const std::string &processingGridChannelId )
    {
        using GossipPubSubTopic = ipfs_pubsub::GossipPubSubTopic;
        auto processing_topic   = processingGridChannelId + version::GetNetAndVersionAppendix();
        m_gridChannel           = std::make_unique<GossipPubSubTopic>( m_gossipPubSub, processing_topic );
        m_gridChannel->Subscribe(
            [weakSelf = weak_from_this()]( boost::optional<const ipfs_pubsub::GossipPubSub::Message &> message )
            {
                if ( auto self = weakSelf.lock() ) // Check if object still exists
                {
                    self->OnMessage( message );
                }
            } );
    }

    void ProcessingServiceImpl::SendChannelListRequest()
    {
        if ( m_waitingChannelRequest )
        {
            return;
        }
        m_waitingChannelRequest = true;
        SGProcessing::GridChannelMessage gridMessage;
        auto                             channelRequest = gridMessage.mutable_processing_channel_request();
        channelRequest->set_environment( "any" );

        m_gridChannel->Publish( gridMessage.SerializeAsString() );
        m_logger->debug( "List of processing channels requested" );
        m_timerChannelListRequestTimeout.expires_from_now( m_channelListRequestTimeout );
        m_timerChannelListRequestTimeout.async_wait(
            [instance = weak_from_this()]( const boost::system::error_code & )
            {
                if ( auto strong = instance.lock() )
                {
                    strong->HandleRequestTimeout();
                }
            } );
    }

    void ProcessingServiceImpl::OnMessage( boost::optional<const ipfs_pubsub::GossipPubSub::Message &> message )
    {
        m_logger->trace( "[{}] On Message.", node_address_ );
        if ( !message )
        {
            m_logger->trace( "[{}] Invalid message.", node_address_ );
            return;
        }

        m_logger->trace( "[{}] Valid message.", node_address_ );
        SGProcessing::GridChannelMessage gridMessage;
        if ( !gridMessage.ParseFromArray( message->data.data(), static_cast<int>( message->data.size() ) ) )
        {
            m_logger->error( "[{}] Could not deserialize message", node_address_ );
            return;
        }

        if ( gridMessage.has_processing_channel_response() )
        {
            auto response = gridMessage.processing_channel_response();
            m_logger->trace( "[{}] Processing channel received. id:{}", node_address_, response.channel_id() );
            AcceptProcessingChannel( response.channel_id() );
        }
        else if ( gridMessage.has_processing_channel_request() )
        {
            m_logger->trace( "[{}] PUBLISH.", node_address_ );
            PublishLocalChannelList();
        }
        else if ( gridMessage.has_node_creation_intent() )
        {
            // Handle intent from another peer
            auto intent = gridMessage.node_creation_intent();
            OnNodeCreationIntent( intent.peer_address(), intent.subtask_queue_id() );
        }
    }

    void ProcessingServiceImpl::OnQueueProcessingCompleted( const std::string              &subTaskQueueId,
                                                            const SGProcessing::TaskResult &taskResult )
    {
        m_logger->debug( "[{}] SUBTASK_QUEUE_PROCESSING_COMPLETED: {}", node_address_, subTaskQueueId );

        if ( userCallbackSuccess_ )
        {
            userCallbackSuccess_( subTaskQueueId, taskResult );
        }

        {
            std::scoped_lock lock( m_mutexNodes );
            m_processingNodes.erase( subTaskQueueId );
        }

        if ( !m_isStopped )
        {
            SendChannelListRequest();
        }
    }

    void ProcessingServiceImpl::OnProcessingError( const std::string &subTaskQueueId, const std::string &errorMessage )
    {
        m_logger->error( "[{}] PROCESSING_ERROR reason: {} ID: {}", node_address_, errorMessage, subTaskQueueId );

        // Add this channel to blacklist to prevent repeated processing attempts
        {
            std::lock_guard lockBlacklist( m_mutexBlacklist );
            m_blacklistedChannels.insert( subTaskQueueId );
            m_logger->info( "[{}] Blacklisted channel {} due to processing error (total blacklisted: {})",
                            node_address_,
                            subTaskQueueId,
                            m_blacklistedChannels.size() );
        }

        if ( userCallbackError_ )
        {
            userCallbackError_( subTaskQueueId );
        }
        m_subTaskEnqueuer->MarkTaskBad( subTaskQueueId );
        {
            std::scoped_lock lock( m_mutexNodes );
            m_processingNodes.erase( subTaskQueueId );
        }

        if ( !m_isStopped )
        {
            SendChannelListRequest();
        }
    }

    void ProcessingServiceImpl::OnProcessingDone( const std::string &taskId )
    {
        m_logger->debug( "[{}] PROCESSING_DONE: for task {}", node_address_, taskId );

        {
            std::scoped_lock lock( m_mutexNodes );
            m_processingNodes.erase( taskId );
        }

        if ( !m_isStopped )
        {
            SendChannelListRequest();
        }
    }

    void ProcessingServiceImpl::AcceptProcessingChannel( const std::string &channelId )
    {
        if ( m_isStopped )
        {
            return;
        }

        // Check if this channel is blacklisted
        {
            std::lock_guard lockBlacklist( m_mutexBlacklist );
            if ( m_blacklistedChannels.find( channelId ) != m_blacklistedChannels.end() )
            {
                m_logger->debug( "[{}] Not accepting blacklisted channel {}", node_address_, channelId );
                return;
            }
        }

        m_logger->debug( "[{}] AcceptProcessingChannel for queue {}", node_address_, channelId );

        // Check if we're currently in the process of creating any node
        {
            std::lock_guard lockCreation( m_mutexPendingCreation );

            // Check if our pending creation is stale
            if ( !m_pendingSubTaskQueueId.empty() && IsPendingCreationStale() )
            {
                m_logger->debug( "[{}] Clearing stale pending creation for queue {}",
                                 node_address_,
                                 m_pendingSubTaskQueueId );
                m_pendingSubTaskQueueId.clear();
                m_pendingSubTasks.clear();
                m_pendingTask.reset();
                m_competingPeers.clear();
            }

            // If we still have a pending creation, don't accept this channel
            if ( !m_pendingSubTaskQueueId.empty() )
            {
                m_logger->debug( "[{}] Not accepting channel {} as we're negotiating for queue {}",
                                 node_address_,
                                 channelId,
                                 m_pendingSubTaskQueueId );
                return;
            }

            // If this is the queue we were just negotiating for (and lost), wait a bit
            // This helps prevent race conditions where we immediately try to join a queue
            // that another peer is just in the process of creating
            // In practice, this is rare since the winning peer will have already created the node
            if ( m_pendingSubTaskQueueId == channelId )
            {
                m_logger->debug( "[{}] Not accepting channel {} as we just lost negotiation for it",
                                 node_address_,
                                 channelId );
                return;
            }
        }

        // Also check if we already have this queue
        std::scoped_lock lock( m_mutexNodes );
        if ( m_processingNodes.find( channelId ) != m_processingNodes.end() )
        {
            m_logger->debug( "[{}] Not accepting channel {} as we already have a node for it",
                             node_address_,
                             channelId );
            return;
        }

        m_logger->debug( "[{}] Number of nodes: {}, Max nodes: {}",
                         node_address_,
                         m_processingNodes.size(),
                         m_maximalNodesCount );

        if ( m_processingNodes.size() < m_maximalNodesCount )
        {
            m_logger->debug( "[{}] Accept Channel: Creating Node for queue {}", node_address_, channelId );

            auto weakSelf = weak_from_this();

            auto node = ProcessingNode::New(
                m_gossipPubSub,
                m_subTaskResultStorage,
                m_processingCore,
                [weakSelf, channelId]( const SGProcessing::TaskResult &result )
                {
                    if ( auto self = weakSelf.lock() )
                    {
                        self->OnQueueProcessingCompleted( channelId, result );
                    }
                },
                [weakSelf, channelId]( const std::string &error )
                {
                    if ( auto self = weakSelf.lock() )
                    {
                        self->OnProcessingError( channelId, error );
                    }
                },
                [weakSelf, channelId]()
                {
                    if ( auto self = weakSelf.lock() )
                    {
                        self->OnProcessingDone( channelId );
                    }
                },
                node_address_,
                channelId );

            if ( node != nullptr )
            {
                m_processingNodes[channelId] = node;
            }
        }

        if ( m_processingNodes.size() == m_maximalNodesCount )
        {
            m_timerChannelListRequestTimeout.expires_at( boost::posix_time::pos_infin );
        }
    }

    void ProcessingServiceImpl::PublishLocalChannelList()
    {
        m_logger->trace( "[{}] Publishing local channels", node_address_ );
        std::scoped_lock lock( m_mutexNodes );
        for ( auto &itNode : m_processingNodes )
        {
            m_logger->trace( "[{}] Channel {}: Owns Channel? {}",
                             node_address_,
                             itNode.first,
                             itNode.second->HasQueueOwnership() );

            // Only channel host answers to reduce a number of published messages
            if ( itNode.second->HasQueueOwnership() )
            {
                SGProcessing::GridChannelMessage gridMessage;
                auto                             channelResponse = gridMessage.mutable_processing_channel_response();
                channelResponse->set_channel_id( itNode.first );

                m_gridChannel->Publish( gridMessage.SerializeAsString() );
                m_logger->trace( "[{}] Channel published: {}", node_address_, channelResponse->channel_id() );
            }
        }
    }

    size_t ProcessingServiceImpl::GetProcessingNodesCount() const
    {
        std::scoped_lock lock( m_mutexNodes );
        return m_processingNodes.size();
    }

    void ProcessingServiceImpl::SetChannelListRequestTimeout(
        boost::posix_time::time_duration channelListRequestTimeout )
    {
        m_channelListRequestTimeout = channelListRequestTimeout;
    }

    ProcessingServiceImpl::ProcessingStatus ProcessingServiceImpl::GetProcessingStatus() const
    {
        if ( m_isStopped )
        {
            return ProcessingStatus( Status::DISABLED, 0.0f );
        }

        float  totalProgress = 0.0f;
        size_t nodeCount     = 0;

        {
            std::lock_guard lock( m_mutexNodes );
            if ( m_processingNodes.empty() )
            {
                return ProcessingStatus( Status::IDLE, 0.0f );
            }

            // Calculate average progress across all processing nodes
            for ( const auto &[queueId, node] : m_processingNodes )
            {
                if ( node )
                {
                    totalProgress += node->GetProgress();
                    ++nodeCount;
                }
            }
        }

        float averageProgress = ( nodeCount > 0 ) ? ( totalProgress / nodeCount ) : 0.0f;
        // Round to 2 decimal places
        averageProgress = std::round( averageProgress * 100.0f ) / 100.0f;

        return ProcessingStatus( Status::PROCESSING, averageProgress );
    }

    void ProcessingServiceImpl::HandleRequestTimeout()
    {
        m_waitingChannelRequest = false;
        m_logger->debug( "QUEUE_REQUEST_TIMEOUT" );
        m_timerChannelListRequestTimeout.expires_at( boost::posix_time::pos_infin );

        if ( m_isStopped )
        {
            return;
        }

        // Check if we're already waiting for a node creation to resolve
        {
            std::lock_guard lockCreation( m_mutexPendingCreation );

            // Check if our pending creation is stale and should be cleared
            if ( !m_pendingSubTaskQueueId.empty() )
            {
                if ( IsPendingCreationStale() )
                {
                    m_logger->debug( "[{}] Clearing stale pending creation for queue {}",
                                     node_address_,
                                     m_pendingSubTaskQueueId );
                    m_pendingSubTaskQueueId.clear();
                    m_pendingSubTasks.clear();
                    m_pendingTask.reset();
                    m_competingPeers.clear();
                }
                else
                {
                    m_logger->debug( "[{}] Already waiting for node creation to resolve for queue {}",
                                     node_address_,
                                     m_pendingSubTaskQueueId );
                    return;
                }
            }
        }
        m_logger->trace( "[{}] [Trying to create node]", node_address_ );

        // Check if we are at max capacity
        {
            std::scoped_lock lock( m_mutexNodes );
            if ( m_processingNodes.size() >= m_maximalNodesCount )
            {
                m_logger->debug( "[{}] At maximum node capacity ({}) - not attempting to grab tasks",
                                 node_address_,
                                 m_maximalNodesCount );
                return;
            }
        }
        std::string                      subTaskQueueId;
        std::list<SGProcessing::SubTask> subTasks;
        auto                             maybe_task = m_subTaskEnqueuer->EnqueueSubTasks( subTaskQueueId, subTasks );

        if ( maybe_task )
        {
            // Mark ourselves as busy with this potential node creation
            {
                std::scoped_lock lock( m_mutexNodes, m_mutexPendingCreation );

                // Double-check we're still under the limit
                if ( m_processingNodes.size() >= m_maximalNodesCount )
                {
                    m_logger->debug( "[{}] Maximum nodes reached while grabbing task - abandoning", node_address_ );
                    return;
                }

                m_pendingSubTaskQueueId = subTaskQueueId;
                m_pendingSubTasks       = subTasks;
                m_pendingTask           = maybe_task.value();
            }

            // Instead of immediately creating a ProcessingNode, we'll broadcast our intent
            // and wait for responses from other peers
            m_logger->debug( "[{}] Grabbed task, broadcasting intent to create node for queue {}",
                             node_address_,
                             subTaskQueueId );

            BroadcastNodeCreationIntent( subTaskQueueId );
        }
        else
        {
            m_logger->trace( "[{}] No tasks available, requesting channel list", node_address_ );
            SendChannelListRequest();
        }
    }

    void ProcessingServiceImpl::BroadcastNodeCreationIntent( const std::string &subTaskQueueId )
    {
        SGProcessing::GridChannelMessage gridMessage;
        auto                             intent = gridMessage.mutable_node_creation_intent();
        intent->set_peer_address( node_address_ );
        intent->set_subtask_queue_id( subTaskQueueId );

        // Add ourselves to competing peers
        {
            std::lock_guard lockCreation( m_mutexPendingCreation );
            m_competingPeers.insert( node_address_ );
            m_pendingCreationTimestamp = std::chrono::steady_clock::now();
        }

        m_gridChannel->Publish( gridMessage.SerializeAsString() );
        m_logger->debug( "[{}] Broadcasting intent to create node for queue {}", node_address_, subTaskQueueId );

        // Set timer to wait for other peers' responses
        m_nodeCreationTimer.expires_from_now( m_nodeCreationTimeout );
        m_nodeCreationTimer.async_wait(
            [instance = shared_from_this()]( const boost::system::error_code &error )
            {
                if ( !error )
                { // Only proceed if not canceled
                    instance->HandleNodeCreationTimeout();
                }
            } );
    }

    void ProcessingServiceImpl::OnNodeCreationIntent( const std::string &peerAddress,
                                                      const std::string &subTaskQueueId )
    {
        if ( peerAddress == node_address_ )
        {
            // Ignore our own message
            return;
        }

        m_logger->debug( "[{}] Received node creation intent from {} for queue {}",
                         node_address_,
                         peerAddress,
                         subTaskQueueId );

        bool        shouldCancel = false;
        std::string lowestPeer;

        {
            std::lock_guard lockCreation( m_mutexPendingCreation );

            // Only process if this is for our pending queue
            if ( m_pendingSubTaskQueueId == subTaskQueueId )
            {
                m_competingPeers.insert( peerAddress );
                m_pendingCreationTimestamp = std::chrono::steady_clock::now(); // Reset timeout

                if ( !HasLowestAddress() )
                {
                    shouldCancel = true;
                    lowestPeer   = *m_competingPeers.begin();
                }
            }
        }

        if ( shouldCancel )
        {
            // Cancel our timer (do this outside the lock to avoid potential deadlocks)
            m_nodeCreationTimer.cancel();

            std::string reason = "peer " + lowestPeer + " has lower address";
            CancelPendingCreation( reason );
        }
    }

    void ProcessingServiceImpl::CancelPendingCreation( const std::string &reason )
    {
        std::lock_guard lockCreation( m_mutexPendingCreation );

        if ( !m_pendingSubTaskQueueId.empty() )
        {
            m_logger->debug( "[{}] Cancelling node creation for queue {} because {}",
                             node_address_,
                             m_pendingSubTaskQueueId,
                             reason );

            m_pendingSubTaskQueueId.clear();
            m_pendingSubTasks.clear();
            m_pendingTask.reset();
            m_competingPeers.clear();
        }
    }

    bool ProcessingServiceImpl::HasLowestAddress() const
    {
        if ( m_competingPeers.empty() )
        {
            return true;
        }

        return *m_competingPeers.begin() == node_address_;
    }

    void ProcessingServiceImpl::HandleNodeCreationTimeout()
    {
        std::string                      subTaskQueueId;
        std::list<SGProcessing::SubTask> subTasks;

        {
            std::lock_guard lockCreation( m_mutexPendingCreation );

            if ( m_pendingSubTaskQueueId.empty() )
            {
                // Creation was already canceled
                m_logger->debug( "[{}] Node creation attempt was already cancelled", node_address_ );
                return;
            }

            subTaskQueueId = m_pendingSubTaskQueueId;
            subTasks       = m_pendingSubTasks;

            // Check if we still have the lowest address
            if ( !HasLowestAddress() )
            {
                auto lowestPeer = *m_competingPeers.begin();
                m_logger->debug( "[{}] Not creating node for queue {} as peer {} has lower address",
                                 node_address_,
                                 subTaskQueueId,
                                 lowestPeer );

                // Clear pending data
                m_pendingSubTaskQueueId.clear();
                m_pendingSubTasks.clear();
                m_pendingTask.reset();
                m_competingPeers.clear();
                return;
            }

            // Clear pending data since we're going to use it now
            m_pendingSubTaskQueueId.clear();
            m_pendingSubTasks.clear();
            m_pendingTask.reset();
            m_competingPeers.clear();
        }

        m_logger->debug( "[{}] Timeout elapsed, creating node for queue {} as we have lowest address",
                         node_address_,
                         subTaskQueueId );

        // Check if we can still add more nodes
        std::unique_lock lock( m_mutexNodes );

        // Check if we already have this node (could have been created passively)
        if ( m_processingNodes.find( subTaskQueueId ) != m_processingNodes.end() )
        {
            m_logger->debug( "[{}] Not creating node for queue {} as it already exists",
                             node_address_,
                             subTaskQueueId );
            return;
        }

        if ( m_processingNodes.size() >= m_maximalNodesCount )
        {
            m_logger->debug( "[{}] Cannot create node for queue {} as maximum nodes limit reached",
                             node_address_,
                             subTaskQueueId );
            return;
        }

        // Create the ProcessingNode
        auto weakSelf = weak_from_this();

        auto node = ProcessingNode::New(
            m_gossipPubSub,
            m_subTaskResultStorage,
            m_processingCore,
            [weakSelf, subTaskQueueId]( const SGProcessing::TaskResult &result )
            {
                if ( auto self = weakSelf.lock() )
                {
                    self->OnQueueProcessingCompleted( subTaskQueueId, result );
                }
            },
            [weakSelf, subTaskQueueId]( const std::string &error )
            {
                if ( auto self = weakSelf.lock() )
                {
                    self->OnProcessingError( subTaskQueueId, error );
                }
            },
            [weakSelf, subTaskQueueId]()
            {
                if ( auto self = weakSelf.lock() )
                {
                    self->OnProcessingDone( subTaskQueueId );
                }
            },
            node_address_,
            subTaskQueueId,
            subTasks );

        if ( node != nullptr )
        {
            m_processingNodes[subTaskQueueId] = node;
        }

        lock.unlock(); // Release the mutex before potentially long operations

        m_logger->debug( "[{}] New processing channel created: {}", node_address_, subTaskQueueId );

        // Notify other peers that this channel is now available
        PublishLocalChannelList();

        // Send a new channel list request to continue processing
        SendChannelListRequest();
    }

    bool ProcessingServiceImpl::IsPendingCreationStale() const
    {
        auto now     = std::chrono::steady_clock::now();
        auto elapsed = std::chrono::duration_cast<std::chrono::seconds>( now - m_pendingCreationTimestamp );
        return elapsed > m_pendingCreationTimeout;
    }
}

Updated on 2026-03-04 at 13:10:44 -0800