Skip to content

src/processing/processing_subtask_queue_manager.cpp

Namespaces

Name
sgns
sgns::processing

Source code

#include "processing_subtask_queue_manager.hpp"

#include <utility>
#include <thread>

namespace sgns::processing
{
    ProcessingSubTaskQueueManager::ProcessingSubTaskQueueManager(
        std::shared_ptr<ProcessingSubTaskQueueChannel> queueChannel,
        std::shared_ptr<boost::asio::io_context>       context,
        const std::string                             &localNodeId,
        std::function<void( const std::string & )>     processingErrorSink,
        uint64_t                                       delayBetweenProcessingMs ) :
        m_queueChannel( std::move( queueChannel ) ),
        m_context( std::move( context ) ),
        m_localNodeId( localNodeId ),
        m_dltQueueResponseTimeout( *m_context ),
        m_queueResponseTimeout( boost::posix_time::seconds( 5 ) ),
        m_dltGrabSubTaskTimeout( *m_context ),
        m_processingQueue( localNodeId, [this]() { return this->GetCurrentQueueTimestamp(); } ),
        m_processingTimeout( std::chrono::seconds( 15 ) ),
        m_processingErrorSink( std::move( processingErrorSink ) ),
        m_delayBetweenProcessingMs( std::move( delayBetweenProcessingMs ) )
    {
        m_maxSubtasksPerOwnership = m_defaultMaxSubtasksPerOwnership;
    }

    void ProcessingSubTaskQueueManager::SetProcessingTimeout(
        const std::chrono::system_clock::duration &processingTimeout )
    {
        m_processingTimeout = processingTimeout;
    }

    ProcessingSubTaskQueueManager::~ProcessingSubTaskQueueManager()
    {
        m_logger->debug( "[RELEASED] this: {}", reinterpret_cast<size_t>( this ) );
    }

    bool ProcessingSubTaskQueueManager::CreateQueue( std::list<SGProcessing::SubTask> &subTasks )
    {
        // Check if all subtasks have at least one chunk to process
        bool hasValidChunks = true;
        for ( const auto &subtask : subTasks )
        {
            if ( subtask.chunkstoprocess_size() == 0 )
            {
                hasValidChunks = false;
                break;
            }
        }

        if ( !hasValidChunks )
        {
            m_logger->error( "Failed to create queue: subtasks must have at least one chunk to process" );
            return false;
        }

        auto queue           = std::make_shared<SGProcessing::SubTaskQueue>();
        auto queueSubTasks   = queue->mutable_subtasks();
        auto processingQueue = queue->mutable_processing_queue();
        for ( auto itSubTask : subTasks )
        {
            // Move subtask to heap
            auto subTask = std::make_unique<SGProcessing::SubTask>( std::move( itSubTask ) );
            queueSubTasks->mutable_items()->AddAllocated( subTask.release() );
            processingQueue->add_items();
        }

        // Record ownership acquisition time when creating a queue
        m_ownership_acquired_at_ = std::chrono::duration_cast<std::chrono::milliseconds>(
                                       std::chrono::steady_clock::now().time_since_epoch() )
                                       .count();
        m_queue_timestamp_           = m_ownership_acquired_at_;
        m_ownership_last_delta_time_ = m_ownership_acquired_at_;

        processingQueue->set_processing_timeout_length( m_processingTimeout.count() );
        processingQueue->set_last_update_timestamp( m_queue_timestamp_ );
        std::unique_lock guard( m_queueMutex );
        m_queue = std::move( queue );

        m_processedSubTaskIds = {};
        // Map subtask IDs to subtask indices
        std::vector<int> unprocessedSubTaskIndices;
        for ( int subTaskIdx = 0; subTaskIdx < m_queue->subtasks().items_size(); ++subTaskIdx )
        {
            const auto &subTaskId = m_queue->subtasks().items( subTaskIdx ).subtaskid();
            if ( m_processedSubTaskIds.find( subTaskId ) == m_processedSubTaskIds.end() )
            {
                unprocessedSubTaskIndices.push_back( subTaskIdx );
            }
        }

        m_processingQueue.CreateQueue( processingQueue, unprocessedSubTaskIndices );

        m_logger->debug( "Subtask Queue created in timestamp {}", m_ownership_acquired_at_ );
        LogQueue();
        PublishSubTaskQueue();

        if ( m_subTaskQueueAssignmentEventSink )
        {
            std::vector<std::string> subTaskIds;
            subTaskIds.reserve( m_queue->subtasks().items_size() );
            for ( int subTaskIdx = 0; subTaskIdx < m_queue->subtasks().items_size(); ++subTaskIdx )
            {
                subTaskIds.push_back( m_queue->subtasks().items( subTaskIdx ).subtaskid() );
            }

            guard.unlock();
            m_subTaskQueueAssignmentEventSink( subTaskIds );
        }

        return true;
    }

