src/processing/impl/processing_task_queue_impl.hpp
Namespaces
Classes
Source code
#ifndef GRPC_FOR_SUPERGENIUS_PROCESSING_TASK_QUEUE_IMPL_HPP
#define GRPC_FOR_SUPERGENIUS_PROCESSING_TASK_QUEUE_IMPL_HPP
#include <utility>
#include <boost/format.hpp>
#include "outcome/outcome.hpp"
#include "processing/processing_task_queue.hpp"
#include "crdt/globaldb/globaldb.hpp"
#include "crdt/atomic_transaction.hpp"
namespace sgns::processing
{
/**
 * @brief ProcessingTaskQueue implementation that holds a sgns::crdt::GlobalDB handle.
 *
 * Only declarations are visible in this header; every member function is
 * defined out of line. The descriptions below are derived from the
 * signatures, and anything about runtime behavior is marked as an assumption
 * to be confirmed against the definitions.
 *
 * NOTE(review): this header uses std::shared_ptr, std::list, std::set,
 * std::chrono and std::string_view but does not include <memory>, <list>,
 * <set>, <chrono> or <string_view> directly; it presumably relies on the
 * project headers pulling them in transitively — confirm.
 */
class ProcessingTaskQueueImpl : public ProcessingTaskQueue
{
public:
/**
 * @brief Constructs the queue.
 * @param db               Shared handle to the CRDT global database
 *                         (presumably stored in m_db — confirm in the definition).
 * @param processing_topic Processing topic name
 *                         (presumably stored in m_processing_topic — confirm).
 */
ProcessingTaskQueueImpl( std::shared_ptr<sgns::crdt::GlobalDB> db, std::string processing_topic );
/// Destructor; declared here and defined out of line.
~ProcessingTaskQueueImpl();
/**
 * @brief Override of ProcessingTaskQueue::EnqueueTask.
 * @param task     Task passed by const reference (presumably the task to add to the queue).
 * @param subTasks List of subtasks passed by const reference
 *                 (presumably the subtasks belonging to @p task).
 * @return outcome::result<void>; presumably an error value when enqueueing fails — verify.
 */
outcome::result<void> EnqueueTask( const SGProcessing::Task &task,
const std::list<SGProcessing::SubTask> &subTasks ) override;
/**
 * @brief Override of ProcessingTaskQueue::GetSubTasks.
 * @param taskId   Identifier of the task.
 * @param subTasks Non-const reference; presumably an output list that receives
 *                 the subtasks of @p taskId — confirm.
 * @return bool; presumably true on success — confirm.
 */
bool GetSubTasks( const std::string &taskId, std::list<SGProcessing::SubTask> &subTasks ) override;
/**
 * @brief Override of ProcessingTaskQueue::GrabTask.
 * @return On success a (string, Task) pair; the string is presumably the
 *         task's key in the database — confirm against the definition.
 */
outcome::result<std::pair<std::string, SGProcessing::Task>> GrabTask() override;
/**
 * @brief Override of ProcessingTaskQueue::CompleteTask.
 * @param taskKey    Key of the task.
 * @param taskResult Result associated with the task.
 * @return On success a shared pointer to a crdt::AtomicTransaction; whether
 *         that transaction has already been committed cannot be told from
 *         this header — TODO confirm.
 */
outcome::result<std::shared_ptr<crdt::AtomicTransaction>> CompleteTask(
const std::string &taskKey,
const SGProcessing::TaskResult &taskResult ) override;
/**
 * @brief Override of ProcessingTaskQueue::IsTaskCompleted.
 * @param taskId Identifier of the task.
 * @return bool; presumably true when a result exists for @p taskId — confirm.
 */
bool IsTaskCompleted( const std::string &taskId ) override;
/**
 * @brief Validates a task given as a string (not marked override).
 * @param taskJson Presumably a JSON document describing a task — confirm.
 * @return outcome::result<void>; presumably an error value when validation fails.
 *
 * NOTE(review): taskJson is taken by const value, so the string is copied on
 * every call; switching to a const reference would also require changing the
 * out-of-line definition.
 */
outcome::result<void> IsTaskValid( const std::string taskJson );
/**
 * @brief Validates a subtask given as a string (not marked override).
 * @param taskJson Presumably a JSON document describing a subtask — confirm.
 * @return outcome::result<void>; presumably an error value when validation fails.
 *
 * NOTE(review): taskJson is taken by const value (copied per call), as in IsTaskValid.
 */
outcome::result<void> IsSubTaskValid( const std::string taskJson );
/**
 * @brief Looks up escrow information for a task (not marked override).
 * @param taskId Identifier of the task.
 * @return On success a string; its exact content (path, id, serialized data)
 *         is not determinable from this header — TODO confirm.
 */
outcome::result<std::string> GetTaskEscrow( const std::string &taskId );
/**
 * @brief Reports whether a task is locked (not marked override).
 * @param taskKey Key of the task.
 */
bool IsTaskLocked( const std::string &taskKey );
/**
 * @brief Attempts to lock a task (not marked override).
 * @param taskKey Key of the task.
 * @return bool; presumably true when the lock was taken — confirm.
 */
bool LockTask( const std::string &taskKey );
/**
 * @brief Handles an expired lock on a task (not marked override).
 * @param taskKey Key of the task.
 * @param task    Non-const reference; presumably filled in or updated by the call — confirm.
 * @return bool; meaning not determinable from this header — TODO confirm.
 */
bool MoveExpiredTaskLock( const std::string &taskKey, SGProcessing::Task &task );
/**
 * @brief Sends an escrow value (not marked override).
 * @param path  String taken by value; presumably a database key/path — confirm.
 * @param value Buffer taken by value.
 * @return outcome::result<void>; presumably an error value on failure.
 */
outcome::result<void> SendEscrow( std::string path, sgns::base::Buffer value );
/// Resets the atomic transaction; presumably acts on job_crdt_transaction_ — confirm.
void ResetAtomicTransaction();
/**
 * @brief Override of ProcessingTaskQueue::MarkTaskBad.
 * @param taskKey Key of the task (presumably recorded in m_badjobs — confirm).
 */
void MarkTaskBad( const std::string& taskKey ) override;
private:
/// Shared handle to the CRDT global database.
std::shared_ptr<sgns::crdt::GlobalDB> m_db;
/// Processing timeout as a system_clock duration.
/// NOTE(review): no in-class initializer; the constructor must set it.
std::chrono::system_clock::duration m_processingTimeout;
/// Logger created with the name "ProcessingTaskQueueImpl".
sgns::base::Logger m_logger = sgns::base::createLogger( "ProcessingTaskQueueImpl" );
/// Shared pointer to an atomic CRDT transaction; lifetime and usage are defined out of line.
std::shared_ptr<sgns::crdt::AtomicTransaction> job_crdt_transaction_;
/// Processing topic name.
std::string m_processing_topic;
/// Set of strings; presumably keys of tasks marked bad via MarkTaskBad — confirm.
std::set<std::string> m_badjobs;
// Key fragments. Entries containing "%s" are format patterns; the placeholder
// is presumably substituted with boost::format — confirm in the definitions.
static constexpr std::string_view TASK_LIST_KEY = "/tasks";
static constexpr std::string_view SUBTASK_LIST_KEY = "/subtasks";
static constexpr std::string_view TASK_KEY = "/TASK_%s";
static constexpr std::string_view SUBTASK_KEY = "/%s";
static constexpr std::string_view RESULTS_KEY = "/task_results";
static constexpr std::string_view LOCK_KEY = "/lock_%s";
};
} // namespace sgns::processing
#endif // GRPC_FOR_SUPERGENIUS_PROCESSING_TASK_QUEUE_IMPL_HPP
Updated on 2026-03-04 at 13:10:44 -0800