sgns::processing::ProcessingSubTaskQueueManager¶
Distributed subtask queue manager.
#include <processing_subtask_queue_manager.hpp>
Inherits from std::enable_shared_from_this< ProcessingSubTaskQueueManager >
Public Types¶
| Name | |
|---|---|
| using std::function< void(boost::optional< const SGProcessing::SubTask & >)> | SubTaskGrabbedCallback |
Public Functions¶
| Name | |
|---|---|
| ProcessingSubTaskQueueManager(std::shared_ptr< ProcessingSubTaskQueueChannel > queueChannel, std::shared_ptr< boost::asio::io_context > context, const std::string & localNodeId, std::function< void(const std::string &)> processingErrorSink, uint64_t delayBetweenProcessingMs =20) | |
| ~ProcessingSubTaskQueueManager() | |
| void | SetProcessingTimeout(const std::chrono::system_clock::duration & processingTimeout) |
| bool | CreateQueue(std::list< SGProcessing::SubTask > & subTasks) |
| void | GrabSubTask(SubTaskGrabbedCallback onSubTaskGrabbedCallback) |
| bool | MoveOwnershipTo(const std::string & nodeId) |
| bool | HasOwnership() const |
| bool | ProcessSubTaskQueueMessage(SGProcessing::SubTaskQueue * queue) |
| bool | ProcessSubTaskQueueRequestMessage(const SGProcessing::SubTaskQueueRequest & request) |
| std::unique_ptr< SGProcessing::SubTaskQueue > | GetQueueSnapshot() const |
| void | ChangeSubTaskProcessingStates(const std::set< std::string > & subTaskIds, bool isProcessed) |
| bool | IsQueueInit() |
| bool | IsProcessed() const |
| void | SetSubTaskQueueAssignmentEventSink(std::function< void(const std::vector< std::string > &)> subTaskQueueAssignmentEventSink) |
| uint64_t | GetCurrentQueueTimestamp() |
| void | SetMaxSubtasksPerOwnership(size_t maxSubtasksPerOwnership) |
Public Types Documentation¶
using SubTaskGrabbedCallback¶
using sgns::processing::ProcessingSubTaskQueueManager::SubTaskGrabbedCallback = std::function<void( boost::optional<const SGProcessing::SubTask &> )>;
Public Functions Documentation¶
function ProcessingSubTaskQueueManager¶
ProcessingSubTaskQueueManager(
std::shared_ptr< ProcessingSubTaskQueueChannel > queueChannel,
std::shared_ptr< boost::asio::io_context > context,
const std::string & localNodeId,
std::function< void(const std::string &)> processingErrorSink,
uint64_t delayBetweenProcessingMs =20
)
Parameters:
- queueChannel - Task processing channel.
- context - IO context to handle timers.
- localNodeId - Local processing node ID.
- processingErrorSink - Callback for processing errors.
- delayBetweenProcessingMs - Delay between processing cycles (ms).
Construct an empty queue manager.
function ~ProcessingSubTaskQueueManager¶
function SetProcessingTimeout¶
Parameters:
- processingTimeout - subtask processing timeout Once the timeout is exceeded the subtask is marked as expired.
Set a timeout for subtask processing
function CreateQueue¶
Parameters:
- subTasks - a list of subtasks that should be added to the queue in subtasks to allow a validation
Return: false if not queue was created due to errors
Create a subtask queue by splitting the task to subtasks using the processing code
function GrabSubTask¶
Parameters:
- onSubTaskGrabbedCallback a callback that is called when a grabbed iosubtask is locked by the local node
Asynchronous getting of a subtask from the queue
function MoveOwnershipTo¶
Parameters:
- nodeId - processing node ID that the ownership should be transferred
Transfer the queue ownership to another processing node
function HasOwnership¶
Return: true is the local node owns the queue
Checks id the local processing node owns the queue
function ProcessSubTaskQueueMessage¶
Parameters:
- queue received queue snapshot
Changes the local queue state with respect to passed queue snapshot The method should be called from a processing channel message handler
function ProcessSubTaskQueueRequestMessage¶
Parameters:
- request is a request for the queue ownership transferring
Changes the local queue state with respect to passed queue request The method should be called from a processing channel message handler
function GetQueueSnapshot¶
Return: the queue snapshot
Returns the current local queue snapshot
function ChangeSubTaskProcessingStates¶
Parameters:
- subTaskIds - a list of subtask which state should be changed
- isProcessed - new state
Mark a subtask as processed/unprocessed
function IsQueueInit¶
Check whether queue has been initialized to prevent nullptr access to the queue.
function IsProcessed¶
Return: true if the queue is processed
Checks if all subtask in the queue are processed
function SetSubTaskQueueAssignmentEventSink¶
void SetSubTaskQueueAssignmentEventSink(
std::function< void(const std::vector< std::string > &)> subTaskQueueAssignmentEventSink
)
Parameters:
- subTaskQueueAssignmentEventSink * lambda or function handling the event
Sink that gets subtask assignment events
function GetCurrentQueueTimestamp¶
Return: The current queue timestamp as a 64-bit unsigned integer.
Retrieves the current timestamp associated with the queue.
function SetMaxSubtasksPerOwnership¶
Parameters:
- maxSubtasksPerOwnership The maximum number of subtasks that can be assigned to a single ownership instance.
Sets the maximum number of subtasks that can be owned at one time.
Updated on 2026-03-04 at 13:10:43 -0800