    bool ProcessingSubTaskQueueManager::UpdateQueue( SGProcessing::SubTaskQueue *pQueue )
    {
        if ( pQueue == nullptr )
        {
            return false;
        }

        auto                                        queueChanged = false;
        std::shared_ptr<SGProcessing::SubTaskQueue> queue( pQueue );

        // First, decide whether to update our local set or the queue's set
        if ( !HasOwnership() )
        {
            m_queue_timestamp_ = queue->processing_queue().last_update_timestamp();
            UpdateQueueTimestamp();
            // merged processed subtasks just in case we caught some messages the owner didn't
            // when we get ownership we will update the processed_subtask_ids
            for ( int i = 0; i < queue->processing_queue().processed_subtask_ids_size(); ++i )
            {
                m_processedSubTaskIds.insert( queue->processing_queue().processed_subtask_ids( i ) );
            }
        }
        else
        {
            // Merge incoming processed IDs into local set before updating queue
            for ( int i = 0; i < queue->processing_queue().processed_subtask_ids_size(); ++i )
            {
                auto [_,
                      success] = m_processedSubTaskIds.insert( queue->processing_queue().processed_subtask_ids( i ) );
                if ( success )
                {
                    queueChanged = true;
                }
            }
            // If we're the owner, update the processed IDs in the queue from our local set
            queue->mutable_processing_queue()->clear_processed_subtask_ids();
            for ( const auto &processedId : m_processedSubTaskIds )
            {
                queue->mutable_processing_queue()->add_processed_subtask_ids( processedId );
            }

            if ( queueChanged )
            {
                m_queue_timestamp_ = queue->processing_queue().last_update_timestamp();
                UpdateQueueTimestamp();
                m_logger->debug( "QUEUE_PUBLISH_SUBTASK_PROCESSED_UPDATE: on {} at {}ms",
                                 m_localNodeId,
                                 m_queue_timestamp_ );
                LogQueue();
                PublishSubTaskQueue();
            }
        }

        // Now map subtask IDs to subtask indices based on the updated m_processedSubTaskIds
        std::vector<int> unprocessedSubTaskIndices = UpdateUnprocessedSubTaskIndices( queue.get() );
        if ( m_processingQueue.UpdateQueue( queue->mutable_processing_queue(), unprocessedSubTaskIndices ) )
        {
            m_logger->debug( "QUEUE_LOCAL_UPDATE: on {} at {}ms", m_localNodeId, m_queue_timestamp_ );
            m_queue.swap( queue );
            LogQueue();
            queueChanged = true;
        }
        return queueChanged;
    }

