Skip to content

src/crdt/crdt_datastore.hpp

CRDT datastore class source file. More...

Namespaces

Name
sgns
sgns::blockchain
sgns::crdt

Classes

Name
class sgns::crdt::CrdtDatastore
Forward declaration of CRDT Set class.

Functions

Name
OUTCOME_HPP_DECLARE_ERROR_2(sgns::crdt , CrdtDatastore::Error )
Macro for declaring error handling in the CrdtDatastore class.

Detailed Description

CRDT datastore class source file.

Date: 2025-04-04 devcareer0 Henrique A. Klein ([email protected])

Functions Documentation

function OUTCOME_HPP_DECLARE_ERROR_2

OUTCOME_HPP_DECLARE_ERROR_2(
    sgns::crdt ,
    CrdtDatastore::Error 
)

Macro for declaring error handling in the CrdtDatastore class.

Source code

#ifndef SUPERGENIUS_CRDT_DATASTORE_HPP
#define SUPERGENIUS_CRDT_DATASTORE_HPP

#include <shared_mutex>
#include <future>
#include <chrono>
#include <queue>
#include <unordered_set>
#include <map>
#include <condition_variable>
#include <optional>

#include <boost/asio/steady_timer.hpp>
#include <ipfs_lite/ipld/ipld_node.hpp>
#include <primitives/cid/cid.hpp>

#include "base/logger.hpp"
#include "crdt/crdt_set.hpp"
#include "crdt/crdt_heads.hpp"
#include "crdt/broadcaster.hpp"
#include "crdt/dagsyncer.hpp"
#include "crdt/crdt_options.hpp"
#include "crdt/crdt_data_filter.hpp"
#include "crdt/crdt_callback_manager.hpp"
#include "storage/rocksdb/rocksdb.hpp"

namespace sgns
{
    class Blockchain;
}

namespace sgns::blockchain
{
    class ValidatorRegistry;
}

namespace sgns::crdt
{
    class CrdtSet; 

    class CrdtDatastore : public std::enable_shared_from_this<CrdtDatastore>
    {
    public:
        using Buffer      = base::Buffer;
        using Logger      = base::Logger;
        using RocksDB     = storage::rocksdb;
        using QueryResult = RocksDB::QueryResult;
        using Delta       = pb::Delta;
        using Element     = pb::Element;
        using IPLDNode    = ipfs_lite::ipld::IPLDNode;

        using CRDTElementFilterCallback  = CRDTDataFilter::ElementFilterCallback;
        using CRDTNewElementCallback     = CRDTCallbackManager::NewDataCallback;
        using CRDTDeletedElementCallback = CRDTCallbackManager::DeletedDataCallback;

        enum class JobStatus
        {
            PENDING,
            COMPLETED,
            FAILED
        };

        enum class Error
        {
            INVALID_PARAM = 0,
            FETCH_ROOT_NODE,
            NODE_DESERIALIZATION,
            FETCHING_GRAPH,
            NODE_CREATION,
            GET_NODE,
            INVALID_JOB,
        };

        static std::shared_ptr<CrdtDatastore> New( std::shared_ptr<RocksDB>     aDatastore,
                                                   const HierarchicalKey       &aKey,
                                                   std::shared_ptr<DAGSyncer>   aDagSyncer,
                                                   std::shared_ptr<Broadcaster> aBroadcaster,
                                                   std::shared_ptr<CrdtOptions> aOptions );

        void Start();
        virtual ~CrdtDatastore();

        static std::shared_ptr<Delta> DeltaMerge( const std::shared_ptr<Delta> &aDelta1,
                                                  const std::shared_ptr<Delta> &aDelta2 );

        outcome::result<Buffer> GetKey( const HierarchicalKey &aKey ) const;

        outcome::result<QueryResult> QueryKeyValues( std::string_view aPrefix ) const;

        outcome::result<QueryResult> QueryKeyValues( const std::string &prefix_base,
                                                     const std::string &middle_part,
                                                     const std::string &remainder_prefix ) const;

        std::string GetKeysPrefix() const;

        static std::string GetValueSuffix();

        outcome::result<CID> PutKey( const HierarchicalKey                 &aKey,
                                     const Buffer                          &aValue,
                                     const std::unordered_set<std::string> &topics );

        outcome::result<bool> HasKey( const HierarchicalKey &aKey ) const;

