Skip to content

sgns::crdt::CrdtDatastore

Forward declaration of CRDT Set class. More...

#include <crdt_datastore.hpp>

Inherits from std::enable_shared_from_this< CrdtDatastore >

Protected Classes

Name
struct RootCIDJob
struct DagWorker

Public Types

Name
enum class JobStatus
enum class Error
using base::Buffer Buffer
using base::Logger Logger
using storage::rocksdb RocksDB
using RocksDB::QueryResult QueryResult
using pb::Delta Delta
using pb::Element Element
using ipfs_lite::ipld::IPLDNode IPLDNode
using CRDTDataFilter::ElementFilterCallback CRDTElementFilterCallback
using CRDTCallbackManager::NewDataCallback CRDTNewElementCallback
using CRDTCallbackManager::DeletedDataCallback CRDTDeletedElementCallback

Public Functions

Name
std::shared_ptr< CrdtDatastore > New(std::shared_ptr< RocksDB > aDatastore, const HierarchicalKey & aKey, std::shared_ptr< DAGSyncer > aDagSyncer, std::shared_ptr< Broadcaster > aBroadcaster, std::shared_ptr< CrdtOptions > aOptions)
Factory method to create a shared_ptr to a CrdtDatastore.
std::shared_ptr< Delta > DeltaMerge(const std::shared_ptr< Delta > & aDelta1, const std::shared_ptr< Delta > & aDelta2)
std::string GetValueSuffix()
outcome::result< std::vector< CID > > DecodeBroadcast(const Buffer & buff)
outcome::result< std::shared_ptr< Delta > > CreateDeltaToAdd(const std::string & key, const std::string & value)
void Start()
Starts the datastore threads.
virtual ~CrdtDatastore()
Destructor of the CRDT datastore.
outcome::result< Buffer > GetKey(const HierarchicalKey & aKey) const
outcome::result< QueryResult > QueryKeyValues(std::string_view aPrefix) const
outcome::result< QueryResult > QueryKeyValues(const std::string & prefix_base, const std::string & middle_part, const std::string & remainder_prefix) const
Queries with a middle part that can be a wildcard, negated string or normal string.
std::string GetKeysPrefix() const
outcome::result< CID > PutKey(const HierarchicalKey & aKey, const Buffer & aValue, const std::unordered_set< std::string > & topics)
Stores the given value in the CRDT store.
outcome::result< bool > HasKey(const HierarchicalKey & aKey) const
outcome::result< CID > DeleteKey(const HierarchicalKey & aKey, const std::unordered_set< std::string > & topics)
outcome::result< CID > Publish(const std::shared_ptr< Delta > & aDelta, const std::unordered_set< std::string > & topics)
Publishes a Delta. Creates a DAG node from the given Delta, merges it into the CRDT, and broadcasts the node.
outcome::result< void > PrintDAG()
outcome::result< std::shared_ptr< Delta > > CreateDeltaToRemove(const std::string & key) const
void PrintDataStore()
void Close()
bool RegisterElementFilter(const std::string & pattern, CRDTElementFilterCallback filter)
bool RegisterNewElementCallback(const std::string & pattern, CRDTNewElementCallback callback)
bool RegisterDeletedElementCallback(const std::string & pattern, CRDTDeletedElementCallback callback)
void AddTopicName(const std::string & topic)
Configure which topic this datastore should filter on.
outcome::result< CrdtHeads::CRDTListResult > GetHeadList()
outcome::result< void > RemoveHead(const CID & aCid, const std::string & topic)
outcome::result< uint64_t > GetHeadHeight(const CID & aCid, const std::string & topic)
outcome::result< void > AddHead(const CID & aCid, const std::string & topic, uint64_t priority)
outcome::result< JobStatus > GetJobStatus(const CID & cid)
outcome::result< void > BroadcastHeadsForTopics(const std::set< std::string > & topics)
Broadcast heads for the specified topics.
std::unordered_set< std::string > GetTopicNames() const

Protected Functions