    void ProcessingSubTaskQueueManager::ProcessPendingSubTaskGrabbing()
    {
        static constexpr int      MAX_CONSECUTIVE_FAILURES = 10;
        static constexpr uint64_t BACKOFF_TIMEOUT_MS       = 1000;

        m_dltGrabSubTaskTimeout.expires_at( boost::posix_time::pos_infin );

        m_logger->trace( "QUEUE_PROCESS_PENDING: for node {} at {}ms. is callback empty? {} current {} versus max {}",
                         m_localNodeId,
                         m_queue_timestamp_,
                         m_onSubTaskGrabbedCallbacks.empty(),
                         m_processedSubtasksInCurrentOwnership,
                         m_maxSubtasksPerOwnership );

        // Update queue timestamp based on current ownership duration
        UpdateQueueTimestamp();

        CheckActiveCount();

        bool losingOwnership = false;
        bool lockReleased    = false;

        int consecutiveFailures = 0;

        while ( !m_onSubTaskGrabbedCallbacks.empty() &&
                ( m_processedSubtasksInCurrentOwnership < m_maxSubtasksPerOwnership ) )
        {
            // If the lock was released in the previous iteration, reacquire it
            std::unique_lock guard( m_queueMutex, std::defer_lock );
            if ( lockReleased || !guard.owns_lock() )
            {
                guard.lock();
                lockReleased = false;
            }
            m_logger->debug( "QUEUE PROCESS CHECK" );
            size_t itemIdx = 0;
            if ( m_processingQueue.GrabItem( itemIdx, m_queue_timestamp_ ) )
            {
                consecutiveFailures = 0;

                // Track that we're using queue timestamp for this item
                // Actual timestamp is managed by ProcessingQueue via lock_timestamp
                // This will be checked when calculating task expiration
                LogQueue();
                PublishSubTaskQueue();

                m_processedSubtasksInCurrentOwnership++;

                // Make a copy of the subtask and callback
                auto subtaskCopy = m_queue->subtasks().items( itemIdx );
                auto callback    = m_onSubTaskGrabbedCallbacks.front();
                m_onSubTaskGrabbedCallbacks.pop_front();

                m_logger->debug( "GRAB_SUBTASK_TO_PROCESS: Subtask {} grabbed for node {} at {}ms.",
                                 subtaskCopy.subtaskid(),
                                 m_localNodeId,
                                 m_queue_timestamp_ );

                // Check for pending ownership requests AFTER processing a subtask
                if ( m_queue->processing_queue().ownership_requests_size() > 0 )
                {
                    m_logger->debug(
                        "QUEUE_PROCESSING_PAUSED: Pending ownership requests detected. Stopping further subtask processing for node {} at {}ms.",
                        m_localNodeId,
                        m_queue_timestamp_ );
                    losingOwnership = true;
                }
                // Release the lock before calling the callback
                guard.unlock();
                lockReleased = true;

                // Call the callback without holding the lock
                callback( { subtaskCopy } );
                if ( losingOwnership )
                {
                    break;
                }
            }
            else
            {
                consecutiveFailures++;

                // No available subtasks found
                auto unlocked = m_processingQueue.UnlockExpiredItems( m_queue_timestamp_ );
                if ( !unlocked )
                {
                    if ( consecutiveFailures >= MAX_CONSECUTIVE_FAILURES )
                    {
                        m_logger->debug( "Too many consecutive grab failures ({}), using longer backoff",
                                         consecutiveFailures );
                    }
                    break;
                }
            }
        }

        // After the while loop, ensure we have the lock for the remaining operations
        std::unique_lock finalGuard( m_queueMutex, std::defer_lock );
        if ( lockReleased )
        {
            finalGuard.lock();
        }

        if ( losingOwnership && !HasOwnership() && !IsProcessed() )
        {
            UpdateQueueTimestamp();
            // Add current node's ownership request before publishing
            m_processingQueue.AddOwnershipRequest( m_localNodeId, m_queue_timestamp_ );

            m_logger->debug( "READD_QUEUE_OWNDERSHIP_REQUEST: node {} at {}ms", m_localNodeId, m_queue_timestamp_ );
            PublishSubTaskQueue();
        }

        if ( !m_onSubTaskGrabbedCallbacks.empty() )
        {
            if ( m_processedSubTaskIds.size() < static_cast<size_t>( m_queue->processing_queue().items_size() ) )
            {
                // Calculate time until expiration in queue time
                uint64_t grabSubTaskTimeoutMs = ( consecutiveFailures >= MAX_CONSECUTIVE_FAILURES )
                                                    ? BACKOFF_TIMEOUT_MS
                                                    : CalculateGrabSubTaskTimeout();

                m_logger->trace( "GRAB_TIMEOUT set to {}ms for node {} (consecutive failures: {})",
                                 grabSubTaskTimeoutMs,
                                 m_localNodeId,
                                 consecutiveFailures );

                m_dltGrabSubTaskTimeout.expires_from_now( boost::posix_time::milliseconds( grabSubTaskTimeoutMs ) );

                m_dltGrabSubTaskTimeout.async_wait(
                    [instance = shared_from_this()]( const boost::system::error_code &ec )
                    { instance->HandleGrabSubTaskTimeout( ec ); } );
            }
            else
            {
                while ( !m_onSubTaskGrabbedCallbacks.empty() )
                {
                    // Let the requester know that there are no available subtasks
                    m_onSubTaskGrabbedCallbacks.front()( {} );
                    // Reset the callback
                    m_onSubTaskGrabbedCallbacks.pop_front();
                }
            }
        }
        // if we are here, we have processed all the subtasks we can or are waiting for ownership to grab more.
        // Small delay to allow pubsub and other threads to process
        std::this_thread::sleep_for( std::chrono::milliseconds( m_delayBetweenProcessingMs ) );
    }

