impl/TaskQueueImpl.hpp¶
Header file for the implementation of the task queue using CRDT. More...
Namespaces¶
| Name |
|---|
| sgns |
| sgns::processing |
Classes¶
| Name | |
|---|---|
| class | sgns::processing::TaskQueueImpl Implements the task storage on CRDT. |
Detailed Description¶
Header file for the implementation of the task queue using CRDT.
Date: 2026-05-18. Author: Henrique A. Klein ([email protected])
Source code¶
#pragma once
#include <chrono>
#include <list>
#include <memory>
#include <string>
#include <unordered_set>
#include <utility>
#include "processing/processing_task_queue.hpp"
#include "crdt/globaldb/globaldb.hpp"
#include "crdt/atomic_transaction.hpp"
#include "processing/impl/TaskKeys.hpp"
#include "outcome/outcome.hpp"
namespace sgns::processing
{
/**
 * @brief Implements the task storage on CRDT.
 *
 * Concrete ProcessingTaskQueue that holds a shared handle to a
 * sgns::crdt::GlobalDB and a processing topic string. The constructor is
 * private; instances are obtained through New().
 */
class TaskQueueImpl : public ProcessingTaskQueue
{
public:
    /**
     * @brief Factory function returning a shared TaskQueueImpl.
     * @param db               Shared handle to the CRDT database.
     * @param processing_topic Topic string associated with this queue.
     * @return Shared pointer to the created queue.
     */
    static std::shared_ptr<TaskQueueImpl> New(
        std::shared_ptr<sgns::crdt::GlobalDB> db,
        std::string processing_topic );

    ~TaskQueueImpl() override = default;

    /**
     * @brief Adds a task together with its subtasks to the queue.
     * @param task             Task to enqueue.
     * @param subTasks         Subtasks belonging to @p task.
     * @param crdt_transaction Optional atomic transaction; defaults to nullptr.
     *                         NOTE(review): presumably the writes join this
     *                         transaction when one is supplied - confirm in
     *                         the implementation.
     * @return outcome::result<void> reporting success or failure.
     */
    outcome::result<void> EnqueueTask(
        const SGProcessing::Task &task,
        const std::list<SGProcessing::SubTask> &subTasks,
        std::shared_ptr<crdt::AtomicTransaction> crdt_transaction = nullptr ) override;

    /**
     * @brief Looks up a task by its identifier.
     * @param taskId Identifier of the task.
     * @return The task on success, otherwise an error result.
     */
    outcome::result<SGProcessing::Task> GetTask(
        const std::string &taskId ) override;

    /**
     * @brief Retrieves the subtasks of a task through an output list.
     * @param taskId   Identifier of the task.
     * @param subTasks Output list receiving the subtasks.
     * @return Boolean status (presumably true on success - TODO confirm).
     */
    bool GetSubTasks(
        const std::string &taskId,
        std::list<SGProcessing::SubTask> &subTasks ) override;

    /**
     * @brief Obtains a task from the queue for processing.
     * @return Pair of a string and the task on success (the string is
     *         presumably the task key accepted by CompleteTask() and
     *         MarkTaskBad() - TODO confirm), otherwise an error result.
     */
    outcome::result<std::pair<std::string, SGProcessing::Task>> GrabTask() override;

    /**
     * @brief Records the result of a task.
     * @param taskKey    Key of the task being completed.
     * @param taskResult Result to associate with the task.
     * @return A shared atomic transaction on success, otherwise an error
     *         result. NOTE(review): whether the caller is expected to commit
     *         the returned transaction is not visible here - confirm.
     */
    outcome::result<std::shared_ptr<crdt::AtomicTransaction>> CompleteTask(
        const std::string &taskKey,
        const SGProcessing::TaskResult &taskResult ) override;

    /**
     * @brief Queries whether a task has been completed.
     * @param taskId Identifier of the task.
     * @return Boolean completion status.
     */
    bool IsTaskCompleted( const std::string &taskId ) override;

    /**
     * @brief Flags a task as bad.
     * @param taskKey Key of the task to flag.
     * NOTE(review): possibly feeds incompatible_jobs_ - confirm in the
     * implementation.
     */
    void MarkTaskBad( const std::string &taskKey ) override;

private:
    /// Private constructor; use New() to create instances.
    explicit TaskQueueImpl(
        std::shared_ptr<sgns::crdt::GlobalDB> db,
        std::string processing_topic );

    /// Lock timeout of 10 seconds (presumably the age after which a task
    /// lock counts as expired - TODO confirm against the implementation).
    static constexpr auto LOCK_TIMEOUT = std::chrono::seconds( 10 );

    /// Reports whether the task identified by @p taskKey is locked.
    bool IsTaskLocked( const std::string &taskKey );

    /// Attempts to lock the task identified by @p taskKey; returns a boolean status.
    bool LockTask( const std::string &taskKey );

    /// Handles an expired lock on @p taskKey; @p task is passed by non-const
    /// reference (presumably updated in place - TODO confirm).
    bool MoveExpiredTaskLock(
        const std::string &taskKey,
        SGProcessing::Task &task );

    std::shared_ptr<sgns::crdt::GlobalDB> db_;               ///< Shared CRDT database handle.
    std::string                           processing_topic_; ///< Processing topic string.
    std::unordered_set<std::string>       incompatible_jobs_; ///< Set of job strings (presumably jobs this node must skip - TODO confirm).
};
}
Updated on 2026-06-05 at 17:22:19 -0700