
sgns::processing::TaskQueueImpl

Implements task storage on top of a CRDT database.

#include <TaskQueueImpl.hpp>

Inherits from sgns::processing::ProcessingTaskQueue

Public Functions

Name
static std::shared_ptr< TaskQueueImpl > New(std::shared_ptr< sgns::crdt::GlobalDB > db, std::string processing_topic)
Factory method to create a TaskQueueImpl instance.
~TaskQueueImpl() override =default
virtual outcome::result< void > EnqueueTask(const SGProcessing::Task & task, const std::list< SGProcessing::SubTask > & subTasks, std::shared_ptr< crdt::AtomicTransaction > crdt_transaction =nullptr) override
Stores a task with its subtasks within an atomic transaction.
virtual outcome::result< SGProcessing::Task > GetTask(const std::string & taskId) override
Returns the task with the given ID, or failure if the task is not found or is invalid.
virtual bool GetSubTasks(const std::string & taskId, std::list< SGProcessing::SubTask > & subTasks) override
Retrieves the subtasks for a given task ID.
virtual outcome::result< std::pair< std::string, SGProcessing::Task > > GrabTask() override
Grabs a task from storage, returning its ID and data.
virtual outcome::result< std::shared_ptr< crdt::AtomicTransaction > > CompleteTask(const std::string & taskId, const SGProcessing::TaskResult & result) override
Completes a task with its result and returns an atomic transaction that commits the completion.
virtual bool IsTaskCompleted(const std::string & taskId) override
Checks if the task is completed.
virtual void MarkTaskBad(const std::string & taskKey) override
Marks a task key as bad so that it is skipped.

Additional inherited members

Public Functions inherited from sgns::processing::ProcessingTaskQueue

Name
virtual ~ProcessingTaskQueue() =default
Distributed task queue interface.

Public Functions Documentation

function New

static std::shared_ptr< TaskQueueImpl > New(
    std::shared_ptr< sgns::crdt::GlobalDB > db,
    std::string processing_topic
)

Factory method to create a TaskQueueImpl instance.

Parameters:

  • db The database instance to use for storing tasks and subtasks
  • processing_topic The topic for processing tasks

Return: An initialized TaskQueueImpl instance, or nullptr if an error occurs
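
Because New reports failure by returning nullptr, callers should check the pointer before dereferencing it. The following is a minimal sketch of that pattern using stand-in types: DemoDb, DemoFactoryQueue, and TopicOrEmpty are hypothetical and not part of the library. The conditions under which the real factory fails are not documented here, so the null-database and empty-topic checks are assumptions.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for sgns::crdt::GlobalDB.
struct DemoDb {};

// Hypothetical stand-in that mirrors the shape of TaskQueueImpl::New.
class DemoFactoryQueue {
public:
    // Returns nullptr on error. Treating a null database or an empty topic
    // as an error is an assumption made for this sketch.
    static std::shared_ptr<DemoFactoryQueue> New(std::shared_ptr<DemoDb> db,
                                                 std::string processing_topic) {
        if (!db || processing_topic.empty()) {
            return nullptr;
        }
        return std::shared_ptr<DemoFactoryQueue>(
            new DemoFactoryQueue(std::move(db), std::move(processing_topic)));
    }

    const std::string& Topic() const { return topic_; }

private:
    DemoFactoryQueue(std::shared_ptr<DemoDb> db, std::string topic)
        : db_(std::move(db)), topic_(std::move(topic)) {
        assert(db_ != nullptr);  // guaranteed by the checks in New
    }

    std::shared_ptr<DemoDb> db_;
    std::string topic_;
};

// Caller-side pattern: check the factory result before dereferencing it.
inline std::string TopicOrEmpty(const std::shared_ptr<DemoFactoryQueue>& queue) {
    return queue ? queue->Topic() : std::string();
}
```

With the real class, the same check would apply to the pointer returned by TaskQueueImpl::New.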

function ~TaskQueueImpl

~TaskQueueImpl() override =default

function EnqueueTask

virtual outcome::result< void > EnqueueTask(
    const SGProcessing::Task & task,
    const std::list< SGProcessing::SubTask > & subTasks,
    std::shared_ptr< crdt::AtomicTransaction > crdt_transaction =nullptr
) override

Stores a task with its subtasks within an atomic transaction.

Parameters:

  • task The task to store
  • subTasks The subtasks to store
  • crdt_transaction The atomic transaction to use

Return: Success if the task and subtasks were stored successfully, failure otherwise

Reimplements: sgns::processing::ProcessingTaskQueue::EnqueueTask
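
The crdt_transaction parameter defaults to nullptr, which suggests that callers may either supply their own transaction or let the queue manage one. The sketch below illustrates one plausible reading of that contract with stand-in types. DemoStore, DemoTransaction, DemoEnqueue, and the key layout are hypothetical, and the behavior shown for a null transaction (create one internally and commit it at once) is an assumption, not documented behavior.

```cpp
#include <cassert>
#include <list>
#include <map>
#include <memory>
#include <string>

// Hypothetical key-value store standing in for the CRDT database.
using DemoStore = std::map<std::string, std::string>;

// Hypothetical stand-in for crdt::AtomicTransaction: writes are buffered
// and reach the store only when Commit() is called.
class DemoTransaction {
public:
    explicit DemoTransaction(DemoStore& store) : store_(store) {}

    void Put(const std::string& key, const std::string& value) {
        pending_[key] = value;
    }

    void Commit() {
        for (const auto& entry : pending_) {
            store_[entry.first] = entry.second;
        }
        pending_.clear();
    }

private:
    DemoStore& store_;
    DemoStore pending_;
};

// Hypothetical enqueue helper. If no transaction is supplied, it creates
// one and commits immediately (an assumption); otherwise the caller decides
// when to commit, so the task and its subtasks become visible together.
inline void DemoEnqueue(DemoStore& store,
                        const std::string& taskId,
                        const std::list<std::string>& subTaskIds,
                        std::shared_ptr<DemoTransaction> tx = nullptr) {
    const bool ownsTransaction = (tx == nullptr);
    if (ownsTransaction) {
        tx = std::make_shared<DemoTransaction>(store);
    }
    assert(tx != nullptr);

    tx->Put("tasks/" + taskId, "task");
    for (const auto& subTaskId : subTaskIds) {
        tx->Put("subtasks/" + taskId + "/" + subTaskId, "subtask");
    }

    if (ownsTransaction) {
        tx->Commit();
    }
}
```

Passing a shared transaction lets a caller batch the enqueue with other writes and commit them as one unit.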

function GetTask

virtual outcome::result< SGProcessing::Task > GetTask(
    const std::string & taskId
) override

Returns the task with the given ID, or failure if the task is not found or is invalid.

Parameters:

  • taskId The ID of the task to retrieve

Return: The task if found, failure otherwise

Reimplements: sgns::processing::ProcessingTaskQueue::GetTask

function GetSubTasks

virtual bool GetSubTasks(
    const std::string & taskId,
    std::list< SGProcessing::SubTask > & subTasks
) override

Retrieves the subtasks for a given task ID.

Parameters:

  • taskId The ID of the task for which to retrieve subtasks.
  • subTasks A reference to a list where the retrieved subtasks will be stored.

Return: true if the subtasks were retrieved successfully, false otherwise.

Reimplements: sgns::processing::ProcessingTaskQueue::GetSubTasks
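
GetSubTasks reports success through its bool return value and delivers data through an output parameter, so the list contents are meaningful only when the call returns true. A minimal sketch of the calling pattern follows. DemoSubTask, DemoSubTaskStore, and CountSubTasks are hypothetical stand-ins, and whether the real method replaces or appends to the list is not documented here; this sketch assumes replacement.

```cpp
#include <cassert>
#include <cstddef>
#include <list>
#include <map>
#include <string>

// Hypothetical stand-in for SGProcessing::SubTask.
struct DemoSubTask {
    std::string id;
};

// Hypothetical store exposing a GetSubTasks method with the same shape.
class DemoSubTaskStore {
public:
    void Add(const std::string& taskId, const std::string& subTaskId) {
        subTasksByTask_[taskId].push_back(DemoSubTask{subTaskId});
    }

    // Returns false for an unknown task and leaves subTasks untouched.
    // On success the list is replaced (an assumption for this sketch).
    bool GetSubTasks(const std::string& taskId,
                     std::list<DemoSubTask>& subTasks) const {
        const auto it = subTasksByTask_.find(taskId);
        if (it == subTasksByTask_.end()) {
            return false;
        }
        subTasks = it->second;
        return true;
    }

private:
    std::map<std::string, std::list<DemoSubTask>> subTasksByTask_;
};

// Caller-side pattern: start from an empty list and read it only on success.
inline std::size_t CountSubTasks(const DemoSubTaskStore& store,
                                 const std::string& taskId) {
    std::list<DemoSubTask> subTasks;
    if (!store.GetSubTasks(taskId, subTasks)) {
        return 0;
    }
    assert(!subTasks.empty());  // Add() never creates an empty list here
    return subTasks.size();
}
```

Starting from a fresh, empty list keeps the caller correct regardless of whether the implementation appends or replaces.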

function GrabTask

virtual outcome::result< std::pair< std::string, SGProcessing::Task > > GrabTask() override

Grabs a task from storage, returning its ID and data.

Return: A pair of task ID and task data if a task is found, failure otherwise.

Reimplements: sgns::processing::ProcessingTaskQueue::GrabTask

function CompleteTask

virtual outcome::result< std::shared_ptr< crdt::AtomicTransaction > > CompleteTask(
    const std::string & taskId,
    const SGProcessing::TaskResult & result
) override

Completes a task with its result and returns an atomic transaction that commits the completion.

Parameters:

  • taskId The ID of the task to complete
  • result The result of the completed task

Return: A CRDT atomic transaction if the task completion was successful, failure otherwise

Reimplements: sgns::processing::ProcessingTaskQueue::CompleteTask
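
Since CompleteTask returns a transaction rather than committing on its own, a worker loop typically has three steps: grab a task, complete it, and commit the returned transaction. The sketch below models that flow with stand-in types. DemoCompletionQueue, its nested Transaction, and ProcessAll are hypothetical; std::optional and a null pointer stand in for outcome::result failures. It also assumes that GrabTask skips completed tasks and that a completion becomes visible only after commit, neither of which is documented here.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <utility>

// Hypothetical in-memory queue that mirrors the grab/complete/commit flow.
class DemoCompletionQueue {
public:
    // Hypothetical stand-in for crdt::AtomicTransaction.
    class Transaction {
    public:
        Transaction(DemoCompletionQueue& queue, std::string taskId, std::string result)
            : queue_(queue), taskId_(std::move(taskId)), result_(std::move(result)) {}

        // The completion becomes visible only here (an assumption).
        void Commit() {
            assert(queue_.tasks_.count(taskId_) != 0);
            queue_.results_[taskId_] = result_;
        }

    private:
        DemoCompletionQueue& queue_;
        std::string taskId_;
        std::string result_;
    };

    void Enqueue(const std::string& taskId, const std::string& payload) {
        tasks_[taskId] = payload;
    }

    // Returns the first task that is not completed, or std::nullopt.
    std::optional<std::pair<std::string, std::string>> GrabTask() const {
        for (const auto& [taskId, payload] : tasks_) {
            if (!IsTaskCompleted(taskId)) {
                return std::make_pair(taskId, payload);
            }
        }
        return std::nullopt;
    }

    // Returns a transaction to commit, or nullptr for an unknown task.
    std::shared_ptr<Transaction> CompleteTask(const std::string& taskId,
                                              const std::string& result) {
        if (tasks_.count(taskId) == 0) {
            return nullptr;
        }
        return std::make_shared<Transaction>(*this, taskId, result);
    }

    bool IsTaskCompleted(const std::string& taskId) const {
        return results_.count(taskId) != 0;
    }

private:
    std::map<std::string, std::string> tasks_;
    std::map<std::string, std::string> results_;
};

// Worker loop: grab, complete, commit, until no task is left.
inline int ProcessAll(DemoCompletionQueue& queue) {
    int processed = 0;
    while (auto grabbed = queue.GrabTask()) {
        auto tx = queue.CompleteTask(grabbed->first, "done:" + grabbed->second);
        if (!tx) {
            break;  // completion failed; stop rather than spin
        }
        tx->Commit();
        ++processed;
    }
    return processed;
}
```

Returning the transaction leaves the commit point to the caller, who can add further writes to it before committing.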

function IsTaskCompleted

virtual bool IsTaskCompleted(
    const std::string & taskId
) override

Checks if the task is completed.

Parameters:

  • taskId The ID of the task to check.

Return: true if the task is completed, false otherwise.

Reimplements: sgns::processing::ProcessingTaskQueue::IsTaskCompleted

function MarkTaskBad

virtual void MarkTaskBad(
    const std::string & taskKey
) override

Marks a task key as bad so that it is skipped.

Parameters:

  • taskKey Task key to mark as bad.

Reimplements: sgns::processing::ProcessingTaskQueue::MarkTaskBad
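
One way to picture the skip behavior is a set of bad keys that the grab step consults. The sketch below is an illustration under that assumption, not the library's implementation. DemoSkippingQueue and GrabFirstValidKey are hypothetical, as is the rule that an empty payload makes a task invalid. The relationship between a task key and a task ID is not documented here, so the sketch treats them as the same string.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <set>
#include <string>
#include <utility>

// Hypothetical in-memory queue that skips keys marked as bad.
class DemoSkippingQueue {
public:
    void Enqueue(const std::string& taskKey, const std::string& payload) {
        tasks_[taskKey] = payload;
    }

    // Records the key so later grabs ignore it.
    void MarkTaskBad(const std::string& taskKey) {
        assert(!taskKey.empty());  // an empty key is a caller error in this sketch
        badKeys_.insert(taskKey);
    }

    // Returns the first task whose key is not marked bad, or std::nullopt.
    std::optional<std::pair<std::string, std::string>> GrabTask() const {
        for (const auto& [taskKey, payload] : tasks_) {
            if (badKeys_.count(taskKey) == 0) {
                return std::make_pair(taskKey, payload);
            }
        }
        return std::nullopt;
    }

private:
    std::map<std::string, std::string> tasks_;
    std::set<std::string> badKeys_;
};

// Grabs tasks until one passes a validity check, marking failures as bad.
// "Empty payload means invalid" is a hypothetical rule for illustration.
inline std::optional<std::string> GrabFirstValidKey(DemoSkippingQueue& queue) {
    while (auto grabbed = queue.GrabTask()) {
        if (!grabbed->second.empty()) {
            return grabbed->first;
        }
        queue.MarkTaskBad(grabbed->first);
    }
    return std::nullopt;
}
```

Marking the key before retrying prevents the loop from grabbing the same invalid entry again.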


Updated on 2026-06-05 at 17:22:18 -0700