Skip to content

src/processing/impl/processing_task_queue_impl.cpp

Namespaces

Name
sgns
sgns::processing

Source code

#include "processing/impl/processing_task_queue_impl.hpp"
#include <boost/system/error_code.hpp>
#include <processingbase/ProcessingManager.hpp>
#include <Generators.hpp>

namespace sgns::processing
{
    ProcessingTaskQueueImpl::ProcessingTaskQueueImpl( std::shared_ptr<sgns::crdt::GlobalDB> db, std::string processing_topic ) :
        m_db( std::move( db ) ),
        m_processingTimeout( std::chrono::seconds( 10 ) ),
        m_processing_topic( std::move( processing_topic ) ),
        m_badjobs()
    {
        m_logger->info( "ProcessingTaskQueueImpl CREATED - instance at {}", static_cast<void*>(this) );
    }

    /**
     * @brief Logs the destruction of this queue instance together with its address.
     */
    ProcessingTaskQueueImpl::~ProcessingTaskQueueImpl()
    {
        void *const self = this;
        m_logger->info( "ProcessingTaskQueueImpl DESTROYED - instance at {}", self );
    }
    /**
     * @brief Stages a task and all of its subtasks in a new atomic CRDT transaction.
     *
     * Nothing is committed here: the staged transaction is kept in job_crdt_transaction_ and is
     * committed later by SendEscrow().
     *
     * @param task     Task to store; its ipfs_block_id() is used to build the task and subtask keys.
     * @param subTasks Subtasks stored under the subtask list of @p task.
     * @return success when everything was staged; a failure with `operation_in_progress` when a
     *         previously staged transaction is still pending, or the failure reported by the
     *         transaction's Put().
     */
    outcome::result<void> ProcessingTaskQueueImpl::EnqueueTask( const SGProcessing::Task               &task,
                                                                const std::list<SGProcessing::SubTask> &subTasks )
    {
        if ( job_crdt_transaction_ )
        {
            // A previously enqueued job still needs its escrow to be sent/committed.
            return outcome::failure(
                boost::system::errc::make_error_code( boost::system::errc::operation_in_progress ) );
        }

        // Stage into a local transaction and publish it to the member only after every Put
        // succeeded. Otherwise a failed Put would leave a half-filled transaction behind and
        // every later EnqueueTask() call would be rejected as "already in progress".
        auto transaction = m_db->BeginTransaction();

        // The format pattern does not depend on the subtask, so build it once.
        const std::string subtask_path_pattern = std::string( SUBTASK_LIST_KEY ) + std::string( TASK_KEY ) +
                                                 std::string( SUBTASK_KEY );

        for ( const auto &subTask : subTasks )
        {
            boost::format complete_subtask_path{ subtask_path_pattern };
            complete_subtask_path % task.ipfs_block_id() % subTask.subtaskid();

            sgns::crdt::HierarchicalKey key( complete_subtask_path.str() );
            sgns::base::Buffer          value;
            value.put( subTask.SerializeAsString() );
            BOOST_OUTCOME_TRYV2( auto &&, transaction->Put( std::move( key ), std::move( value ) ) );

            m_logger->debug( "[{}] placed to GlobalDB ", complete_subtask_path.str() );
        }

        boost::format complete_task_path{ std::string( TASK_LIST_KEY ) + std::string( TASK_KEY ) };
        complete_task_path % task.ipfs_block_id();

        sgns::crdt::HierarchicalKey key( complete_task_path.str() );
        sgns::base::Buffer          value;
        value.put( task.SerializeAsString() );

        BOOST_OUTCOME_TRYV2( auto &&, transaction->Put( std::move( key ), std::move( value ) ) );
        m_logger->debug( "[{}] placed to GlobalDB ", complete_task_path.str() );

        // Everything is staged: hand the transaction over so SendEscrow() can commit it.
        job_crdt_transaction_ = std::move( transaction );

        return outcome::success();
    }