    void ProcessingSubTaskQueueManager::HandleGrabSubTaskTimeout( const boost::system::error_code &ec )
    {
        if ( ec != boost::asio::error::operation_aborted )
        {
            std::lock_guard guard( m_queueMutex );
            m_dltGrabSubTaskTimeout.expires_at( boost::posix_time::pos_infin );
            m_logger->trace( "HANDLE_GRAB_TIMEOUT at {}ms from node {}", m_queue_timestamp_, m_localNodeId );
            if ( !m_onSubTaskGrabbedCallbacks.empty() &&
                 ( m_processedSubTaskIds.size() < static_cast<size_t>( m_queue->processing_queue().items_size() ) ) )
            {
                GrabSubTasks();
            }
        }
    }

    void ProcessingSubTaskQueueManager::GrabSubTask( SubTaskGrabbedCallback onSubTaskGrabbedCallback )
    {
        {
            std::lock_guard guard( m_queueMutex );
            m_onSubTaskGrabbedCallbacks.push_back( std::move( onSubTaskGrabbedCallback ) );
        }
        GrabSubTasks();
    }

    void ProcessingSubTaskQueueManager::GrabSubTasks()
    {
        if ( HasOwnership() )
        {
            ProcessPendingSubTaskGrabbing();
        }
        else
        {
            // Since we're not the owner, we must use the pubsub channel
            // to request ownership from the current owner
            m_queueChannel->RequestQueueOwnership( m_localNodeId );
        }
    }

    bool ProcessingSubTaskQueueManager::MoveOwnershipTo( const std::string &nodeId )
    {
        std::lock_guard guard( m_queueMutex );

        if ( m_processingQueue.MoveOwnershipTo( nodeId ) )
        {
            PublishSubTaskQueue();
            return true;
        }
        return false;
    }

    bool ProcessingSubTaskQueueManager::HasOwnership() const
    {
        // Always locks for this thread
        std::lock_guard tempLock( m_queueMutex );
        return m_processingQueue.HasOwnership();
    }

    void ProcessingSubTaskQueueManager::PublishSubTaskQueue()
    {
        UpdateQueueTimestamp();
        m_queueChannel->PublishQueue( m_queue );
        m_logger->debug( "QUEUE_PUBLISHED: by {} at {}ms", m_localNodeId, m_queue_timestamp_ );
    }

    bool ProcessingSubTaskQueueManager::ProcessSubTaskQueueMessage( SGProcessing::SubTaskQueue *queue )
    {
        bool hadOwnership = HasOwnership();

        std::unique_lock guard( m_queueMutex );

        // If we own the queue and this message is for a queue we own, discard it
        // as it's likely our own published message coming back
        if ( m_queue != nullptr && m_queue->processing_queue().owner_node_id() == m_localNodeId &&
             queue->processing_queue().owner_node_id() == m_localNodeId )
        {
            m_logger->debug( "Discarding queue message as we already own this queue" );
            return false;
        }

        m_dltQueueResponseTimeout.expires_at( boost::posix_time::pos_infin );

        bool queueInitialized = m_queue != nullptr;
        bool queueChanged      = UpdateQueue( queue );

        if ( queueChanged )
        {
            bool hasOwnershipNow = HasOwnership();
            if ( !hadOwnership && hasOwnershipNow )
            {
                // We just acquired ownership - record the time
                m_ownership_acquired_at_ = std::chrono::duration_cast<std::chrono::milliseconds>(
                                               std::chrono::steady_clock::now().time_since_epoch() )
                                               .count();
                m_queue_timestamp_                    = queue->processing_queue().last_update_timestamp();
                m_ownership_last_delta_time_          = m_ownership_acquired_at_;
                m_processedSubtasksInCurrentOwnership = 0; // Reset processed subtasks on new ownership

                m_logger->debug( "QUEUE_OWNERSHIP_ACQUIRED: by {} at {}ms", m_localNodeId, m_queue_timestamp_ );

                // If we have both available work and callbacks waiting to process it,
                // cancel both timers to enable immediate processing
                if ( HasAvailableWork() && !m_onSubTaskGrabbedCallbacks.empty() )
                {
                    m_dltGrabSubTaskTimeout.cancel();
                    m_dltQueueResponseTimeout.cancel();
                    m_logger->debug(
                        "CANCEL_ASYNC_TIMERS: {} is Canceling timers due to ownership acquisition and available work at {}ms",
                        m_localNodeId,
                        m_queue_timestamp_ );
                    ProcessPendingSubTaskGrabbing(); // Start processing immediately
                }

                PublishSubTaskQueue();
            }

            if ( hasOwnershipNow )
            {
                ProcessPendingSubTaskGrabbing();
            }
        }

        if ( m_subTaskQueueAssignmentEventSink )
        {
            if ( !queueInitialized && queueChanged )
            {
                std::vector<std::string> subTaskIds;
                subTaskIds.reserve( queue->subtasks().items_size() );
                for ( int subTaskIdx = 0; subTaskIdx < queue->subtasks().items_size(); ++subTaskIdx )
                {
                    subTaskIds.push_back( queue->subtasks().items( subTaskIdx ).subtaskid() );
                }
                m_subTaskQueueAssignmentEventSink( subTaskIds );
            }
        }

        return queueChanged;
    }