        outcome::result<CID> DeleteKey( const HierarchicalKey &aKey, const std::unordered_set<std::string> &topics );

        outcome::result<CID> Publish( const std::shared_ptr<Delta>          &aDelta,
                                      const std::unordered_set<std::string> &topics );

        outcome::result<void> PrintDAG();

        static outcome::result<std::vector<CID>> DecodeBroadcast( const Buffer &buff );

        static outcome::result<std::shared_ptr<Delta>> CreateDeltaToAdd( const std::string &key,
                                                                         const std::string &value );

        outcome::result<std::shared_ptr<Delta>> CreateDeltaToRemove( const std::string &key ) const;

        void PrintDataStore();

        void Close();

        bool RegisterElementFilter( const std::string &pattern, CRDTElementFilterCallback filter );
        bool RegisterNewElementCallback( const std::string &pattern, CRDTNewElementCallback callback );
        bool RegisterDeletedElementCallback( const std::string &pattern, CRDTDeletedElementCallback callback );

        void AddTopicName( const std::string &topic );

        outcome::result<CrdtHeads::CRDTListResult> GetHeadList();
        outcome::result<void>                      RemoveHead( const CID &aCid, const std::string &topic );
        outcome::result<uint64_t>                  GetHeadHeight( const CID &aCid, const std::string &topic );
        outcome::result<void>      AddHead( const CID &aCid, const std::string &topic, uint64_t priority );
        outcome::result<JobStatus> GetJobStatus( const CID &cid );

        outcome::result<void> BroadcastHeadsForTopics( const std::set<std::string> &topics );

        std::unordered_set<std::string> GetTopicNames() const;

    protected:
        friend class PubSubBroadcasterExt;
        friend class ::sgns::Blockchain;
        friend class ::sgns::blockchain::ValidatorRegistry;

        struct RootCIDJob
        {
            std::shared_ptr<IPLDNode> node_;            
            std::shared_ptr<IPLDNode> root_node_;       
            bool                      created_by_self_; 
        };

        struct DagWorker
        {
            std::future<void> dagWorkerFuture_;                /*> Future for DAG worker thread */
            std::atomic<bool> dagWorkerThreadRunning_ = false; /*> Flag used for keep track of thread cycle */
        };

        void HandleCIDBroadcast();
        outcome::result<void> HandleRootCIDBlock( const CID &aCid );
        outcome::result<RootCIDJob> CreateRootJob( const CID &aRootCID );
        outcome::result<std::set<CID>> GetLinksToFetch( const RootCIDJob &job );
        outcome::result<void> FetchNodes( const RootCIDJob &aRootJob, const std::set<CID> &aLinks );
        outcome::result<Delta> GetDeltaFromNode( const IPLDNode &aNode, bool created_by_self );
        outcome::result<void> MergeDataFromDelta( const CID &node_cid, const Delta &aDelta );
        outcome::result<void> ProcessJobIteration( const RootCIDJob &job_to_process );

        outcome::result<void> Sync( const HierarchicalKey &aKey );

        outcome::result<void> PrintDAGRec( const CID &aCID, uint64_t aDepth, std::vector<CID> &aSet );

        void RebroadcastHeads();

        outcome::result<void> Broadcast( const std::set<CID>                    &cids,
                                         const std::string                      &topic,
                                         boost::optional<libp2p::peer::PeerInfo> peerInfo = boost::none );

        outcome::result<Buffer> EncodeBroadcast( const std::set<CID> &heads );

        static outcome::result<Buffer> EncodeBroadcastStatic( const std::set<CID> &heads );

        outcome::result<std::shared_ptr<IPLDNode>> CreateIPLDNode(
            const std::vector<std::pair<CID, std::string>> &aHeads,
            const std::shared_ptr<Delta>                   &aDelta,
            const std::unordered_set<std::string>          &topics ) const;

        outcome::result<std::shared_ptr<IPLDNode>> CreateDAGNode( const std::shared_ptr<Delta>          &aDelta,
                                                                  const std::unordered_set<std::string> &topics );
        outcome::result<CID> AddDAGNode( const std::shared_ptr<CrdtDatastore::IPLDNode> &node );

        outcome::result<void> SyncDatastore( const std::vector<HierarchicalKey> &aKeyList );