Name
void HandleCIDBroadcast()
Handles when a CID broadcast gets received If the CID is not known triggers HandleRootCIDBlock.
outcome::result< void > HandleRootCIDBlock(const CID & aCid)
Handles a root CID block by creating a job to fetch and process its content.
outcome::result< RootCIDJob > CreateRootJob(const CID & aRootCID)
Creates a RootCIDJob for the given root CID.
outcome::result< std::set< CID > > GetLinksToFetch(const RootCIDJob & job)
Gets the links to fetch for a given node in a job.
outcome::result< void > FetchNodes(const RootCIDJob & aRootJob, const std::set< CID > & aLinks)
Fetches the nodes for the given links and root job.
outcome::result< Delta > GetDeltaFromNode(const IPLDNode & aNode, bool created_by_self)
Gets the Delta from a given IPLD node, filtering it if it wasn't created by self.
outcome::result< void > MergeDataFromDelta(const CID & node_cid, const Delta & aDelta)
Merges the data from a given Delta into the CRDT set.
outcome::result< void > ProcessJobIteration(const RootCIDJob & job_to_process)
Processes A Root CID job.
outcome::result< void > Sync(const HierarchicalKey & aKey)
outcome::result< void > PrintDAGRec(const CID & aCID, uint64_t aDepth, std::vector< CID > & aSet)
void RebroadcastHeads()
outcome::result< void > Broadcast(const std::set< CID > & cids, const std::string & topic, boost::optional< libp2p::peer::PeerInfo > peerInfo =boost::none)
Broadcasts a set of CIDs. Encodes and broadcasts the provided list of CIDs.
outcome::result< Buffer > EncodeBroadcast(const std::set< CID > & heads)
outcome::result< std::shared_ptr< IPLDNode > > CreateIPLDNode(const std::vector< std::pair< CID, std::string > > & aHeads, const std::shared_ptr< Delta > & aDelta, const std::unordered_set< std::string > & topics) const
outcome::result< std::shared_ptr< IPLDNode > > CreateDAGNode(const std::shared_ptr< Delta > & aDelta, const std::unordered_set< std::string > & topics)
outcome::result< CID > AddDAGNode(const std::shared_ptr< CrdtDatastore::IPLDNode > & node)
outcome::result< void > SyncDatastore(const std::vector< HierarchicalKey > & aKeyList)
void PutElementsCallback(const std::string & key, const Buffer & value, const std::string & cid)
void DeleteElementsCallback(const std::string & key, const std::string & cid)
void UpdateCRDTHeads(const CID & rootCID, uint64_t rootPriority, bool add_topics_to_broadcast)
bool EnqueueRootCID(const CID & cid)
outcome::result< CID > WaitForJob(const CID & cid)
outcome::result< Buffer > EncodeBroadcastStatic(const std::set< CID > & heads)

Friends

Name
class PubSubBroadcasterExt

Detailed Description

class sgns::crdt::CrdtDatastore;

Forward declaration of CRDT Set class.

CRDT datastore class based on https://github.com/ipfs/go-ds-crdt

Public Types Documentation

enum JobStatus

Enumerator Value Description
PENDING
COMPLETED
FAILED

enum Error

Enumerator Value Description
INVALID_PARAM 0
FETCH_ROOT_NODE
NODE_DESERIALIZATION
FETCHING_GRAPH
NODE_CREATION
GET_NODE
INVALID_JOB

using Buffer

using sgns::crdt::CrdtDatastore::Buffer = base::Buffer;

using Logger

using sgns::crdt::CrdtDatastore::Logger = base::Logger;

using RocksDB

using sgns::crdt::CrdtDatastore::RocksDB = storage::rocksdb;

using QueryResult

using sgns::crdt::CrdtDatastore::QueryResult = RocksDB::QueryResult;

using Delta

using sgns::crdt::CrdtDatastore::Delta = pb::Delta;

using Element

using sgns::crdt::CrdtDatastore::Element = pb::Element;

using IPLDNode

using sgns::crdt::CrdtDatastore::IPLDNode = ipfs_lite::ipld::IPLDNode;

using CRDTElementFilterCallback

using sgns::crdt::CrdtDatastore::CRDTElementFilterCallback = CRDTDataFilter::ElementFilterCallback;

using CRDTNewElementCallback

using sgns::crdt::CrdtDatastore::CRDTNewElementCallback = CRDTCallbackManager::NewDataCallback;

using CRDTDeletedElementCallback

using sgns::crdt::CrdtDatastore::CRDTDeletedElementCallback = CRDTCallbackManager::DeletedDataCallback;

Public Functions Documentation

function New