    bool ProcessingSubTaskQueueManager::ProcessSubTaskQueueRequestMessage(
        const SGProcessing::SubTaskQueueRequest &request )
    {
        std::lock_guard guard( m_queueMutex );

        auto requstingNodeId = request.node_id();
        m_logger->debug( "QUEUE_OWNERSHIP_REQUEST: node {} from node {} at {}ms",
                         m_localNodeId,
                         requstingNodeId,
                         m_queue_timestamp_ );

        // If we are the owner and there are still subtasks to be processed, we
        // can immediately transfer ownership, do so
        if ( HasOwnership() && !HasAvailableWork() )
        {
            if ( m_processingQueue.MoveOwnershipTo( requstingNodeId ) )
            {
                PublishSubTaskQueue();
                m_logger->debug( "QUEUE_OWNERSHIP_TRANSFERRED: from {} to {} at queue timestamp of {}ms",
                                 m_localNodeId,
                                 requstingNodeId,
                                 m_queue_timestamp_ );
                return true;
            }
        }

        // Otherwise, add to the shared ownership request queue
        if ( HasAvailableWork( false ) )
        {
            bool added = m_processingQueue.AddOwnershipRequest( requstingNodeId, request.request_timestamp() );
            if ( added )
            {
                m_logger->debug( "Added ownership request from node {} to queue", requstingNodeId );
                PublishSubTaskQueue(); // Publish updated queue with new request

                // Only start the timer if this is the first request in the queue
                int requestCount = m_queue->processing_queue().ownership_requests_size();
                if ( requestCount == 1 )
                {
                    m_dltQueueResponseTimeout.expires_from_now( m_queueResponseTimeout );
                    HandleQueueRequestTimeout( boost::system::error_code{} );
                }
            }
        }
        else
        {
            m_logger->debug( "No available work to process, not adding ownership request from node {}",
                             requstingNodeId );
        }

        return true;
    }

