Skip to content

sgns::processing::ProcessingSubTaskQueue

Distributed subtask queue implementation.

#include <processing_subtask_queue.hpp>

Public Types

Name
using std::function< uint64_t()> TimestampProvider

Public Functions

Name
ProcessingSubTaskQueue(std::string localNodeId, TimestampProvider timestampProvider =nullptr)
void CreateQueue(SGProcessing::ProcessingQueue * queue, const std::vector< int > & enabledItemIndices)
bool GrabItem(size_t & grabbedItemIndex, uint64_t timestamp)
bool MoveOwnershipTo(const std::string & nodeId)
bool RollbackOwnership()
bool HasOwnership() const
bool UpdateQueue(SGProcessing::ProcessingQueue * queue, const std::vector< int > & enabledItemIndices)
bool UnlockExpiredItems(uint64_t currentTime)
uint64_t GetLastLockTimestamp() const
bool AddOwnershipRequest(const std::string & nodeId, uint64_t timestamp)
bool ProcessNextOwnershipRequest()

Public Types Documentation

using TimestampProvider

using sgns::processing::ProcessingSubTaskQueue::TimestampProvider = std::function<uint64_t()>;

Public Functions Documentation

function ProcessingSubTaskQueue

ProcessingSubTaskQueue(
    std::string localNodeId,
    TimestampProvider timestampProvider =nullptr
)

Parameters:

  • localNodeId local processing node ID
  • timestampProvider get the current timestamp from the manager function

Construct an empty queue

function CreateQueue

void CreateQueue(
    SGProcessing::ProcessingQueue * queue,
    const std::vector< int > & enabledItemIndices
)

Parameters:

  • queue - Queue snapshot to initialize.
  • enabledItemIndices - Indexes of enabled items; disabled items are treated as deleted.

Initialize a subtask queue snapshot.

function GrabItem

bool GrabItem(
    size_t & grabbedItemIndex,
    uint64_t timestamp
)

Parameters:

  • grabbedItemIndex - Index of the grabbed item if successful.
  • timestamp - Current timestamp for the queue.

Return: true if an item was grabbed, false otherwise.

Attempts to grab a subtask from the queue.

function MoveOwnershipTo

bool MoveOwnershipTo(
    const std::string & nodeId
)

Parameters:

  • nodeId - processing node ID that the ownership should be transferred

Transfer the queue ownership to another processing node

function RollbackOwnership

bool RollbackOwnership()

Return: true if the ownership is successfully rolled back

Rollbacks the queue ownership to the previous state

function HasOwnership

bool HasOwnership() const

Return: true is the lolca node owns the queue

Checks id the local processing node owns the queue

function UpdateQueue

bool UpdateQueue(
    SGProcessing::ProcessingQueue * queue,
    const std::vector< int > & enabledItemIndices
)

Parameters:

  • queue - the queue snapshot
  • enabledItemIndices - indexes of enabled items. Disabled items are considered as deleted.

Updates the local queue with a snapshot that have the most recent timestamp

function UnlockExpiredItems

bool UnlockExpiredItems(
    uint64_t currentTime
)

Parameters:

  • currentTime - the current queue time

Return: true if at least one item was unlocked

Unlocks expired queue items

function GetLastLockTimestamp

uint64_t GetLastLockTimestamp() const

Returns the most recent item lock timestamp

function AddOwnershipRequest

bool AddOwnershipRequest(
    const std::string & nodeId,
    uint64_t timestamp
)

Parameters:

  • nodeId - ID of the node requesting ownership
  • timestamp - timestamp when the request was created

Return: true if the request was successfully added, false if it already exists

Adds a new ownership request to the queue

function ProcessNextOwnershipRequest

bool ProcessNextOwnershipRequest()

Return: true if an ownership request was processed, false if the queue is empty or the node doesn't have ownership

Processes the next ownership request in the queue


Updated on 2026-03-04 at 13:10:43 -0800