Skip to content

src/processing/processing_node.cpp

Namespaces

Name
sgns
sgns::processing

Source code

#include "processing_node.hpp"

#include <utility>

#include "processing_subtask_queue_channel_pubsub.hpp"
#include "processing/processing_subtask_queue_accessor_impl.hpp"

namespace sgns::processing
{
    std::shared_ptr<ProcessingNode> ProcessingNode::New(
        std::shared_ptr<GossipPubSub>                           gossipPubSub,
        std::shared_ptr<SubTaskResultStorage>                   subTaskResultStorage,
        std::shared_ptr<ProcessingCore>                         processingCore,
        std::function<void( const SGProcessing::TaskResult & )> taskResultProcessingSink,
        std::function<void( const std::string & )>              processingErrorSink,
        std::function<void( void )>                             processingDoneSink,
        std::string                                             node_id,
        const std::string                                      &processingQueueChannelId,
        std::list<SGProcessing::SubTask>                        subTasks,
        std::chrono::milliseconds                               msSubscriptionWaitingDuration,
        std::chrono::seconds                                    ttl )
    {
        // Create the shared_ptr using the protected constructor
        auto node = std::shared_ptr<ProcessingNode>( new ProcessingNode( std::move( gossipPubSub ),
                                                                         std::move( subTaskResultStorage ),
                                                                         std::move( processingCore ),
                                                                         std::move( taskResultProcessingSink ),
                                                                         std::move( processingErrorSink ),
                                                                         std::move( processingDoneSink ),
                                                                         std::move( node_id ),
                                                                         std::move( ttl ) ) );

        node->Initialize( processingQueueChannelId, msSubscriptionWaitingDuration );
        node->InitTTL();
        if ( !node->AttachTo( processingQueueChannelId ) )
        {
            node = nullptr;
        }
        else
        {
            if ( !subTasks.empty() )
            {
                if ( !node->CreateSubTaskQueue( std::move( subTasks ) ) )
                {
                    node = nullptr;
                }
            }
        }

        return node;
    }

    ProcessingNode::ProcessingNode( std::shared_ptr<GossipPubSub>                           gossipPubSub,
                                    std::shared_ptr<SubTaskResultStorage>                   subTaskResultStorage,
                                    std::shared_ptr<ProcessingCore>                         processingCore,
                                    std::function<void( const SGProcessing::TaskResult & )> taskResultProcessingSink,
                                    std::function<void( const std::string & )>              processingErrorSink,
                                    std::function<void( void )>                             processingDoneSink,
                                    std::string                                             node_id,
                                    std::chrono::seconds                                    ttl ) :

        m_gossipPubSub( std::move( gossipPubSub ) ),
        m_nodeId( std::move( node_id ) ),
        m_processingCore( std::move( processingCore ) ),
        m_subTaskResultStorage( std::move( subTaskResultStorage ) ),
        m_taskResultProcessingSink( std::move( taskResultProcessingSink ) ),
        m_processingErrorSink( std::move( processingErrorSink ) ),
        m_processingDoneSink( std::move( processingDoneSink ) ),
        m_creationTime( std::chrono::steady_clock::now() ),
        m_ttl( ttl ),
        m_localContext( std::make_shared<boost::asio::io_context>() ),
        m_localWorkGuard( m_localContext->get_executor() ),
        m_localIoThread( [ctx = m_localContext]() { ctx->run(); } )
    {
        m_logger->debug( "[{}] Processing node CREATED", m_nodeId );
        if ( m_gossipPubSub )
        {
            m_ttlTimer = std::make_unique<boost::asio::steady_timer>( *m_localContext );
        }
    }

    ProcessingNode::~ProcessingNode()
    {
        m_logger->debug( "[{}] Processing node DELETED ", m_nodeId );
        if ( m_localContext )
        {
            m_localContext->stop();
        }
        if ( m_localWorkGuard )
        {
            m_localWorkGuard->reset();
        }
        if ( m_localIoThread.joinable() )
        {
            // Avoid joining from the same thread (would throw/terminate)
            if ( std::this_thread::get_id() == m_localIoThread.get_id() )
            {
                m_localIoThread.detach();
            }
            else
            {
                m_localIoThread.join();
            }
        }
    }