static std::shared_ptr< CrdtDatastore > New(
    std::shared_ptr< RocksDB > aDatastore,
    const HierarchicalKey & aKey,
    std::shared_ptr< DAGSyncer > aDagSyncer,
    std::shared_ptr< Broadcaster > aBroadcaster,
    std::shared_ptr< CrdtOptions > aOptions
)

Factory method to create a shared_ptr to a CrdtDatastore.

Parameters:

  • aDatastore The underlying database where CRDT is stored
  • aKey The namespace key on the database where CRDT's variables will be stored
  • aDagSyncer The MerkleDAG syncer to request content of CIDs
  • aBroadcaster The broadcaster to publish CIDs
  • aOptions Options to construct the object

Return: A new instance of CrdtDatastore

function DeltaMerge

static std::shared_ptr< Delta > DeltaMerge(
    const std::shared_ptr< Delta > & aDelta1,
    const std::shared_ptr< Delta > & aDelta2
)

Parameters:

Return: pointer to merged delta

Static function to merge delta elements and tombstones, use highest priority for the result delta

function GetValueSuffix

static std::string GetValueSuffix()

Return: value suffix

Get value suffix used in set, e.g. /v

function DecodeBroadcast

static outcome::result< std::vector< CID > > DecodeBroadcast(
    const Buffer & buff
)

Parameters:

Return: vector of CIDs or outcome::failure on error

DecodeBroadcast decodes CRDT broadcast data

function CreateDeltaToAdd

static outcome::result< std::shared_ptr< Delta > > CreateDeltaToAdd(
    const std::string & key,
    const std::string & value
)

Parameters:

  • key - delta key to add to datastore
  • value - delta value to add to datastore

Return: pointer to new delta or outcome::failure on error

Returns a new delta-set adding the given key/value.

function Start

void Start()

Starts the datastore threads.

function ~CrdtDatastore

virtual ~CrdtDatastore()

Destructor of the CRDT datastore.

function GetKey

outcome::result< Buffer > GetKey(
    const HierarchicalKey & aKey
) const

Parameters:

  • aKey Hierarchical key to get

Return: value as a Buffer

Get the value of an element not tombstoned from the CRDT set by key

function QueryKeyValues

outcome::result< QueryResult > QueryKeyValues(
    std::string_view aPrefix
) const

Parameters:

  • aPrefix prefix to search, if empty string, return all

Return: list of key-value pairs matches prefix

Query CRDT set key-value pairs by prefix, if prefix empty return all elements are not tombstoned

function QueryKeyValues

outcome::result< QueryResult > QueryKeyValues(
    const std::string & prefix_base,
    const std::string & middle_part,
    const std::string & remainder_prefix
) const

Queries with a middle part that can be a wildcard, negated string or normal string.

Parameters:

  • prefix_base The base prefix to query
  • middle_part Either a string (normal query), '*' or !string
  • remainder_prefix The remainder part of the query prefix

Return: A list of key value pairs

function GetKeysPrefix

std::string GetKeysPrefix() const

Return: key prefix

Get key prefix used in set, e.g. /namespace/s/k/

function PutKey

outcome::result< CID > PutKey(
    const HierarchicalKey & aKey,
    const Buffer & aValue,
    const std::unordered_set< std::string > & topics
)

Stores the given value in the CRDT store.

Parameters:

  • aKey Hierarchical key to put
  • aValue Value to be stored
  • topics Topics to publish to

Return: outcome::success if stored and broadcasted successfully, or outcome::failure otherwise.

function HasKey

outcome::result< bool > HasKey(
    const HierarchicalKey & aKey
) const

Parameters:

Return: true if key found or false if not found or outcome::failure on error

HasKey returns whether the key is mapped to a value in set

function DeleteKey

outcome::result< CID > DeleteKey(
    const HierarchicalKey & aKey,
    const std::unordered_set< std::string > & topics
)

Parameters:

Return: outcome::failure on error or success otherwise

Delete removes the value for given key.

function Publish

outcome::result< CID > Publish(
    const std::shared_ptr< Delta > & aDelta,
    const std::unordered_set< std::string > & topics
)

Publishes a Delta. Creates a DAG node from the given Delta, merges it into the CRDT, and broadcasts the node.

Parameters:

  • aDelta Delta to publish
  • topics Topics to publish to

Return: returns outcome::success on success or outcome::failure otherwise

function PrintDAG

outcome::result< void > PrintDAG()

