sgns::processing::ProcessingSubTaskQueue¶
Distributed subtask queue implementation.
#include <processing_subtask_queue.hpp>
Public Types¶
| Name | |
|---|---|
| using std::function< uint64_t()> | TimestampProvider |
Public Functions¶
| Name | |
|---|---|
| ProcessingSubTaskQueue(std::string localNodeId, TimestampProvider timestampProvider =nullptr) | |
| void | CreateQueue(SGProcessing::ProcessingQueue * queue, const std::vector< int > & enabledItemIndices) |
| bool | GrabItem(size_t & grabbedItemIndex, uint64_t timestamp) |
| bool | MoveOwnershipTo(const std::string & nodeId) |
| bool | RollbackOwnership() |
| bool | HasOwnership() const |
| bool | UpdateQueue(SGProcessing::ProcessingQueue * queue, const std::vector< int > & enabledItemIndices) |
| bool | UnlockExpiredItems(uint64_t currentTime) |
| uint64_t | GetLastLockTimestamp() const |
| bool | AddOwnershipRequest(const std::string & nodeId, uint64_t timestamp) |
| bool | ProcessNextOwnershipRequest() |
Public Types Documentation¶
using TimestampProvider¶
Public Functions Documentation¶
function ProcessingSubTaskQueue¶
Parameters:
- localNodeId local processing node ID
- timestampProvider get the current timestamp from the manager function
Construct an empty queue
function CreateQueue¶
void CreateQueue(
SGProcessing::ProcessingQueue * queue,
const std::vector< int > & enabledItemIndices
)
Parameters:
- queue - Queue snapshot to initialize.
- enabledItemIndices - Indexes of enabled items; disabled items are treated as deleted.
Initialize a subtask queue snapshot.
function GrabItem¶
Parameters:
- grabbedItemIndex - Index of the grabbed item if successful.
- timestamp - Current timestamp for the queue.
Return: true if an item was grabbed, false otherwise.
Attempts to grab a subtask from the queue.
function MoveOwnershipTo¶
Parameters:
- nodeId - processing node ID that the ownership should be transferred
Transfer the queue ownership to another processing node
function RollbackOwnership¶
Return: true if the ownership is successfully rolled back
Rollbacks the queue ownership to the previous state
function HasOwnership¶
Return: true is the lolca node owns the queue
Checks id the local processing node owns the queue
function UpdateQueue¶
bool UpdateQueue(
SGProcessing::ProcessingQueue * queue,
const std::vector< int > & enabledItemIndices
)
Parameters:
- queue - the queue snapshot
- enabledItemIndices - indexes of enabled items. Disabled items are considered as deleted.
Updates the local queue with a snapshot that have the most recent timestamp
function UnlockExpiredItems¶
Parameters:
- currentTime - the current queue time
Return: true if at least one item was unlocked
Unlocks expired queue items
function GetLastLockTimestamp¶
Returns the most recent item lock timestamp
function AddOwnershipRequest¶
Parameters:
- nodeId - ID of the node requesting ownership
- timestamp - timestamp when the request was created
Return: true if the request was successfully added, false if it already exists
Adds a new ownership request to the queue
function ProcessNextOwnershipRequest¶
Return: true if an ownership request was processed, false if the queue is empty or the node doesn't have ownership
Processes the next ownership request in the queue
Updated on 2026-03-04 at 13:10:43 -0800