src/processing/processing_subtask_queue_manager.hpp¶
Namespaces¶
| Name |
|---|
| sgns |
| sgns::processing |
Classes¶
| Name | |
|---|---|
| class | sgns::processing::ProcessingSubTaskQueueManager Distributed subtask queue manager. |
Source code¶
#ifndef SUPERGENIUS_PROCESSING_SUBTASK_QUEUE_MANAGER_HPP
#define SUPERGENIUS_PROCESSING_SUBTASK_QUEUE_MANAGER_HPP
#include "processing/processing_subtask_queue.hpp"
#include "processing/processing_subtask_queue_channel.hpp"
#include "processing/proto/SGProcessing.pb.h"
#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <list>
#include <queue>
namespace sgns::processing
{
class ProcessingSubTaskQueueManager : public std::enable_shared_from_this<ProcessingSubTaskQueueManager>
{
public:
using SubTaskGrabbedCallback = std::function<void( boost::optional<const SGProcessing::SubTask &> )>;
ProcessingSubTaskQueueManager( std::shared_ptr<ProcessingSubTaskQueueChannel> queueChannel,
std::shared_ptr<boost::asio::io_context> context,
const std::string &localNodeId,
std::function<void( const std::string & )> processingErrorSink,
uint64_t delayBetweenProcessingMs = 20 );
~ProcessingSubTaskQueueManager();
void SetProcessingTimeout( const std::chrono::system_clock::duration &processingTimeout );
bool CreateQueue( std::list<SGProcessing::SubTask> &subTasks );
void GrabSubTask( SubTaskGrabbedCallback onSubTaskGrabbedCallback );
bool MoveOwnershipTo( const std::string &nodeId );
bool HasOwnership() const;
bool ProcessSubTaskQueueMessage( SGProcessing::SubTaskQueue *queue );
bool ProcessSubTaskQueueRequestMessage( const SGProcessing::SubTaskQueueRequest &request );
std::unique_ptr<SGProcessing::SubTaskQueue> GetQueueSnapshot() const;
void ChangeSubTaskProcessingStates( const std::set<std::string> &subTaskIds, bool isProcessed );
bool IsQueueInit()
{
return m_queue != nullptr;
}
bool IsProcessed() const;
void SetSubTaskQueueAssignmentEventSink(
std::function<void( const std::vector<std::string> & )> subTaskQueueAssignmentEventSink );
uint64_t GetCurrentQueueTimestamp();
void SetMaxSubtasksPerOwnership( size_t maxSubtasksPerOwnership )
{
m_defaultMaxSubtasksPerOwnership = maxSubtasksPerOwnership;
}
private:
bool UpdateQueue( SGProcessing::SubTaskQueue *queue );
void HandleQueueRequestTimeout( const boost::system::error_code &ec );
void PublishSubTaskQueue();
void ProcessPendingSubTaskGrabbing();
void GrabSubTasks();
void HandleGrabSubTaskTimeout( const boost::system::error_code &ec );
void LogQueue() const;
bool HasAvailableWork( bool checkOwnershipQuota = true ) const;
void UpdateQueueTimestamp();
void CheckActiveCount();
uint64_t CalculateGrabSubTaskTimeout() const;
std::vector<int> UpdateUnprocessedSubTaskIndices( const SGProcessing::SubTaskQueue *queue ) const;
std::shared_ptr<ProcessingSubTaskQueueChannel> m_queueChannel;
std::shared_ptr<boost::asio::io_context> m_context;
std::string m_localNodeId;
std::shared_ptr<SGProcessing::SubTaskQueue> m_queue;
mutable std::recursive_mutex m_queueMutex;
std::list<SubTaskGrabbedCallback> m_onSubTaskGrabbedCallbacks;
std::function<void( const std::vector<std::string> & )> m_subTaskQueueAssignmentEventSink;
std::set<std::string> m_processedSubTaskIds;
boost::asio::deadline_timer m_dltQueueResponseTimeout;
boost::posix_time::time_duration m_queueResponseTimeout;
boost::asio::deadline_timer m_dltGrabSubTaskTimeout;
ProcessingSubTaskQueue m_processingQueue;
std::chrono::system_clock::duration m_processingTimeout;
std::function<void( const std::string & )> m_processingErrorSink;
uint64_t m_queue_timestamp_ = 0; // Aggregate time counter for the queue
uint64_t m_ownership_acquired_at_ = 0; // When this node acquired ownership (in ms)
uint64_t m_ownership_last_delta_time_ = 0; // When this node last updated the queue timestamp
// Add to private section of ProcessingSubTaskQueueManager
struct OwnershipRequest
{
std::string node_id;
uint64_t timestamp; // Timestamp when request was received
};
std::queue<OwnershipRequest> m_ownershipRequestQueue;
base::Logger m_logger = base::createLogger( "ProcessingSubTaskQueueManager" );
std::chrono::steady_clock::time_point m_lastQueueUpdateTime = std::chrono::steady_clock::now();
size_t m_processedSubtasksInCurrentOwnership = 0;
size_t m_defaultMaxSubtasksPerOwnership = 1;
size_t m_maxSubtasksPerOwnership;
const uint64_t m_delayBetweenProcessingMs;
std::chrono::steady_clock::time_point m_lastActiveCountCheck = std::chrono::steady_clock::now();
uint64_t m_waitTimeBeforeReset = 3000; // Initial wait time of 3000ms
bool m_initialDelayPassed = false; // Track if initial delay has passed
};
}
#endif // SUPERGENIUS_PROCESSING_SUBTASK_QUEUE_MANAGER_HPP
Updated on 2026-03-04 at 13:10:44 -0800