Return: returns outcome::success on success or outcome::failure otherwise

PrintDAG pretty prints the current Merkle-DAG using the given printFunc

function CreateDeltaToRemove

outcome::result< std::shared_ptr< Delta > > CreateDeltaToRemove(
    const std::string & key
) const

Parameters:

  • key - delta key to remove from datastore

Return: pointer to delta or outcome::failure on error

Returns a new delta-set removing the given keys with prefix /namespace/s/key

function PrintDataStore

void PrintDataStore()

function Close

void Close()

Close shuts down the CRDT datastore and worker threads. It should not be used afterwards.

function RegisterElementFilter

bool RegisterElementFilter(
    const std::string & pattern,
    CRDTElementFilterCallback filter
)

function RegisterNewElementCallback

bool RegisterNewElementCallback(
    const std::string & pattern,
    CRDTNewElementCallback callback
)

function RegisterDeletedElementCallback

bool RegisterDeletedElementCallback(
    const std::string & pattern,
    CRDTDeletedElementCallback callback
)

function AddTopicName

void AddTopicName(
    const std::string & topic
)

Configure which topic this datastore should filter on.

Parameters:

  • topic The topic name to use when filtering links. Only links whose IPLDLinkImpl::getName() equals this string will be processed.

When processing or rebroadcasting Merkle-DAG links, only those whose name exactly matches the topic set via this call will be considered.

function GetHeadList

outcome::result< CrdtHeads::CRDTListResult > GetHeadList()

function RemoveHead

outcome::result< void > RemoveHead(
    const CID & aCid,
    const std::string & topic
)

function GetHeadHeight

outcome::result< uint64_t > GetHeadHeight(
    const CID & aCid,
    const std::string & topic
)

function AddHead

outcome::result< void > AddHead(
    const CID & aCid,
    const std::string & topic,
    uint64_t priority
)

function GetJobStatus

outcome::result< JobStatus > GetJobStatus(
    const CID & cid
)

function BroadcastHeadsForTopics

outcome::result< void > BroadcastHeadsForTopics(
    const std::set< std::string > & topics
)

Broadcast heads for the specified topics.

Parameters:

  • topics Vector of topic names to broadcast heads for

Return: outcome::success on success, or outcome::failure on error

function GetTopicNames

std::unordered_set< std::string > GetTopicNames() const

Protected Functions Documentation

function HandleCIDBroadcast

void HandleCIDBroadcast()

Handles when a CID broadcast gets received If the CID is not known triggers HandleRootCIDBlock.

function HandleRootCIDBlock

outcome::result< void > HandleRootCIDBlock(
    const CID & aCid
)

Handles a root CID block by creating a job to fetch and process its content.

Parameters:

  • aCid The root CID to be handled

Return: Success if the Root Job was created, or failure otherwise

function CreateRootJob

outcome::result< RootCIDJob > CreateRootJob(
    const CID & aRootCID
)

Creates a RootCIDJob for the given root CID.

Parameters:

  • aRootCID The root CID to create the job for

Return: Success if Root Job created, or failure otherwise

function GetLinksToFetch

outcome::result< std::set< CID > > GetLinksToFetch(
    const RootCIDJob & job
)

Gets the links to fetch for a given node in a job.

Parameters:

  • job The root job of the current links to fetch

Return: List of CIDs to fetch, or failure otherwise

function FetchNodes

outcome::result< void > FetchNodes(
    const RootCIDJob & aRootJob,
    const std::set< CID > & aLinks
)

Fetches the nodes for the given links and root job.

Parameters:

  • aRootJob The root job of the current links to fetch
  • aLinks The links to fetch

Return: Success if the nodes were fetched, or failure otherwise

function GetDeltaFromNode

outcome::result< Delta > GetDeltaFromNode(
    const IPLDNode & aNode,
    bool created_by_self
)

Gets the Delta from a given IPLD node, filtering it if it wasn't created by self.

Parameters:

  • aNode The IPLD node to get the Delta from
  • created_by_self True if the node was created by self, false otherwise

Return: The Delta contained in the node, or failure otherwise

function MergeDataFromDelta

outcome::result< void > MergeDataFromDelta(
    const CID & node_cid,
    const Delta & aDelta
)

Merges the data from a given Delta into the CRDT set.

Parameters:

  • node_cid The CID of the node from which the Delta was obtained
  • aDelta The Delta to be merged