    void ProcessingNode::Initialize( const std::string        &processingQueueChannelId,
                                     std::chrono::milliseconds msSubscriptionWaitingDuration )
    {
        // Subscribe to subtask queue channel
        auto processingQueueChannel = std::make_shared<ProcessingSubTaskQueueChannelPubSub>( m_gossipPubSub,
                                                                                             processingQueueChannelId );

        m_subtaskQueueManager = std::make_shared<ProcessingSubTaskQueueManager>( processingQueueChannel,
                                                                                 m_localContext,
                                                                                 m_nodeId,
                                                                                 m_processingErrorSink );

        m_subTaskQueueAccessor = std::make_shared<SubTaskQueueAccessorImpl>( m_gossipPubSub,
                                                                             m_subtaskQueueManager,
                                                                             m_subTaskResultStorage,
                                                                             m_taskResultProcessingSink,
                                                                             m_processingErrorSink );

        processingQueueChannel->SetQueueRequestSink(
            [qmWeak( std::weak_ptr<ProcessingSubTaskQueueManager>( m_subtaskQueueManager ) )](
                const SGProcessing::SubTaskQueueRequest &request )
            {
                auto qm = qmWeak.lock();
                if ( qm )
                {
                    qm->ProcessSubTaskQueueRequestMessage( request );
                    return true;
                }
                return false;
            } );

        processingQueueChannel->SetQueueUpdateSink(
            [qmWeak( std::weak_ptr<ProcessingSubTaskQueueManager>( m_subtaskQueueManager ) )](
                SGProcessing::SubTaskQueue *queue )
            {
                auto qm = qmWeak.lock();
                if ( qm )
                {
                    qm->ProcessSubTaskQueueMessage( queue );
                    return true;
                }
                return false;
            } );

        m_processingEngine = std::make_shared<ProcessingEngine>( m_nodeId,
                                                                 m_processingCore,
                                                                 m_processingErrorSink,
                                                                 m_processingDoneSink );

        // Run messages processing once all dependent object are created
        (void)processingQueueChannel->Listen( msSubscriptionWaitingDuration );

        // Keep the channel
        m_queueChannel = processingQueueChannel;
        m_logger->debug( "[{}] Processing node INITIALIZED", m_nodeId );
    }

    bool ProcessingNode::AttachTo( const std::string &processingQueueChannelId )
    {
        m_logger->debug( "[{}] Processing node AttachTo {} ", m_nodeId, processingQueueChannelId );
        bool ret = false;

        if ( m_subTaskQueueAccessor->CreateResultsChannel( processingQueueChannelId ) )
        {
            ret = m_subTaskQueueAccessor->ConnectToSubTaskQueue(
                [engineWeak( std::weak_ptr<ProcessingEngine>( m_processingEngine ) ),
                 accessorWeak( std::weak_ptr<SubTaskQueueAccessor>( m_subTaskQueueAccessor ) )]()
                {
                    auto engine   = engineWeak.lock();
                    auto accessor = accessorWeak.lock();
                    if ( engine && accessor )
                    {
                        engine->StartQueueProcessing( accessor );
                    }
                } );
        }

        // @todo Set timer to handle queue request timeout
        return ret;
    }

    bool ProcessingNode::CreateSubTaskQueue( std::list<SGProcessing::SubTask> subTasks )
    {
        m_logger->debug( "[{}] First ProcessingNode of Task. Creating SubTask Queue", m_nodeId );
        return m_subTaskQueueAccessor->AssignSubTasks( subTasks );
    }

    bool ProcessingNode::HasQueueOwnership() const
    {
        return m_subtaskQueueManager && m_subtaskQueueManager->HasOwnership();
    }

    float ProcessingNode::GetProgress() const
    {
        if (m_processingEngine) {
            return m_processingEngine->GetProgress();
        }
        return 0.0f;
    }

    void ProcessingNode::InitTTL()
    {
        m_creationTime = std::chrono::steady_clock::now();

        if ( m_ttlTimer )
        {
            StartTTLTimer();
        }
    }

    void ProcessingNode::StartTTLTimer()
    {
        if ( !m_ttlTimer )
        {
            return;
        }

        m_ttlTimer->cancel();
        m_ttlTimer->expires_after( m_ttl );

        m_logger->debug( "Starting the TTL timer for node {}", m_nodeId );

        // Use weak_ptr to avoid circular reference
        std::weak_ptr<ProcessingNode> weakThis = shared_from_this();

        m_ttlTimer->async_wait(
            [weakThis]( const std::error_code &ec )
            {
                if ( auto self = weakThis.lock() )
                {
                    self->CheckTTL( ec );
                }
            } );
    }

    void ProcessingNode::CheckTTL( const std::error_code &ec )
    {
        // If canceled or error, don't do anything
        if ( ec )
        {
            return;
        }

        // Calculate elapsed time
        auto now     = std::chrono::steady_clock::now();
        auto elapsed = std::chrono::duration_cast<std::chrono::seconds>( now - m_creationTime );

        // If TTL exceeded, self-destruct
        if ( elapsed >= m_ttl )
        {
            m_logger->error( "Processing node {} TTL expired after {} seconds. Self-destructing...",
                             m_nodeId,
                             elapsed.count() );

            // Cancel the timer to prevent any further callbacks
            m_ttlTimer->cancel();

            // Clean up resources before self-destruction
            m_processingErrorSink( "Node timed out" );

            // Let shared_ptr ownership mechanism handle the actual deletion
            // The object will be deleted when the last shared_ptr reference is released
        }
        else
        {
            // Not yet time to destruct, schedule another check
            m_ttlTimer->expires_after( std::chrono::seconds( 10 ) );

            std::weak_ptr<ProcessingNode> weakThis = weak_from_this();
            m_ttlTimer->async_wait(
                [weakThis]( const std::error_code &ec )
                {
                    if ( auto self = weakThis.lock() )
                    {
                        self->CheckTTL( ec );
                    }
                } );
        }
    }
}

Updated on 2026-03-04 at 13:10:44 -0800