src/processing/processing_subtask_queue_accessor_impl.hpp¶
Namespaces¶
| Name |
|---|
| sgns |
| sgns::processing |
Classes¶
| Name | |
|---|---|
| class | sgns::processing::SubTaskQueueAccessorImpl Subtask queue accessor implementation. |
Source code¶
#ifndef SUPERGENIUS_PROCESSING_SUBTASK_QUEUE_ACCESSOR_IMPL_HPP
#define SUPERGENIUS_PROCESSING_SUBTASK_QUEUE_ACCESSOR_IMPL_HPP
#include "processing/processing_subtask_queue_accessor.hpp"
#include "processing/processing_subtask_queue_manager.hpp"
#include "processing/processing_subtask_result_storage.hpp"
#include "processing/processing_validation_core.hpp"
#include <ipfs_pubsub/gossip_pubsub_topic.hpp>
#include <list>
#include <optional>
#include <thread>
#include <boost/asio.hpp>
namespace sgns::processing
{
class SubTaskQueueAccessorImpl : public SubTaskQueueAccessor,
public std::enable_shared_from_this<SubTaskQueueAccessorImpl>
{
public:
SubTaskQueueAccessorImpl( std::shared_ptr<ipfs_pubsub::GossipPubSub> gossipPubSub,
std::shared_ptr<ProcessingSubTaskQueueManager> subTaskQueueManager,
std::shared_ptr<SubTaskResultStorage> subTaskResultStorage,
std::function<void( const SGProcessing::TaskResult & )> taskResultProcessingSink,
std::function<void( const std::string & )> processingErrorSink );
~SubTaskQueueAccessorImpl() override;
bool ConnectToSubTaskQueue( std::function<void()> onSubTaskQueueConnectedEventSink ) override;
bool AssignSubTasks( std::list<SGProcessing::SubTask> &subTasks ) override;
void GrabSubTask( SubTaskGrabbedCallback onSubTaskGrabbedCallback ) override;
void CompleteSubTask( const std::string &subTaskId, const SGProcessing::SubTaskResult &subTaskResult ) override;
bool CreateResultsChannel( const std::string &task_id ) override;
std::vector<std::tuple<std::string, SGProcessing::SubTaskResult>> GetResults() const;
enum class FinalizationRetVal
{
NOT_FINALIZED = 0,
FINALIZED = 1,
FINALIZED_BUT_NOT_OWNER = 2,
};
private:
bool OnResultReceived( SGProcessing::SubTaskResult &&subTaskResult );
void OnSubTaskQueueAssigned( const std::vector<std::string> &subTaskIds,
std::function<void()> onSubTaskQueueConnectedEventSink );
void UpdateResultsFromStorage( const std::set<std::string> &subTaskIds );
FinalizationRetVal FinalizeQueueProcessing( const SGProcessing::SubTaskCollection &subTasks,
std::set<std::string> &invalidSubTaskIds );
static void OnResultChannelMessage( std::weak_ptr<SubTaskQueueAccessorImpl> weakThis,
boost::optional<const ipfs_pubsub::GossipPubSub::Message &> message );
void StartPeriodicStateBroadcast();
void ScheduleStateBroadcast();
void PublishExistingResults();
// Helper method to find a subtask by ID
boost::optional<SGProcessing::SubTask> FindSubTaskById( const std::string &subTaskId ) const;
std::shared_ptr<ipfs_pubsub::GossipPubSub> m_gossipPubSub;
std::shared_ptr<ProcessingSubTaskQueueManager> m_subTaskQueueManager;
std::shared_ptr<SubTaskResultStorage> m_subTaskResultStorage;
std::function<void( const SGProcessing::TaskResult & )> m_taskResultProcessingSink;
std::function<void( const std::string & )> m_processingErrorSink;
std::shared_ptr<boost::asio::io_context> m_localContext;
using WorkGuard = boost::asio::executor_work_guard<boost::asio::io_context::executor_type>;
std::optional<WorkGuard> m_localWorkGuard;
std::thread m_localThread;
std::shared_ptr<boost::asio::steady_timer> m_stateTimer;
std::shared_ptr<sgns::ipfs_pubsub::GossipPubSubTopic> m_resultChannel;
mutable std::mutex m_mutexResults;
std::map<std::string, SGProcessing::SubTaskResult> m_results;
ProcessingValidationCore m_validationCore;
base::Logger m_logger = base::createLogger( "ProcessingSubTaskQueueAccessorImpl" );
};
}
#endif
Updated on 2026-03-04 at 13:10:44 -0800