        void PutElementsCallback( const std::string &key, const Buffer &value, const std::string &cid );
        void DeleteElementsCallback( const std::string &key, const std::string &cid );

        void UpdateCRDTHeads( const CID &rootCID, uint64_t rootPriority, bool add_topics_to_broadcast );
        bool EnqueueRootCID( const CID &cid );

        outcome::result<CID> WaitForJob( const CID &cid );

    private:
        CrdtDatastore() = default;

        CrdtDatastore( std::shared_ptr<RocksDB>     aDatastore,
                       const HierarchicalKey       &aKey,
                       std::shared_ptr<DAGSyncer>   aDagSyncer,
                       std::shared_ptr<Broadcaster> aBroadcaster,
                       std::shared_ptr<CrdtOptions> aOptions );

        bool ShouldContinueWorkerThread( DagWorker &dagWorker );
        bool ProcessJobs( std::queue<RootCIDJob> &jobs );
        bool SeedNextExternalRoot();
        bool IsRootCIDPendingOrActive( const CID &cid );
        bool IsRootCIDPendingOrActiveLocked( const CID &cid ) const;
        void HandleJobProcessingFailure( const RootCIDJob &job );
        void HandleJobProcessingSuccess( const RootCIDJob &job );
        void CleanupFailedJob( const RootCIDJob &job );

        std::shared_ptr<RocksDB>     dataStore_ = nullptr;
        std::shared_ptr<CrdtOptions> options_   = nullptr;

        HierarchicalKey namespaceKey_;

        std::shared_ptr<CrdtSet>   set_   = nullptr;
        std::shared_ptr<CrdtHeads> heads_ = nullptr;

        std::shared_ptr<Broadcaster> broadcaster_ = nullptr;
        std::shared_ptr<DAGSyncer>   dagSyncer_   = nullptr;
        Logger                       logger_      = base::createLogger( "CrdtDatastore" );

        static constexpr std::chrono::milliseconds threadSleepTimeInMilliseconds_ = std::chrono::milliseconds( 500 );
        static constexpr std::string_view          headsNamespace_                = "h";
        static constexpr std::string_view          setsNamespace_                 = "s";
        int                                        numberOfDagWorkers             = 1;

        std::future<void> handleNextFuture_;
        std::atomic<bool> handleNextThreadRunning_ = false;

        std::future<void> rebroadcastFuture_;
        std::atomic<bool> rebroadcastThreadRunning_ = false;

        std::vector<std::unique_ptr<DagWorker>> dagWorkers_;

        std::atomic<bool>       dagWorkerJobListThreadRunning_ = false;
        std::mutex              dagWorkerMutex_;
        std::condition_variable dagWorkerCv_;

        std::queue<RootCIDJob>                               rootCIDJobList_;     // External jobs
        std::queue<RootCIDJob>                               selfCreatedJobList_; // Self-created jobs (high priority)
        std::map<CID, std::set<std::pair<CID, std::string>>> pendingHeadsByRootCID_;
        std::mutex                                           pendingHeadsMutex_;
        std::queue<CID>                                      pendingRootQueue_;
        std::optional<CID>                                   activeRootCID_;

        CRDTDataFilter crdt_filter_;
        bool           started_ = false;

        std::mutex                      rebroadcastMutex_;
        std::mutex                      dagWorkerCvMutex_;
        std::condition_variable         rebroadcastCv_;
        std::unordered_set<std::string> topicNames_;
        mutable std::mutex              topicNamesMutex_;
        bool                            isFullNode = false;
        std::mutex                      pendingBroadcastMutex_;
        std::unordered_set<std::string> pendingBroadcastTopics_;

        CRDTCallbackManager crdt_cb_manager_;

        std::map<CID, JobStatus> pending_jobs_;
        bool                     has_full_node_topic_;

        void MarkJobPending( const CID &cid );
        void MarkJobFailed( const CID &cid );

        // Cache for CID string representations to avoid repeated base58 encoding
        mutable std::map<CID, std::string> cid_string_cache_;
        mutable std::mutex                 cid_string_cache_mutex_;
    };

}

OUTCOME_HPP_DECLARE_ERROR_2( sgns::crdt, CrdtDatastore::Error );

#endif //SUPERGENIUS_CRDT_DATASTORE_HPP

Updated on 2026-03-04 at 13:10:44 -0800