    void ProcessingSubTaskQueueManager::HandleQueueRequestTimeout( const boost::system::error_code &ec )
    {
        if ( ec != boost::asio::error::operation_aborted )
        {
            std::unique_lock guard( m_queueMutex );
            m_logger->debug( "QUEUE_REQUEST_TIMEOUT" );
            m_dltQueueResponseTimeout.expires_at( boost::posix_time::pos_infin );

            // If there's no available work, clear the ownership requests and exit
            if ( !HasAvailableWork( false ) )
            {
                // Clear ownership requests but remain the owner
                m_queue->mutable_processing_queue()->mutable_ownership_requests()->Clear();
                LogQueue();
                PublishSubTaskQueue();
                return;
            }

            // If we're the owner and there are pending requests, process immediately
            if ( HasOwnership() && m_queue->processing_queue().ownership_requests_size() > 0 )
            {
                if ( m_processingQueue.ProcessNextOwnershipRequest() )
                {
                    LogQueue();
                    PublishSubTaskQueue();
                    return; // Successfully processed a request, exit
                }
            }

            // Before attempting rollback, check if the current owner is responsive
            auto current_time = std::chrono::steady_clock::now();
            auto time_since_last_update =
                std::chrono::duration_cast<std::chrono::milliseconds>( current_time - m_lastQueueUpdateTime ).count();

            // Only attempt rollback if we haven't received updates for a while
            if ( time_since_last_update > m_queueResponseTimeout.total_milliseconds() &&
                 m_processingQueue.RollbackOwnership() )
            {
                // Record ownership acquisition time when rolling back ownership
                m_ownership_acquired_at_ = std::chrono::duration_cast<std::chrono::milliseconds>(
                                               std::chrono::steady_clock::now().time_since_epoch() )
                                               .count();
                m_queue_timestamp_                    = m_queue->processing_queue().last_update_timestamp();
                m_ownership_last_delta_time_          = m_ownership_acquired_at_;
                m_processedSubtasksInCurrentOwnership = 0; // Reset processed subtasks on new ownership
                LogQueue();
                PublishSubTaskQueue();

                if ( HasOwnership() )
                {
                    // Check if there are any pending ownership requests
                    if ( m_queue->processing_queue().ownership_requests_size() > 0 )
                    {
                        // If there are requests, try to process the next request instead of grabbing subtasks
                        if ( m_processingQueue.ProcessNextOwnershipRequest() )
                        {
                            LogQueue();
                            PublishSubTaskQueue();
                            return; // Exit without processing subtasks
                        }
                    }
                    guard.unlock();
                    ProcessPendingSubTaskGrabbing();
                    return;
                }
            }

            m_dltQueueResponseTimeout.async_wait( [instance = shared_from_this()]( const boost::system::error_code &ec )
                                                  { instance->HandleQueueRequestTimeout( ec ); } );
            // If it hasn't been long enough, schedule another timeout check, with shorter subsequent checks.
            m_dltQueueResponseTimeout.expires_from_now( boost::posix_time::milliseconds( 100 ) );
        }
    }

    std::unique_ptr<SGProcessing::SubTaskQueue> ProcessingSubTaskQueueManager::GetQueueSnapshot() const
    {
        auto queue = std::make_unique<SGProcessing::SubTaskQueue>();

        std::lock_guard guard( m_queueMutex );
        if ( m_queue )
        {
            queue->CopyFrom( *m_queue.get() );
        }
        return queue;
    }

    void ProcessingSubTaskQueueManager::ChangeSubTaskProcessingStates( const std::set<std::string> &subTaskIds,
                                                                       bool                         isProcessed )
    {
        if ( !m_queue )
        {
            m_logger->error( "No queue for task" );
            return;
        }

        std::lock_guard guard( m_queueMutex );
        for ( const auto &subTaskId : subTaskIds )
        {
            if ( isProcessed )
            {
                m_processedSubTaskIds.insert( subTaskId );
                // Add to queue's processed subtask IDs
                m_queue->mutable_processing_queue()->add_processed_subtask_ids( subTaskId );
                m_logger->trace( "Subtask flagged as processed {}", subTaskId );
            }
            else
            {
                m_processedSubTaskIds.erase( subTaskId );

                // For removal, clear and rebuild from our updated set
                auto *processed_ids = m_queue->mutable_processing_queue();
                processed_ids->clear_processed_subtask_ids();
                for ( const auto &id : m_processedSubTaskIds )
                {
                    processed_ids->add_processed_subtask_ids( id );
                }
                m_logger->trace( "Subtask flagged as UNPROCESSED {}", subTaskId );
            }
        }

        // Map subtask IDs to subtask indices
        std::vector<int> unprocessedSubTaskIndices = UpdateUnprocessedSubTaskIndices( m_queue.get() );
        m_processingQueue.UpdateQueue( m_queue->mutable_processing_queue(), unprocessedSubTaskIndices );

        // Add this at the end of the method
        if ( HasOwnership() )
        {
            PublishSubTaskQueue();
        }
    }