Return: Success if the Delta was merged, or failure otherwise

function ProcessJobIteration

outcome::result< void > ProcessJobIteration(
    const RootCIDJob & job_to_process
)

Processes A Root CID job.

Parameters:

Return: Success if the job was processed, or failure otherwise

function Sync

outcome::result< void > Sync(
    const HierarchicalKey & aKey
)

Return: returns outcome::success on success or outcome::failure otherwise

Sync ensures that all the data under the given prefix is flushed to disk in the underlying datastore

function PrintDAGRec

outcome::result< void > PrintDAGRec(
    const CID & aCID,
    uint64_t aDepth,
    std::vector< CID > & aSet
)

Parameters:

  • aCID CID of DAG record
  • aDepth depth used for indenting printed records
  • aSet set of CIDs to print

Return: returns outcome::success on success or outcome::failure otherwise

Helper funtion to print Merkle-DAG records

function RebroadcastHeads

void RebroadcastHeads()

Regularly send out a list of heads that we have not recently seen

function Broadcast

outcome::result< void > Broadcast(
    const std::set< CID > & cids,
    const std::string & topic,
    boost::optional< libp2p::peer::PeerInfo > peerInfo =boost::none
)

Broadcasts a set of CIDs. Encodes and broadcasts the provided list of CIDs.

Parameters:

  • cids The list of CIDs to broadcast.
  • topic The topic to broadcast to.
  • peerInfo Optional peer info to avoid repeated GetPeerInfo calls.

Return: outcome::success on success, or outcome::failure if an error occurs.

function EncodeBroadcast

outcome::result< Buffer > EncodeBroadcast(
    const std::set< CID > & heads
)

Parameters:

  • heads list of CIDs

Return: data encoded into Buffer data or outcome::failure on error

EncodeBroadcast encodes list of CIDs to CRDT broadcast data

function CreateIPLDNode

outcome::result< std::shared_ptr< IPLDNode > > CreateIPLDNode(
    const std::vector< std::pair< CID, std::string > > & aHeads,
    const std::shared_ptr< Delta > & aDelta,
    const std::unordered_set< std::string > & topics
) const

Parameters:

  • aHeads list of CIDs to add to node as IPLD links
  • aDelta Delta to serialize into IPLD node
  • topics Topics to add as links

Return: IPLD node or outcome::failure on error

CreateIPLDNode add block node to DAGSyncer

function CreateDAGNode

outcome::result< std::shared_ptr< IPLDNode > > CreateDAGNode(
    const std::shared_ptr< Delta > & aDelta,
    const std::unordered_set< std::string > & topics
)

function AddDAGNode

outcome::result< CID > AddDAGNode(
    const std::shared_ptr< CrdtDatastore::IPLDNode > & node
)

Parameters:

  • node Node to add and process

Return: CID or outcome::failure on error

AddDAGNode adds node to DAGSyncer and processes new blocks.

function SyncDatastore

outcome::result< void > SyncDatastore(
    const std::vector< HierarchicalKey > & aKeyList
)

Parameters:

  • aKeyList all heads and the set entries related to the given prefix

Return: returns outcome::success on success or outcome::failure otherwise

SyncDatastore sync heads and set datastore

function PutElementsCallback

void PutElementsCallback(
    const std::string & key,
    const Buffer & value,
    const std::string & cid
)

function DeleteElementsCallback

void DeleteElementsCallback(
    const std::string & key,
    const std::string & cid
)

function UpdateCRDTHeads

void UpdateCRDTHeads(
    const CID & rootCID,
    uint64_t rootPriority,
    bool add_topics_to_broadcast
)

function EnqueueRootCID

bool EnqueueRootCID(
    const CID & cid
)

function WaitForJob

outcome::result< CID > WaitForJob(
    const CID & cid
)

function EncodeBroadcastStatic

static outcome::result< Buffer > EncodeBroadcastStatic(
    const std::set< CID > & heads
)

Parameters:

  • heads list of CIDs

Return: data encoded into Buffer data or outcome::failure on error

EncodeBroadcastStatic encodes list of CIDs to CRDT broadcast data

Friends

friend PubSubBroadcasterExt

friend class PubSubBroadcasterExt(
    PubSubBroadcasterExt 
);

Updated on 2026-03-04 at 13:10:43 -0800