    bool ProcessingTaskQueueImpl::GetSubTasks( const std::string &taskId, std::list<SGProcessing::SubTask> &subTasks )
    {
        m_logger->debug( "SUBTASKS_REQUESTED. TaskId: {}", taskId );
        boost::format complete_subtask_list_path{ std::string( SUBTASK_LIST_KEY ) + std::string( TASK_KEY ) };

        complete_subtask_list_path % taskId;
        auto querySubTasks = m_db->QueryKeyValues( complete_subtask_list_path.str() );

        if ( querySubTasks.has_failure() )
        {
            m_logger->info( "Unable list subtasks from CRDT datastore" );
            return false;
        }

        if ( querySubTasks.has_value() )
        {
            m_logger->debug( "SUBTASKS_FOUND {}", querySubTasks.value().size() );

            for ( auto element : querySubTasks.value() )
            {
                SGProcessing::SubTask subTask;
                if ( subTask.ParseFromArray( element.second.data(), element.second.size() ) )
                {
                    m_logger->debug( "Subtask check {}", subTask.chunkstoprocess_size() );
                    if (!IsSubTaskValid(subTask.json_data()))
                    {
                        m_logger->debug( "Subtask does not validate" );
                        return false;
                    }
                    subTasks.push_back( std::move( subTask ) );
                }
                else
                {
                    m_logger->debug( "Unable to parse a subtask" );
                    return false;
                }
            }

            return true;
        }

        m_logger->debug( "NO_SUBTASKS_FOUND. TaskId {}", taskId );
        return false;
    }

    /**
     * @brief Picks one task from the queue and locks it for processing.
     *
     * The task list is scanned in query order. Entries whose key cannot be converted, that fail
     * to parse, that are blacklisted (m_badjobs) or that already have a result are skipped.
     * The first remaining task that is not locked and can be locked is grabbed. Only when no
     * task was grabbed this way are the tasks that were found locked re-checked, and the first
     * one whose lock has expired is taken over via MoveExpiredTaskLock().
     *
     * @return pair of (ipfs_block_id, task) for the grabbed task; the failure of the task-list
     *         query; or a failure with `resource_unavailable_try_again` when no task is available.
     */
    outcome::result<std::pair<std::string, SGProcessing::Task>> ProcessingTaskQueueImpl::GrabTask()
    {
        m_logger->info( "GRAB_TASK called - blacklist has {} items", m_badjobs.size() );
        OUTCOME_TRY( ( auto &&, queryTasks ), m_db->QueryKeyValues( std::string( TASK_LIST_KEY ) ) );

        bool                  task_grabbed = false;
        std::set<std::string> lockedTasks;
        SGProcessing::Task    task;
        m_logger->info( "Number of tasks in Queue: {}", queryTasks.size() );

        // Iterate by reference: copying each key/value pair would duplicate the stored buffers.
        for ( const auto &element : queryTasks )
        {
            auto taskKey = m_db->KeyToString( element.first );
            if ( !taskKey.has_value() )
            {
                m_logger->debug( "Unable to convert a key to string" );
                continue;
            }

            if ( !task.ParseFromArray( element.second.data(), element.second.size() ) )
            {
                m_logger->debug( "Couldn't parse the task from Protobuf" );
                //TODO - Decide what to do with an invalid task - Maybe error?
                continue;
            }

            m_logger->info( "Checking task: ipfs_block_id='{}', taskKey='{}'", task.ipfs_block_id(), taskKey.value() );

            if ( m_badjobs.find( task.ipfs_block_id() ) != m_badjobs.end() )
            {
                m_logger->debug( "Skip bad job: {} (found in blacklist of {} items)", task.ipfs_block_id(), m_badjobs.size() );
                continue;
            }
            m_logger->debug( "Task {} not in blacklist (blacklist has {} items)", task.ipfs_block_id(), m_badjobs.size() );

            if ( IsTaskCompleted( task.ipfs_block_id() ) )
            {
                m_logger->debug( "Task already processed" );
                continue;
            }

            if ( IsTaskLocked( taskKey.value() ) )
            {
                // Remember it: its lock may have expired and can be taken over below.
                m_logger->debug( "TASK_PREVIOUSLY_LOCKED {}", taskKey.value() );
                lockedTasks.insert( taskKey.value() );
                continue;
            }
            m_logger->debug( "TASK_QUEUE_ITEM: {}, LOCKED: true", taskKey.value() );

            if ( !LockTask( taskKey.value() ) )
            {
                m_logger->debug( "Failed to lock task" );
                continue;
            }
            m_logger->debug( "TASK_LOCKED {}", taskKey.value() );
            task_grabbed = true;
            break;
        }

        // Fall back to expired locks ONLY when no task was grabbed so far. Running this after
        // a successful grab could lock a second task and overwrite `task`, so the caller would
        // receive a different task than the one locked above.
        if ( !task_grabbed )
        {
            for ( const auto &lockedTask : lockedTasks )
            {
                if ( MoveExpiredTaskLock( lockedTask, task ) )
                {
                    task_grabbed = true;
                    break;
                }
            }
        }

        m_logger->info( "Checked task Queue: {} tasks processed, {} bad tasks in blacklist", queryTasks.size(), m_badjobs.size() );
        if ( task_grabbed )
        {
            m_logger->info( "GRAB_TASK_SUCCESS: returning task_id={}", task.ipfs_block_id() );
            return std::make_pair( task.ipfs_block_id(), task );
        }

        m_logger->info( "GRAB_TASK_FAILED: no tasks available" );
        // Use a real (non-zero) error code; a default-constructed error_code reads as "success".
        return outcome::failure(
            boost::system::errc::make_error_code( boost::system::errc::resource_unavailable_try_again ) );
    }