    bool ProcessingSubTaskQueueManager::IsProcessed() const
    {
        std::lock_guard guard( m_queueMutex );
        if ( !m_queue )
        {
            m_logger->error( "CHECK_IS_PROCESSED Queue is null: {} for node {}",
                             m_processedSubTaskIds.size(),
                             m_processedSubTaskIds.size(),
                             m_localNodeId );
            return false;
        }
        // The queue can contain only valid results
        m_logger->debug( "CHECK_IS_PROCESSED: {} of {} for node {}",
                         m_processedSubTaskIds.size(),
                         m_queue->subtasks().items_size(),
                         m_localNodeId );
        return m_processedSubTaskIds.size() >= (size_t)m_queue->subtasks().items_size();
    }

    void ProcessingSubTaskQueueManager::SetSubTaskQueueAssignmentEventSink(
        std::function<void( const std::vector<std::string> & )> subTaskQueueAssignmentEventSink )
    {
        m_subTaskQueueAssignmentEventSink = std::move( subTaskQueueAssignmentEventSink );
        if ( m_subTaskQueueAssignmentEventSink )
        {
            std::unique_lock guard( m_queueMutex );
            if ( m_queue != nullptr )
            {
                std::vector<std::string> subTaskIds;
                subTaskIds.reserve( m_queue->subtasks().items_size() );
                for ( int subTaskIdx = 0; subTaskIdx < m_queue->subtasks().items_size(); ++subTaskIdx )
                {
                    subTaskIds.push_back( m_queue->subtasks().items( subTaskIdx ).subtaskid() );
                }
                guard.unlock();
                m_subTaskQueueAssignmentEventSink( subTaskIds );
            }
        }
    }

    void ProcessingSubTaskQueueManager::LogQueue() const
    {
        if ( m_logger->level() <= spdlog::level::trace )
        {
            std::stringstream ss;
            ss << "{";
            ss << "\"this\":\"" << reinterpret_cast<size_t>( this ) << "\"";
            ss << "\"owner_node_id\":\"" << m_queue->processing_queue().owner_node_id() << "\"";
            ss << "," << "\"last_update_timestamp\":" << m_queue->processing_queue().last_update_timestamp();
            ss << "," << "\"items\":[";
            for ( int itemIdx = 0; itemIdx < m_queue->processing_queue().items_size(); ++itemIdx )
            {
                auto item = m_queue->processing_queue().items( itemIdx );
                ss << "{\"lock_node_id\":\"" << item.lock_node_id() << "\"";
                ss << ",\"lock_timestamp\":" << item.lock_timestamp() << "},";
            }
            ss << "]}";

            m_logger->trace( ss.str() );
        }
    }

    bool ProcessingSubTaskQueueManager::HasAvailableWork( bool checkOwnershipQuota ) const
    {
        // Check if we've already processed the maximum allowed subtasks per ownership
        if ( checkOwnershipQuota && ( m_processedSubtasksInCurrentOwnership >= m_maxSubtasksPerOwnership ) )
        {
            return false;
        }

        if ( !m_queue )
        {
            m_logger->error( "No queue for check of available work" );
            return false;
        }

        // Use the helper method to check if any subtasks are available
        std::vector<int> unprocessedSubTaskIndices = UpdateUnprocessedSubTaskIndices( m_queue.get() );
        return !unprocessedSubTaskIndices.empty();
    }

