Skip to content

src/processing/processing_engine.cpp

Namespaces

Name
sgns
sgns::processing

Source code

#include "processing_engine.hpp"

#include <thread>
#include <memory>
#include <utility>

namespace sgns::processing
{
    ProcessingEngine::ProcessingEngine( std::string                                nodeId,
                                        std::shared_ptr<ProcessingCore>            processingCore,
                                        std::function<void( const std::string & )> processingErrorSink,
                                        std::function<void( void )>                processingDoneSink ) :
        m_nodeId( std::move( nodeId ) ),
        m_processingCore( std::move( processingCore ) ),
        m_processingErrorSink( std::move( processingErrorSink ) ),
        m_processingDoneSink( std::move( processingDoneSink ) )
    {
    }

    ProcessingEngine::~ProcessingEngine()
    {
        m_logger->debug( "[RELEASED] m_nodeId: {},", m_nodeId );
    }

    void ProcessingEngine::StartQueueProcessing( std::shared_ptr<SubTaskQueueAccessor> subTaskQueueAccessor )
    {
        std::lock_guard<std::mutex> queueGuard( m_mutexSubTaskQueue );
        m_logger->debug( "[START QUEUE PROCESSING] m_nodeId: {},", m_nodeId );
        m_subTaskQueueAccessor = std::move( subTaskQueueAccessor );

        m_subTaskQueueAccessor->GrabSubTask(
            [weakThis( weak_from_this() )]( boost::optional<const SGProcessing::SubTask &> subTask )
            {
                auto _this = weakThis.lock();
                if ( !_this )
                {
                    return;
                }
                _this->OnSubTaskGrabbed( subTask );
            } );
    }

    void ProcessingEngine::StopQueueProcessing()
    {
        std::lock_guard<std::mutex> queueGuard( m_mutexSubTaskQueue );
        m_subTaskQueueAccessor.reset();
        m_logger->debug( "[PROCESSING_STOPPED] m_nodeId: {}", m_nodeId );
    }

    bool ProcessingEngine::IsQueueProcessingStarted() const
    {
        std::lock_guard<std::mutex> queueGuard( m_mutexSubTaskQueue );
        return m_subTaskQueueAccessor != nullptr;
    }

    float ProcessingEngine::GetProgress() const
    {
        if (m_processingCore) {
            return m_processingCore->GetProgress();
        }
        return 0.0f;
    }

    void ProcessingEngine::OnSubTaskGrabbed( boost::optional<const SGProcessing::SubTask &> subTask )
    {
        if ( subTask )
        {
            try {
                std::string subtaskId = subTask->subtaskid(); // Test if subtask is valid
                m_logger->debug( "[GRABBED] m_nodeId ({}), subtask ({}).", m_nodeId, subtaskId );
                ProcessSubTask( *subTask );
            } catch (const std::exception& e) {
                m_logger->error( "[GRABBED ERROR] m_nodeId ({}), error: {}", m_nodeId, e.what() );
            }
        }
        else
        {
            m_logger->debug( "ALL SUBTASKS ARE GRABBED. ({}).", m_nodeId );
            m_processingDoneSink();
        }
        // When results for all subtasks are available, no subtask is received (optnull).
    }

    void ProcessingEngine::ProcessSubTask( SGProcessing::SubTask subTask )
    {
        // Safely validate subTask before processing
        std::string subtaskId;
        try {
            subtaskId = subTask.subtaskid();
            if (subtaskId.empty()) {
                m_logger->error("ProcessSubTask called with empty subtaskid for node: {}", m_nodeId);
                return;
            }
        } catch (const std::exception& e) {
            m_logger->error("ProcessSubTask called with corrupted subTask for node: {} - {}", m_nodeId, e.what());
            return;
        }

        m_logger->debug( "[PROCESSING_STARTED]. m_nodeId ({}), subtask ({}).", m_nodeId, subtaskId );
        std::thread thread(
            [subTask( std::move( subTask ) ), _this( shared_from_this() )]()
            {
                // Double-check we haven't been destroyed
                if (!_this) {
                    return;
                }

                // Make a local copy of critical data to avoid corruption
                std::string subtaskId = subTask.subtaskid();
                std::string nodeId = _this->m_nodeId;

                if (subtaskId.empty()) {
                    _this->m_logger->error("Subtask ID became empty during processing for node: {}", nodeId);
                    return;
                }

                // @todo set initial hash code that depends on node id
                auto maybe_result = _this->m_processingCore->ProcessSubTask(
                    subTask,
                    std::hash<std::string>{}( nodeId ) );
                if ( maybe_result.has_value() )
                {
                    SGProcessing::SubTaskResult result = maybe_result.value();

                    // Use local copies to avoid corruption
                    try {
                        result.set_subtaskid( subtaskId );
                        result.set_node_address( nodeId );

                        _this->m_logger->debug( "[PROCESSED]. m_nodeId ({}), subtask ({}).",
                                                nodeId,
                                                subtaskId );

                        std::lock_guard<std::mutex> queueGuard( _this->m_mutexSubTaskQueue );
                        if ( _this->m_subTaskQueueAccessor )
                        {
                            _this->m_subTaskQueueAccessor->CompleteSubTask( subtaskId, result );
                            _this->m_subTaskQueueAccessor->GrabSubTask(
                                [weakThis( std::weak_ptr<sgns::processing::ProcessingEngine>( _this ) )](
                                    boost::optional<const SGProcessing::SubTask &> subTask )
                                {
                                    auto _this = weakThis.lock();
                                    if ( !_this )
                                    {
                                        return;
                                }
                                _this->OnSubTaskGrabbed( subTask );
                            } );
                        }
                    } catch (const std::exception& e) {
                        _this->m_logger->error("Error setting protobuf fields for subtask {}: {}", subtaskId, e.what());
                    }
                }
                else
                {
                    _this->m_processingErrorSink( maybe_result.error().message() );
                }
            } );
        thread.detach();
    }

}

Updated on 2026-03-04 at 13:10:44 -0800