    /**
     * @brief Stages the result of a task in a fresh atomic transaction.
     *
     * The transaction is NOT committed here; it is returned so the caller can decide what to do
     * with it.
     *
     * @param taskKey    Identifier inserted into the results key of the task.
     * @param taskResult Result message that is serialized and stored under that key.
     * @return the transaction holding the staged result, or the failure reported by Put().
     */
    outcome::result<std::shared_ptr<crdt::AtomicTransaction>> ProcessingTaskQueueImpl::CompleteTask(
        const std::string              &taskKey,
        const SGProcessing::TaskResult &taskResult )
    {
        boost::format result_path_format{ std::string( RESULTS_KEY ) + std::string( TASK_LIST_KEY ) +
                                          std::string( TASK_KEY ) };
        result_path_format % taskKey;
        sgns::crdt::HierarchicalKey storage_key( result_path_format.str() );

        m_logger->debug( "CompleteTask: Completing task on {}", storage_key.GetKey() );

        auto transaction = m_db->BeginTransaction();

        sgns::base::Buffer serialized_result;
        serialized_result.put( taskResult.SerializeAsString() );
        BOOST_OUTCOME_TRYV2( auto &&, transaction->Put( std::move( storage_key ), std::move( serialized_result ) ) );

        m_logger->debug( "TASK_COMPLETED: {}, results stored", taskKey );
        return transaction;
    }

    bool ProcessingTaskQueueImpl::IsTaskCompleted( const std::string &taskId )
    {
        boost::format complete_result_path{ std::string( RESULTS_KEY ) + std::string( TASK_LIST_KEY ) +
                                            std::string( TASK_KEY ) };
        complete_result_path % taskId;
        sgns::crdt::HierarchicalKey result_key( complete_result_path.str() );
        auto                        has_result = m_db->Get( result_key );

        return has_result.has_value();
    }