    void ProcessingSubTaskQueueManager::UpdateQueueTimestamp()
    {
        if ( HasOwnership() )
        {
            auto current_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                       std::chrono::steady_clock::now().time_since_epoch() )
                                       .count();
            auto ownership_duration_delta_ms = current_time_ms - m_ownership_last_delta_time_;

            // Update the queue's total time counter
            m_queue_timestamp_ += ownership_duration_delta_ms;

            // Reset ownership acquisition time to now
            m_ownership_last_delta_time_ = current_time_ms;

            // Update the queue's timestamp if it exists
            if ( m_queue )
            {
                m_queue->mutable_processing_queue()->set_last_update_timestamp( m_queue_timestamp_ );
            }
        }
    }

    void ProcessingSubTaskQueueManager::CheckActiveCount()
    {
        auto now      = std::chrono::steady_clock::now();
        auto duration = static_cast<uint64_t>(
            std::chrono::duration_cast<std::chrono::milliseconds>( now - m_lastActiveCountCheck ).count() );
        auto activeNodeCount = m_queueChannel->GetActiveNodesCount();
        m_logger->info( "Active count is {} Duration is {}", activeNodeCount, duration );
        if ( activeNodeCount > 1 || ( m_queue->processing_queue().ownership_requests_size() > 0 ) )
        {
            // Limit processing to just one subtask
            m_maxSubtasksPerOwnership = m_defaultMaxSubtasksPerOwnership;
            m_waitTimeBeforeReset     = 100;  // Short wait if other nodes are active
            m_initialDelayPassed      = true; // Consider initial delay passed when other nodes appear
        }
        else
        {
            // Check if enough time has passed to reset
            if ( duration >= m_waitTimeBeforeReset )
            {
                // Reset processed subtasks and prepare for more processing
                m_processedSubtasksInCurrentOwnership = 0;
                m_maxSubtasksPerOwnership             = m_defaultMaxSubtasksPerOwnership;

                // After initial delay, use shorter wait time
                if ( !m_initialDelayPassed )
                {
                    m_waitTimeBeforeReset = 100; // Switch to shorter delay after initial wait
                    m_initialDelayPassed  = true;
                }
            }
        }
        // Update last check time
        m_lastActiveCountCheck = now;
    }

    uint64_t ProcessingSubTaskQueueManager::GetCurrentQueueTimestamp()
    {
        // Update and return the current queue timestamp
        UpdateQueueTimestamp();
        return m_queue_timestamp_;
    }

    uint64_t ProcessingSubTaskQueueManager::CalculateGrabSubTaskTimeout() const
    {
        auto lastLockTimestamp = m_processingQueue.GetLastLockTimestamp(); // Queue time base
        auto currentQueueTime  = m_queue_timestamp_;                       // Queue time base
        auto timeoutMs         = std::chrono::duration_cast<std::chrono::milliseconds>( m_processingTimeout ).count();

        // Calculate when the lock expires in queue time
        uint64_t lockExpirationTime = lastLockTimestamp + timeoutMs;

        // Calculate time until expiration in queue time
        uint64_t grabSubTaskTimeoutMs = 1;
        if ( lockExpirationTime > currentQueueTime )
        {
            grabSubTaskTimeoutMs = lockExpirationTime - currentQueueTime;
            // Cap at a reasonable maximum (e.g., 15 seconds)
            grabSubTaskTimeoutMs = std::min( grabSubTaskTimeoutMs, static_cast<uint64_t>( m_waitTimeBeforeReset ) );
        }
        else
        {
            //Use 100ms min so we're not slamming with 1ms timeouts that cause immediate failure as the lock is expired.
            grabSubTaskTimeoutMs = std::min( static_cast<uint64_t>( 100 ),
                                             static_cast<uint64_t>( m_waitTimeBeforeReset ) );
        }

        m_logger->trace( "calculated GRAB_TIMEOUT {}ms", grabSubTaskTimeoutMs );
        return grabSubTaskTimeoutMs;
    }

    std::vector<int> ProcessingSubTaskQueueManager::UpdateUnprocessedSubTaskIndices(
        const SGProcessing::SubTaskQueue *queue ) const
    {
        std::vector<int> unprocessedSubTaskIndices;

        for ( int subTaskIdx = 0; subTaskIdx < queue->subtasks().items_size(); ++subTaskIdx )
        {
            const auto &subTaskId      = queue->subtasks().items( subTaskIdx ).subtaskid();
            const auto &processingItem = queue->processing_queue().items( subTaskIdx );

            // Check if subtask is not processed
            bool isUnprocessed = m_processedSubTaskIds.find( subTaskId ) == m_processedSubTaskIds.end();

            // Check if subtask is not locked or its lock has expired
            bool isUnlocked    = processingItem.lock_node_id().empty();
            bool isLockExpired = false;

            if ( !isUnlocked )
            {
                // Lock has expired if current queue timestamp is greater than or equal to lock timestamp
                isLockExpired = m_queue_timestamp_ > processingItem.lock_timestamp();
            }

            // Subtask is available if it's not processed AND (not locked OR lock has expired)
            if ( isUnprocessed && ( isUnlocked || isLockExpired ) )
            {
                unprocessedSubTaskIndices.push_back( subTaskIdx );
            }
        }

        return unprocessedSubTaskIndices;
    }

}

Updated on 2026-03-04 at 13:10:44 -0800