Skip to content

sgns::processing::ProcessingTaskQueueImpl

#include <processing_task_queue_impl.hpp>

Inherits from ProcessingTaskQueue

Public Functions

Name
ProcessingTaskQueueImpl(std::shared_ptr< sgns::crdt::GlobalDB > db, std::string processing_topic)
~ProcessingTaskQueueImpl()
virtual outcome::result< void > EnqueueTask(const SGProcessing::Task & task, const std::list< SGProcessing::SubTask > & subTasks) override
virtual bool GetSubTasks(const std::string & taskId, std::list< SGProcessing::SubTask > & subTasks) override
virtual outcome::result< std::pair< std::string, SGProcessing::Task > > GrabTask() override
virtual outcome::result< std::shared_ptr< crdt::AtomicTransaction > > CompleteTask(const std::string & taskKey, const SGProcessing::TaskResult & taskResult) override
virtual bool IsTaskCompleted(const std::string & taskId) override
Checks whether a task is completed.
outcome::result< void > IsTaskValid(const std::string taskJson)
outcome::result< void > IsSubTaskValid(const std::string taskJson)
outcome::result< std::string > GetTaskEscrow(const std::string & taskId)
Fetches the task and returns the associated escrow path.
bool IsTaskLocked(const std::string & taskKey)
bool LockTask(const std::string & taskKey)
bool MoveExpiredTaskLock(const std::string & taskKey, SGProcessing::Task & task)
outcome::result< void > SendEscrow(std::string path, sgns::base::Buffer value)
void ResetAtomicTransaction()
virtual void MarkTaskBad(const std::string & taskKey) override
Mark a task key as bad to be skipped.

Additional inherited members

Public Functions inherited from ProcessingTaskQueue

Name
virtual ~ProcessingTaskQueue() =default
Distributed task queue interface.

Public Functions Documentation

function ProcessingTaskQueueImpl

ProcessingTaskQueueImpl(
    std::shared_ptr< sgns::crdt::GlobalDB > db,
    std::string processing_topic
)

Parameters:

  • db - CRDT GlobalDB to use.
  • processing_topic - Topic prefix used for task keys.

Create a task queue.

function ~ProcessingTaskQueueImpl

~ProcessingTaskQueueImpl()

function EnqueueTask

virtual outcome::result< void > EnqueueTask(
    const SGProcessing::Task & task,
    const std::list< SGProcessing::SubTask > & subTasks
) override

Parameters:

  • task - Task to add
  • subTasks - List of subtasks

Reimplements: ProcessingTaskQueue::EnqueueTask

Enqueue a task and subtasks

function GetSubTasks

virtual bool GetSubTasks(
    const std::string & taskId,
    std::list< SGProcessing::SubTask > & subTasks
) override

Parameters:

  • taskId - id to look for subtasks of
  • subTasks - Reference of subtask list

Reimplements: ProcessingTaskQueue::GetSubTasks

Get subtasks by task id, returns true if we got subtasks

function GrabTask

virtual outcome::result< std::pair< std::string, SGProcessing::Task > > GrabTask() override

Return: Pair of task key and task if available.

Reimplements: ProcessingTaskQueue::GrabTask

Get a task by key.

function CompleteTask

virtual outcome::result< std::shared_ptr< crdt::AtomicTransaction > > CompleteTask(
    const std::string & taskKey,
    const SGProcessing::TaskResult & taskResult
) override

Parameters:

  • taskKey - id to look for task
  • taskResult - Reference of a task result

Reimplements: ProcessingTaskQueue::CompleteTask

Complete task by task key, returns true if successful

function IsTaskCompleted

virtual bool IsTaskCompleted(
    const std::string & taskId
) override

Checks whether a task is completed.

Parameters:

  • taskId Task id.

Return: true if completed, false otherwise.

Reimplements: ProcessingTaskQueue::IsTaskCompleted

function IsTaskValid

outcome::result< void > IsTaskValid(
    const std::string taskJson
)

function IsSubTaskValid

outcome::result< void > IsSubTaskValid(
    const std::string taskJson
)

function GetTaskEscrow

outcome::result< std::string > GetTaskEscrow(
    const std::string & taskId
)

Fetches the task and returns the associated escrow path.

Parameters:

  • taskId The task ID

Return: If successful, it returns a escrow path string

function IsTaskLocked

bool IsTaskLocked(
    const std::string & taskKey
)

Parameters:

  • taskKey - id to look for task

Find if a task is locked

function LockTask

bool LockTask(
    const std::string & taskKey
)

Parameters:

  • taskKey - id to look for task

Lock a task by key

function MoveExpiredTaskLock

bool MoveExpiredTaskLock(
    const std::string & taskKey,
    SGProcessing::Task & task
)

Parameters:

  • taskKey - id to look for task
  • task - task reference

Move lock if expired, true if successful

function SendEscrow

outcome::result< void > SendEscrow(
    std::string path,
    sgns::base::Buffer value
)

function ResetAtomicTransaction

void ResetAtomicTransaction()

function MarkTaskBad

virtual void MarkTaskBad(
    const std::string & taskKey
) override

Mark a task key as bad to be skipped.

Parameters:

  • taskKey Task key to mark as bad.

Reimplements: ProcessingTaskQueue::MarkTaskBad


Updated on 2026-03-04 at 13:10:43 -0800