    /**
     * @brief Reads the escrow path stored inside a task record.
     * @param taskId Identifier used to build the task key.
     * @return the task's escrow_path(); the failure of the database lookup; or a failure with
     *         `bad_message` when the stored bytes cannot be parsed as a task.
     */
    outcome::result<std::string> ProcessingTaskQueueImpl::GetTaskEscrow( const std::string &taskId )
    {
        boost::format complete_task_path{ std::string( TASK_LIST_KEY ) + std::string( TASK_KEY ) };
        complete_task_path % taskId;
        sgns::crdt::HierarchicalKey task_key( complete_task_path.str() );

        OUTCOME_TRY( ( auto &&, task_buffer ), m_db->Get( task_key ) );

        SGProcessing::Task task;

        if ( !task.ParseFromArray( task_buffer.data(), task_buffer.size() ) )
        {
            // Use a real (non-zero) error code; a default-constructed error_code reads as "success".
            return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::bad_message ) );
        }

        return task.escrow_path();
    }

    bool ProcessingTaskQueueImpl::IsTaskLocked( const std::string &taskKey )
    {
        boost::format complete_lock_path{ std::string( LOCK_KEY ) };
        complete_lock_path % taskKey;
        sgns::crdt::HierarchicalKey lock_key( complete_lock_path.str() );
        auto                        lockData = m_db->Get( lock_key );
        return !lockData.has_failure() && lockData.has_value();
    }

    /**
     * @brief Validates a task description by trying to create a ProcessingManager from it.
     * @param taskJson Task description handed to ProcessingManager::Create().
     * @return success when the manager could be created, otherwise the failure reported by Create().
     */
    outcome::result<void> ProcessingTaskQueueImpl::IsTaskValid( const std::string taskJson )
    {
        // Only the outcome matters here; the created manager itself is not used.
        BOOST_OUTCOME_TRYV2( auto &&, sgns::sgprocessing::ProcessingManager::Create( taskJson ) );
        return outcome::success();
    }

    /**
     * @brief Validates a subtask description by parsing it as JSON into a ModelNode.
     * @param taskJson JSON text of the subtask.
     * @return success when the text parses and converts; a failure with `invalid_argument` when
     *         nlohmann::json reports an exception during parsing or conversion.
     */
    outcome::result<void> ProcessingTaskQueueImpl::IsSubTaskValid( const std::string taskJson )
    {
        try
        {
            m_logger->trace( "SubTask Parsing {}", taskJson );
            auto data = nlohmann::json::parse( taskJson );

            // The converted model is only needed to prove that the conversion works.
            sgns::ModelNode model;
            sgns::from_json( data, model );
        }
        catch ( const nlohmann::json::exception &e )
        {
            m_logger->debug( "SubTask Parsing Failed {} ", e.what() );
            // Use a real (non-zero) error code; a default-constructed error_code reads as "success".
            return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::invalid_argument ) );
        }
        return outcome::success();
    }

    bool ProcessingTaskQueueImpl::LockTask( const std::string &taskKey )
    {
        auto timestamp = std::chrono::system_clock::now();

        boost::format complete_lock_path{ std::string( LOCK_KEY ) };
        complete_lock_path % taskKey;
        sgns::crdt::HierarchicalKey lock_key( complete_lock_path.str() );

        SGProcessing::TaskLock lock;
        lock.set_task_id( taskKey );
        lock.set_lock_timestamp(
            std::chrono::duration_cast<std::chrono::milliseconds>( timestamp.time_since_epoch() ).count() );

        sgns::base::Buffer lockData;
        lockData.put( lock.SerializeAsString() );

        auto res = m_db->Put( lock_key, lockData, { m_processing_topic } );
        return !res.has_failure();
    }

    bool ProcessingTaskQueueImpl::MoveExpiredTaskLock( const std::string &taskKey, SGProcessing::Task &task )
    {
        auto timestamp = std::chrono::system_clock::now();

        boost::format complete_lock_path{ std::string( LOCK_KEY ) };
        complete_lock_path % taskKey;
        sgns::crdt::HierarchicalKey lock_key( complete_lock_path.str() );

        auto lockData = m_db->Get( lock_key );
        if ( !lockData.has_failure() && lockData.has_value() )
        {
            // Check task expiration
            SGProcessing::TaskLock lock;
            if ( lock.ParseFromArray( lockData.value().data(), lockData.value().size() ) )
            {
                // Convert the stored milliseconds back to a time_point
                auto lockTimePoint = std::chrono::system_clock::time_point(
                    std::chrono::milliseconds( lock.lock_timestamp() ) );

                auto expirationTime = lockTimePoint + m_processingTimeout;

                if ( timestamp > expirationTime )
                {
                    auto taskData = m_db->Get( taskKey );

                    if ( !taskData.has_failure() )
                    {
                        if ( task.ParseFromArray( taskData.value().data(), taskData.value().size() ) )
                        {
                            // Check if this task is blacklisted before allowing it to be processed again
                            if ( m_badjobs.find( task.ipfs_block_id() ) != m_badjobs.end() )
                            {
                                m_logger->debug( "Skip expired bad job: {} (found in blacklist of {} items)", task.ipfs_block_id(), m_badjobs.size() );
                                return false;
                            }
                            else
                            {
                                m_logger->debug( "Expired task {} not in blacklist (blacklist has {} items)", task.ipfs_block_id(), m_badjobs.size() );
                            }

                            if ( LockTask( taskKey ) )
                            {
                                m_logger->debug( "TASK_LOCK_MOVED {}", taskKey );
                                return true;
                            }
                        }
                    }
                    else
                    {
                        m_logger->debug( "Unable to find a task {}", taskKey );
                    }
                }
            }
        }
        return false;
    }

    /**
     * @brief Adds the escrow entry to the staged job transaction and commits it.
     *
     * Requires a transaction previously staged by EnqueueTask(). After a successful commit the
     * staged transaction is released. When Put() or Commit() fails, the staged transaction is
     * kept as it is.
     *
     * @param path  Key path under which the escrow data is stored.
     * @param value Escrow data to store.
     * @return success after the commit; a failure with `operation_not_permitted` when no
     *         transaction is staged; or the failure reported by Put() / Commit().
     */
    outcome::result<void> ProcessingTaskQueueImpl::SendEscrow( std::string path, sgns::base::Buffer value )
    {
        if ( !job_crdt_transaction_ )
        {
            // Task and subtasks need to be enqueued first.
            // Use a real (non-zero) error code; a default-constructed error_code reads as "success".
            return outcome::failure(
                boost::system::errc::make_error_code( boost::system::errc::operation_not_permitted ) );
        }

        sgns::crdt::HierarchicalKey key( path );

        BOOST_OUTCOME_TRYV2( auto &&, job_crdt_transaction_->Put( std::move( key ), std::move( value ) ) );
        BOOST_OUTCOME_TRYV2( auto &&, job_crdt_transaction_->Commit( { m_processing_topic } ) );

        ResetAtomicTransaction();

        return outcome::success();
    }

    /**
     * @brief Releases the staged job transaction, if any, so a new job can be enqueued.
     */
    void ProcessingTaskQueueImpl::ResetAtomicTransaction()
    {
        job_crdt_transaction_ = nullptr;
    }

    /**
     * @brief Adds a task to the in-memory blacklist of bad jobs.
     *
     * The blacklist size is logged before and after the insertion. While the blacklist holds
     * at most 10 entries, its full content is logged as well.
     *
     * @param taskKey Identifier inserted into m_badjobs.
     */
    void ProcessingTaskQueueImpl::MarkTaskBad( const std::string &taskKey )
    {
        void *const self = this;

        m_logger->info( "MARKING_TASK_BAD: {} (total bad jobs: {}) - instance at {}", taskKey, m_badjobs.size(), self );
        m_badjobs.insert( taskKey );
        m_logger->info( "MARKED_TASK_BAD: {} (total bad jobs now: {}) - instance at {}", taskKey, m_badjobs.size(), self );

        // Dump the current blacklist for debugging, but only while it is small.
        if ( m_badjobs.size() > 10 )
        {
            return;
        }

        std::string joined;
        for ( auto it = m_badjobs.begin(); it != m_badjobs.end(); ++it )
        {
            if ( !joined.empty() )
            {
                joined.append( ", " );
            }
            joined.append( *it );
        }
        m_logger->info( "Current blacklist: [{}]", joined );
    }
}

Updated on 2026-03-04 at 13:10:44 -0800