Skip to content

src/processing/processing_subtask_queue.hpp

Namespaces

Name
sgns
sgns::processing

Classes

Name
class sgns::processing::ProcessingSubTaskQueue
Distributed subtask queue implementation.

Source code

#ifndef SUPERGENIUS_PROCESSING_SUBTASK_QUEUE_HPP
#define SUPERGENIUS_PROCESSING_SUBTASK_QUEUE_HPP

#include "processing/proto/SGProcessing.pb.h"
#include "base/logger.hpp"

namespace sgns::processing
{
class ProcessingSubTaskQueue
{
public:

    using TimestampProvider = std::function<uint64_t()>;

    ProcessingSubTaskQueue(std::string localNodeId, TimestampProvider timestampProvider = nullptr);

    void CreateQueue(SGProcessing::ProcessingQueue* queue, const std::vector<int>& enabledItemIndices);

    bool GrabItem(size_t& grabbedItemIndex, uint64_t timestamp);

    bool MoveOwnershipTo(const std::string& nodeId);

    bool RollbackOwnership();

    bool HasOwnership() const;

    bool UpdateQueue(SGProcessing::ProcessingQueue* queue, const std::vector<int>& enabledItemIndices);

    bool UnlockExpiredItems(uint64_t currentTime);

    [[nodiscard]] uint64_t GetLastLockTimestamp() const;

    bool AddOwnershipRequest(const std::string& nodeId, uint64_t timestamp);

    bool ProcessNextOwnershipRequest();

private:

    void ChangeOwnershipTo(const std::string& nodeId);

    bool LockItem(size_t& lockedItemIndex, uint64_t timestamp);

    void LogQueue() const;

    std::string m_localNodeId;
    SGProcessing::ProcessingQueue* m_queue;

    std::vector<int> m_enabledItemIndices;

    base::Logger m_logger = base::createLogger("ProcessingSubTaskQueue");

    TimestampProvider m_timestampProvider;
};
}

#endif // SUPERGENIUS_PROCESSING_SUBTASK_QUEUE_HPP

Updated on 2026-03-04 at 13:10:44 -0800