Skip to content

src/processing/impl/processing_task_queue_impl.hpp

Namespaces

Name
sgns
sgns::processing

Classes

Name
class sgns::processing::ProcessingTaskQueueImpl

Source code

#ifndef GRPC_FOR_SUPERGENIUS_PROCESSING_TASK_QUEUE_IMPL_HPP
#define GRPC_FOR_SUPERGENIUS_PROCESSING_TASK_QUEUE_IMPL_HPP

#include <utility>

#include <boost/format.hpp>

#include "outcome/outcome.hpp"
#include "processing/processing_task_queue.hpp"
#include "crdt/globaldb/globaldb.hpp"
#include "crdt/atomic_transaction.hpp"

namespace sgns::processing
{
    /**
     * @brief Concrete implementation of the ProcessingTaskQueue interface.
     *
     * Holds a shared handle to a sgns::crdt::GlobalDB plus a processing topic
     * string, and declares the task/subtask queue operations on top of them.
     * Only declarations live in this header; the behavior described below is
     * inferred from signatures and should be confirmed against the .cpp.
     *
     * NOTE(review): this header uses std::shared_ptr, std::string, std::list,
     * std::set, std::chrono and std::string_view but directly includes only
     * <utility> from the standard library, so it relies on transitive includes
     * from the project headers.
     */
    class ProcessingTaskQueueImpl : public ProcessingTaskQueue
    {
    public:
        /**
         * @brief Constructs the queue.
         * @param db               Shared handle to the CRDT global database.
         * @param processing_topic Topic name, taken by value
         *                         (presumably stored in m_processing_topic — confirm).
         */
        ProcessingTaskQueueImpl( std::shared_ptr<sgns::crdt::GlobalDB> db, std::string processing_topic );

        /// @brief Destructor; defined out of line.
        ~ProcessingTaskQueueImpl();

        /**
         * @brief Adds a task together with its subtasks to the queue.
         * @param task     Task to enqueue.
         * @param subTasks Subtasks belonging to @p task.
         * @return outcome::result<void>; the failure conditions are not visible in this header.
         */
        outcome::result<void> EnqueueTask( const SGProcessing::Task               &task,
                                           const std::list<SGProcessing::SubTask> &subTasks ) override;

        /**
         * @brief Retrieves the subtasks associated with a task id.
         * @param taskId   Identifier of the task.
         * @param subTasks Output list receiving the subtasks.
         * @return bool — presumably true on success; TODO confirm in the implementation.
         */
        bool GetSubTasks( const std::string &taskId, std::list<SGProcessing::SubTask> &subTasks ) override;

        /**
         * @brief Obtains a task from the queue.
         * @return On success a (string, Task) pair; the string is presumably the
         *         task's key — verify against the implementation.
         */
        outcome::result<std::pair<std::string, SGProcessing::Task>> GrabTask() override;

        /**
         * @brief Records the result of a task.
         * @param taskKey    Key of the task being completed.
         * @param taskResult Result to associate with the task.
         * @return On success a shared pointer to a crdt::AtomicTransaction.
         *         NOTE(review): whether the transaction is already committed
         *         when returned cannot be determined from this header.
         */
        outcome::result<std::shared_ptr<crdt::AtomicTransaction>> CompleteTask(
            const std::string              &taskKey,
            const SGProcessing::TaskResult &taskResult ) override;

        /**
         * @brief Reports whether the given task is completed.
         * @param taskId Identifier of the task.
         */
        bool IsTaskCompleted( const std::string &taskId ) override;

        /**
         * @brief Validates a task description.
         * @param taskJson Task description; the name suggests JSON text (TODO confirm).
         *                 Passed by const value, so the string is copied on each call.
         * @return outcome::result<void>; success presumably means "valid".
         */
        outcome::result<void> IsTaskValid( const std::string taskJson );

        /**
         * @brief Validates a subtask description.
         * @param taskJson Subtask description; the name suggests JSON text (TODO confirm).
         *                 Passed by const value, so the string is copied on each call.
         * @return outcome::result<void>; success presumably means "valid".
         */
        outcome::result<void> IsSubTaskValid( const std::string taskJson );

        /**
         * @brief Looks up escrow information for a task.
         * @param taskId Identifier of the task.
         * @return On success a string; its exact content (path, id, ...) is not
         *         visible here — verify against the implementation.
         */
        outcome::result<std::string> GetTaskEscrow( const std::string &taskId );

        /// @brief Reports whether the task identified by @p taskKey is currently locked.
        bool IsTaskLocked( const std::string &taskKey );

        /**
         * @brief Attempts to lock the task identified by @p taskKey.
         * @return bool — presumably true if the lock was taken; TODO confirm.
         */
        bool LockTask( const std::string &taskKey );

        /**
         * @brief Handles a task whose lock has expired.
         * @param taskKey Key of the task.
         * @param task    Task object, passed by non-const reference (may be modified).
         * @return bool — meaning not visible in this header; TODO confirm.
         */
        bool MoveExpiredTaskLock( const std::string &taskKey, SGProcessing::Task &task );

        /**
         * @brief Sends an escrow entry.
         * @param path  Destination path/key, taken by value.
         * @param value Payload buffer, taken by value.
         */
        outcome::result<void> SendEscrow( std::string path, sgns::base::Buffer value );

        /// @brief Resets the atomic transaction (presumably job_crdt_transaction_ — confirm).
        void                  ResetAtomicTransaction();

        /**
         * @brief Marks a task as bad.
         * @param taskKey Key of the task (presumably recorded in m_badjobs — confirm).
         */
        void MarkTaskBad( const std::string& taskKey ) override;

    private:
        /// Shared handle to the CRDT global database.
        std::shared_ptr<sgns::crdt::GlobalDB>          m_db;
        /// Processing timeout. No in-class initializer here, so it is expected
        /// to be set by the constructor — NOTE(review): confirm.
        std::chrono::system_clock::duration            m_processingTimeout;
        /// Logger created under the name "ProcessingTaskQueueImpl".
        sgns::base::Logger                             m_logger = sgns::base::createLogger( "ProcessingTaskQueueImpl" );
        /// Atomic CRDT transaction held by this queue.
        std::shared_ptr<sgns::crdt::AtomicTransaction> job_crdt_transaction_;
        /// Processing topic name.
        std::string                                    m_processing_topic;
        /// Set of job identifiers/keys regarded as bad.
        std::set<std::string>                          m_badjobs;

        // Key fragments for database entries. Entries containing "%s" are
        // format templates (presumably filled via boost::format, which this
        // header includes — confirm in the implementation).
        static constexpr std::string_view TASK_LIST_KEY    = "/tasks";
        static constexpr std::string_view SUBTASK_LIST_KEY = "/subtasks";
        static constexpr std::string_view TASK_KEY         = "/TASK_%s";
        static constexpr std::string_view SUBTASK_KEY      = "/%s";
        static constexpr std::string_view RESULTS_KEY      = "/task_results";
        static constexpr std::string_view LOCK_KEY         = "/lock_%s";
    };

}

#endif // GRPC_FOR_SUPERGENIUS_PROCESSING_TASK_QUEUE_IMPL_HPP

Updated on 2026-03-04 at 13:10:44 -0800