sgns::crdt::CrdtDatastore¶
Forward declaration of CRDT Set class. More...
#include <crdt_datastore.hpp>
Inherits from std::enable_shared_from_this< CrdtDatastore >
Protected Classes¶
| Name | |
|---|---|
| struct | RootCIDJob |
| struct | DagWorker |
Public Types¶
| Name | |
|---|---|
| enum class | JobStatus |
| enum class | Error |
| using base::Buffer | Buffer |
| using base::Logger | Logger |
| using storage::rocksdb | RocksDB |
| using RocksDB::QueryResult | QueryResult |
| using pb::Delta | Delta |
| using pb::Element | Element |
| using ipfs_lite::ipld::IPLDNode | IPLDNode |
| using CRDTDataFilter::ElementFilterCallback | CRDTElementFilterCallback |
| using CRDTCallbackManager::NewDataCallback | CRDTNewElementCallback |
| using CRDTCallbackManager::DeletedDataCallback | CRDTDeletedElementCallback |
Public Functions¶
| Name | |
|---|---|
| std::shared_ptr< CrdtDatastore > | New(std::shared_ptr< RocksDB > aDatastore, const HierarchicalKey & aKey, std::shared_ptr< DAGSyncer > aDagSyncer, std::shared_ptr< Broadcaster > aBroadcaster, std::shared_ptr< CrdtOptions > aOptions) Factory method to create a shared_ptr to a CrdtDatastore. |
| std::shared_ptr< Delta > | DeltaMerge(const std::shared_ptr< Delta > & aDelta1, const std::shared_ptr< Delta > & aDelta2) |
| std::string | GetValueSuffix() |
| outcome::result< std::vector< CID > > | DecodeBroadcast(const Buffer & buff) |
| outcome::result< std::shared_ptr< Delta > > | CreateDeltaToAdd(const std::string & key, const std::string & value) |
| void | Start() Starts the datastore threads. |
| virtual | ~CrdtDatastore() Destructor of the CRDT datastore. |
| outcome::result< Buffer > | GetKey(const HierarchicalKey & aKey) const |
| outcome::result< QueryResult > | QueryKeyValues(std::string_view aPrefix) const |
| outcome::result< QueryResult > | QueryKeyValues(const std::string & prefix_base, const std::string & middle_part, const std::string & remainder_prefix) const Queries with a middle part that can be a wildcard, negated string or normal string. |
| std::string | GetKeysPrefix() const |
| outcome::result< CID > | PutKey(const HierarchicalKey & aKey, const Buffer & aValue, const std::unordered_set< std::string > & topics) Stores the given value in the CRDT store. |
| outcome::result< bool > | HasKey(const HierarchicalKey & aKey) const |
| outcome::result< CID > | DeleteKey(const HierarchicalKey & aKey, const std::unordered_set< std::string > & topics) |
| outcome::result< CID > | Publish(const std::shared_ptr< Delta > & aDelta, const std::unordered_set< std::string > & topics) Publishes a Delta. Creates a DAG node from the given Delta, merges it into the CRDT, and broadcasts the node. |
| outcome::result< void > | PrintDAG() |
| outcome::result< std::shared_ptr< Delta > > | CreateDeltaToRemove(const std::string & key) const |
| void | PrintDataStore() |
| void | Close() |
| bool | RegisterElementFilter(const std::string & pattern, CRDTElementFilterCallback filter) |
| bool | RegisterNewElementCallback(const std::string & pattern, CRDTNewElementCallback callback) |
| bool | RegisterDeletedElementCallback(const std::string & pattern, CRDTDeletedElementCallback callback) |
| void | AddTopicName(const std::string & topic) Configure which topic this datastore should filter on. |
| outcome::result< CrdtHeads::CRDTListResult > | GetHeadList() |
| outcome::result< void > | RemoveHead(const CID & aCid, const std::string & topic) |
| outcome::result< uint64_t > | GetHeadHeight(const CID & aCid, const std::string & topic) |
| outcome::result< void > | AddHead(const CID & aCid, const std::string & topic, uint64_t priority) |
| outcome::result< JobStatus > | GetJobStatus(const CID & cid) |
| outcome::result< void > | BroadcastHeadsForTopics(const std::set< std::string > & topics) Broadcast heads for the specified topics. |
| std::unordered_set< std::string > | GetTopicNames() const |
Protected Functions¶
| Name | |
|---|---|
| void | HandleCIDBroadcast() Handles when a CID broadcast gets received If the CID is not known triggers HandleRootCIDBlock. |
| outcome::result< void > | HandleRootCIDBlock(const CID & aCid) Handles a root CID block by creating a job to fetch and process its content. |
| outcome::result< RootCIDJob > | CreateRootJob(const CID & aRootCID) Creates a RootCIDJob for the given root CID. |
| outcome::result< std::set< CID > > | GetLinksToFetch(const RootCIDJob & job) Gets the links to fetch for a given node in a job. |
| outcome::result< void > | FetchNodes(const RootCIDJob & aRootJob, const std::set< CID > & aLinks) Fetches the nodes for the given links and root job. |
| outcome::result< Delta > | GetDeltaFromNode(const IPLDNode & aNode, bool created_by_self) Gets the Delta from a given IPLD node, filtering it if it wasn't created by self. |
| outcome::result< void > | MergeDataFromDelta(const CID & node_cid, const Delta & aDelta) Merges the data from a given Delta into the CRDT set. |
| outcome::result< void > | ProcessJobIteration(const RootCIDJob & job_to_process) Processes A Root CID job. |
| outcome::result< void > | Sync(const HierarchicalKey & aKey) |
| outcome::result< void > | PrintDAGRec(const CID & aCID, uint64_t aDepth, std::vector< CID > & aSet) |
| void | RebroadcastHeads() |
| outcome::result< void > | Broadcast(const std::set< CID > & cids, const std::string & topic, boost::optional< libp2p::peer::PeerInfo > peerInfo =boost::none) Broadcasts a set of CIDs. Encodes and broadcasts the provided list of CIDs. |
| outcome::result< Buffer > | EncodeBroadcast(const std::set< CID > & heads) |
| outcome::result< std::shared_ptr< IPLDNode > > | CreateIPLDNode(const std::vector< std::pair< CID, std::string > > & aHeads, const std::shared_ptr< Delta > & aDelta, const std::unordered_set< std::string > & topics) const |
| outcome::result< std::shared_ptr< IPLDNode > > | CreateDAGNode(const std::shared_ptr< Delta > & aDelta, const std::unordered_set< std::string > & topics) |
| outcome::result< CID > | AddDAGNode(const std::shared_ptr< CrdtDatastore::IPLDNode > & node) |
| outcome::result< void > | SyncDatastore(const std::vector< HierarchicalKey > & aKeyList) |
| void | PutElementsCallback(const std::string & key, const Buffer & value, const std::string & cid) |
| void | DeleteElementsCallback(const std::string & key, const std::string & cid) |
| void | UpdateCRDTHeads(const CID & rootCID, uint64_t rootPriority, bool add_topics_to_broadcast) |
| bool | EnqueueRootCID(const CID & cid) |
| outcome::result< CID > | WaitForJob(const CID & cid) |
| outcome::result< Buffer > | EncodeBroadcastStatic(const std::set< CID > & heads) |
Friends¶
| Name | |
|---|---|
| class | PubSubBroadcasterExt |
Detailed Description¶
Forward declaration of CRDT Set class.
CRDT datastore class based on https://github.com/ipfs/go-ds-crdt
Public Types Documentation¶
enum JobStatus¶
| Enumerator | Value | Description |
|---|---|---|
| PENDING | ||
| COMPLETED | ||
| FAILED |
enum Error¶
| Enumerator | Value | Description |
|---|---|---|
| INVALID_PARAM | 0 | |
| FETCH_ROOT_NODE | ||
| NODE_DESERIALIZATION | ||
| FETCHING_GRAPH | ||
| NODE_CREATION | ||
| GET_NODE | ||
| INVALID_JOB |
using Buffer¶
using Logger¶
using RocksDB¶
using QueryResult¶
using Delta¶
using Element¶
using IPLDNode¶
using CRDTElementFilterCallback¶
using CRDTNewElementCallback¶
using CRDTDeletedElementCallback¶
using sgns::crdt::CrdtDatastore::CRDTDeletedElementCallback = CRDTCallbackManager::DeletedDataCallback;
Public Functions Documentation¶
function New¶
static std::shared_ptr< CrdtDatastore > New(
std::shared_ptr< RocksDB > aDatastore,
const HierarchicalKey & aKey,
std::shared_ptr< DAGSyncer > aDagSyncer,
std::shared_ptr< Broadcaster > aBroadcaster,
std::shared_ptr< CrdtOptions > aOptions
)
Factory method to create a shared_ptr to a CrdtDatastore.
Parameters:
- aDatastore The underlying database where CRDT is stored
- aKey The namespace key on the database where CRDT's variables will be stored
- aDagSyncer The MerkleDAG syncer to request content of CIDs
- aBroadcaster The broadcaster to publish CIDs
- aOptions Options to construct the object
Return: A new instance of CrdtDatastore
function DeltaMerge¶
static std::shared_ptr< Delta > DeltaMerge(
const std::shared_ptr< Delta > & aDelta1,
const std::shared_ptr< Delta > & aDelta2
)
Parameters:
Return: pointer to merged delta
Static function to merge delta elements and tombstones, use highest priority for the result delta
function GetValueSuffix¶
Return: value suffix
Get value suffix used in set, e.g. /v
function DecodeBroadcast¶
Parameters:
- buff Buffer data to decode
Return: vector of CIDs or outcome::failure on error
DecodeBroadcast decodes CRDT broadcast data
function CreateDeltaToAdd¶
static outcome::result< std::shared_ptr< Delta > > CreateDeltaToAdd(
const std::string & key,
const std::string & value
)
Parameters:
- key - delta key to add to datastore
- value - delta value to add to datastore
Return: pointer to new delta or outcome::failure on error
Returns a new delta-set adding the given key/value.
function Start¶
Starts the datastore threads.
function ~CrdtDatastore¶
Destructor of the CRDT datastore.
function GetKey¶
Parameters:
- aKey Hierarchical key to get
Return: value as a Buffer
Get the value of an element not tombstoned from the CRDT set by key
function QueryKeyValues¶
Parameters:
- aPrefix prefix to search, if empty string, return all
Return: list of key-value pairs matches prefix
Query CRDT set key-value pairs by prefix, if prefix empty return all elements are not tombstoned
function QueryKeyValues¶
outcome::result< QueryResult > QueryKeyValues(
const std::string & prefix_base,
const std::string & middle_part,
const std::string & remainder_prefix
) const
Queries with a middle part that can be a wildcard, negated string or normal string.
Parameters:
- prefix_base The base prefix to query
- middle_part Either a string (normal query), '*' or !string
- remainder_prefix The remainder part of the query prefix
Return: A list of key value pairs
function GetKeysPrefix¶
Return: key prefix
Get key prefix used in set, e.g. /namespace/s/k/
function PutKey¶
outcome::result< CID > PutKey(
const HierarchicalKey & aKey,
const Buffer & aValue,
const std::unordered_set< std::string > & topics
)
Stores the given value in the CRDT store.
Parameters:
- aKey Hierarchical key to put
- aValue Value to be stored
- topics Topics to publish to
Return: outcome::success if stored and broadcasted successfully, or outcome::failure otherwise.
function HasKey¶
Parameters:
- aKey HierarchicalKey to look for in set
Return: true if key found or false if not found or outcome::failure on error
HasKey returns whether the key is mapped to a value in set
function DeleteKey¶
outcome::result< CID > DeleteKey(
const HierarchicalKey & aKey,
const std::unordered_set< std::string > & topics
)
Parameters:
- aKey HierarchicalKey to delete from set
- topics Topics to publish to
Return: outcome::failure on error or success otherwise
Delete removes the value for given key.
function Publish¶
outcome::result< CID > Publish(
const std::shared_ptr< Delta > & aDelta,
const std::unordered_set< std::string > & topics
)
Publishes a Delta. Creates a DAG node from the given Delta, merges it into the CRDT, and broadcasts the node.
Parameters:
- aDelta Delta to publish
- topics Topics to publish to
Return: returns outcome::success on success or outcome::failure otherwise
function PrintDAG¶
Return: returns outcome::success on success or outcome::failure otherwise
PrintDAG pretty prints the current Merkle-DAG using the given printFunc
function CreateDeltaToRemove¶
Parameters:
- key - delta key to remove from datastore
Return: pointer to delta or outcome::failure on error
Returns a new delta-set removing the given keys with prefix /namespace/s/key
function PrintDataStore¶
function Close¶
Close shuts down the CRDT datastore and worker threads. It should not be used afterwards.
function RegisterElementFilter¶
function RegisterNewElementCallback¶
function RegisterDeletedElementCallback¶
bool RegisterDeletedElementCallback(
const std::string & pattern,
CRDTDeletedElementCallback callback
)
function AddTopicName¶
Configure which topic this datastore should filter on.
Parameters:
- topic The topic name to use when filtering links. Only links whose
IPLDLinkImpl::getName()equals this string will be processed.
When processing or rebroadcasting Merkle-DAG links, only those whose name exactly matches the topic set via this call will be considered.
function GetHeadList¶
function RemoveHead¶
function GetHeadHeight¶
function AddHead¶
function GetJobStatus¶
function BroadcastHeadsForTopics¶
Broadcast heads for the specified topics.
Parameters:
- topics Vector of topic names to broadcast heads for
Return: outcome::success on success, or outcome::failure on error
function GetTopicNames¶
Protected Functions Documentation¶
function HandleCIDBroadcast¶
Handles when a CID broadcast gets received If the CID is not known triggers HandleRootCIDBlock.
function HandleRootCIDBlock¶
Handles a root CID block by creating a job to fetch and process its content.
Parameters:
- aCid The root CID to be handled
Return: Success if the Root Job was created, or failure otherwise
function CreateRootJob¶
Creates a RootCIDJob for the given root CID.
Parameters:
- aRootCID The root CID to create the job for
Return: Success if Root Job created, or failure otherwise
function GetLinksToFetch¶
Gets the links to fetch for a given node in a job.
Parameters:
- job The root job of the current links to fetch
Return: List of CIDs to fetch, or failure otherwise
function FetchNodes¶
Fetches the nodes for the given links and root job.
Parameters:
- aRootJob The root job of the current links to fetch
- aLinks The links to fetch
Return: Success if the nodes were fetched, or failure otherwise
function GetDeltaFromNode¶
Gets the Delta from a given IPLD node, filtering it if it wasn't created by self.
Parameters:
- aNode The IPLD node to get the Delta from
- created_by_self True if the node was created by self, false otherwise
Return: The Delta contained in the node, or failure otherwise
function MergeDataFromDelta¶
Merges the data from a given Delta into the CRDT set.
Parameters:
Return: Success if the Delta was merged, or failure otherwise
function ProcessJobIteration¶
Processes A Root CID job.
Parameters:
- job_to_process The job received by either HandleCIDBroadcast or by AddDAGNode
Return: Success if the job was processed, or failure otherwise
function Sync¶
Return: returns outcome::success on success or outcome::failure otherwise
Sync ensures that all the data under the given prefix is flushed to disk in the underlying datastore
function PrintDAGRec¶
Parameters:
- aCID CID of DAG record
- aDepth depth used for indenting printed records
- aSet set of CIDs to print
Return: returns outcome::success on success or outcome::failure otherwise
Helper funtion to print Merkle-DAG records
function RebroadcastHeads¶
Regularly send out a list of heads that we have not recently seen
function Broadcast¶
outcome::result< void > Broadcast(
const std::set< CID > & cids,
const std::string & topic,
boost::optional< libp2p::peer::PeerInfo > peerInfo =boost::none
)
Broadcasts a set of CIDs. Encodes and broadcasts the provided list of CIDs.
Parameters:
- cids The list of CIDs to broadcast.
- topic The topic to broadcast to.
- peerInfo Optional peer info to avoid repeated GetPeerInfo calls.
Return: outcome::success on success, or outcome::failure if an error occurs.
function EncodeBroadcast¶
Parameters:
- heads list of CIDs
Return: data encoded into Buffer data or outcome::failure on error
EncodeBroadcast encodes list of CIDs to CRDT broadcast data
function CreateIPLDNode¶
outcome::result< std::shared_ptr< IPLDNode > > CreateIPLDNode(
const std::vector< std::pair< CID, std::string > > & aHeads,
const std::shared_ptr< Delta > & aDelta,
const std::unordered_set< std::string > & topics
) const
Parameters:
- aHeads list of CIDs to add to node as IPLD links
- aDelta Delta to serialize into IPLD node
- topics Topics to add as links
Return: IPLD node or outcome::failure on error
CreateIPLDNode add block node to DAGSyncer
function CreateDAGNode¶
outcome::result< std::shared_ptr< IPLDNode > > CreateDAGNode(
const std::shared_ptr< Delta > & aDelta,
const std::unordered_set< std::string > & topics
)
function AddDAGNode¶
Parameters:
- node Node to add and process
Return: CID or outcome::failure on error
AddDAGNode adds node to DAGSyncer and processes new blocks.
function SyncDatastore¶
Parameters:
- aKeyList all heads and the set entries related to the given prefix
Return: returns outcome::success on success or outcome::failure otherwise
SyncDatastore sync heads and set datastore
function PutElementsCallback¶
function DeleteElementsCallback¶
function UpdateCRDTHeads¶
function EnqueueRootCID¶
function WaitForJob¶
function EncodeBroadcastStatic¶
Parameters:
- heads list of CIDs
Return: data encoded into Buffer data or outcome::failure on error
EncodeBroadcastStatic encodes list of CIDs to CRDT broadcast data
Friends¶
friend PubSubBroadcasterExt¶
Updated on 2026-03-04 at 13:10:43 -0800