src/crdt/globaldb/globaldb.hpp¶
Namespaces¶
| Name |
|---|
| sgns |
| sgns::crdt |
Classes¶
| Name | |
|---|---|
| class | sgns::crdt::GlobalDB |
Functions¶
| Name | |
|---|---|
| OUTCOME_HPP_DECLARE_ERROR_2(sgns::crdt , GlobalDB::Error ) Macro for declaring error handling in the GlobalDB class. |
Functions Documentation¶
function OUTCOME_HPP_DECLARE_ERROR_2¶
Macro for declaring error handling in the GlobalDB class.
Source code¶
#ifndef SUPERGENIUS_CRDT_GLOBALDB_HPP
#define SUPERGENIUS_CRDT_GLOBALDB_HPP
#include <unordered_set>
#include <boost/asio/io_context.hpp>
#include <boost/filesystem/path.hpp>
#include <ipfs_lite/ipfs/graphsync/impl/graphsync_impl.hpp>
#include <ipfs_lite/ipfs/graphsync/impl/local_requests.hpp>
#include <ipfs_pubsub/gossip_pubsub_topic.hpp>
#include <libp2p/protocol/autonat/autonat.hpp>
#include <libp2p/protocol/holepunch/holepunch_client.hpp>
#include <libp2p/protocol/holepunch/holepunch_server.hpp>
#include <libp2p/protocol/identify/identify.hpp>
#include "crdt/atomic_transaction.hpp"
#include "crdt/crdt_datastore.hpp"
#include "crdt/crdt_options.hpp"
#include "outcome/outcome.hpp"
#include "pubsub_broadcaster_ext.hpp"
namespace sgns::crdt
{
class GlobalDB : public std::enable_shared_from_this<GlobalDB>
{
public:
using Buffer = base::Buffer;
using QueryResult = CrdtDatastore::QueryResult;
using RocksDB = storage::rocksdb;
using CRDTHeadListResult = CrdtHeads::CRDTListResult;
static outcome::result<std::shared_ptr<GlobalDB>> New(
std::shared_ptr<boost::asio::io_context> context,
std::string databasePath,
std::shared_ptr<sgns::ipfs_pubsub::GossipPubSub> pubsub,
std::shared_ptr<CrdtOptions> crdtOptions,
std::shared_ptr<sgns::ipfs_lite::ipfs::graphsync::Network> graphsyncnetwork,
std::shared_ptr<libp2p::protocol::Scheduler> scheduler,
std::shared_ptr<sgns::ipfs_lite::ipfs::graphsync::RequestIdGenerator> generator,
std::shared_ptr<RocksDB> datastore = nullptr );
~GlobalDB();
using DataPair = std::pair<HierarchicalKey, Buffer>;
using GlobalDBFilterCallback = CrdtDatastore::CRDTElementFilterCallback;
using GlobalDBNewElementCallback = CrdtDatastore::CRDTNewElementCallback;
using GlobalDBDeletedElementCallback = CrdtDatastore::CRDTDeletedElementCallback;
enum class Error: uint8_t
{
ROCKSDB_IO = 0,
IPFS_DB_NOT_CREATED,
DAG_SYNCHER_NOT_LISTENING,
CRDT_DATASTORE_NOT_CREATED,
PUBSUB_BROADCASTER_NOT_CREATED,
INVALID_PARAMETERS,
GLOBALDB_NOT_STARTED,
};
outcome::result<CID> Put( const HierarchicalKey &key,
const Buffer &value,
const std::unordered_set<std::string> &topics );
outcome::result<CID> Put( const std::vector<DataPair> &data_vector,
const std::unordered_set<std::string> &topics );
outcome::result<Buffer> Get( const HierarchicalKey &key );
outcome::result<CID> Remove( const HierarchicalKey &key, const std::unordered_set<std::string> &topics );
outcome::result<QueryResult> QueryKeyValues( std::string_view keyPrefix );
outcome::result<QueryResult> QueryKeyValues( const std::string &prefix_base,
const std::string &middle_part,
const std::string &remainder_prefix );
outcome::result<std::string> KeyToString( const Buffer &key ) const;
std::shared_ptr<AtomicTransaction> BeginTransaction();
outcome::result<void> AddBroadcastTopic( const std::string &topicName );
void AddListenTopic( const std::string &topicName );
void PrintDataStore();
std::shared_ptr<RocksDB> GetDataStore();
std::shared_ptr<sgns::crdt::PubSubBroadcasterExt> GetBroadcaster();
bool RegisterElementFilter( const std::string &pattern, GlobalDBFilterCallback filter );
bool RegisterNewElementCallback( const std::string &pattern, GlobalDBNewElementCallback callback );
bool RegisterDeletedElementCallback( const std::string &pattern, GlobalDBDeletedElementCallback callback );
void Start();
outcome::result<CRDTHeadListResult> GetCRDTHeadList();
outcome::result<uint64_t> GetCRDTHeadHeight( const CID &aCid, const std::string &topic );
outcome::result<void> CRDTHeadRemove( const CID &aCid, const std::string &topic );
outcome::result<void> CRDTHeadAdd( const CID &aCid, const std::string &topic, uint64_t priority );
outcome::result<crdt::CrdtDatastore::JobStatus> GetCIDJobStatus( const CID &cid ) const;
outcome::result<void> RequestHeadBroadcast( const std::set<std::string> &topics );
outcome::result<std::unordered_set<std::string>> GetMonitoredTopics() const;
std::shared_ptr<crdt::CrdtDatastore> GetCRDTDataStore();
private:
GlobalDB( std::shared_ptr<boost::asio::io_context> context,
std::string databasePath,
std::shared_ptr<sgns::ipfs_pubsub::GossipPubSub> pubsub );
outcome::result<void> Init( std::shared_ptr<CrdtOptions> crdtOptions,
std::shared_ptr<sgns::ipfs_lite::ipfs::graphsync::Network> graphsyncnetwork,
std::shared_ptr<libp2p::protocol::Scheduler> scheduler,
std::shared_ptr<sgns::ipfs_lite::ipfs::graphsync::RequestIdGenerator> generator,
std::shared_ptr<RocksDB> datastore = nullptr );
void scheduleBootstrap( std::shared_ptr<boost::asio::io_context> io_context,
std::shared_ptr<libp2p::Host> host );
std::shared_ptr<boost::asio::io_context> m_context;
std::string m_databasePath;
int m_dagSyncPort;
std::string m_graphSyncAddrs;
std::shared_ptr<sgns::ipfs_pubsub::GossipPubSub> m_pubsub;
std::shared_ptr<sgns::crdt::PubSubBroadcasterExt> m_broadcaster;
std::shared_ptr<RocksDB> m_datastore;
std::atomic_bool started_;
//std::shared_ptr<sgns::ipfs_lite::ipfs::dht::IpfsDHT> dht_;
//std::shared_ptr<libp2p::protocol::Identify> identify_;
//std::shared_ptr<libp2p::protocol::IdentifyMessageProcessor> identifymsgproc_;
//std::shared_ptr<libp2p::protocol::HolepunchClient> holepunch_;
//std::shared_ptr<libp2p::protocol::HolepunchClientMsgProc> holepunchmsgproc_;
int obsAddrRetries = 0;
std::shared_ptr<CrdtDatastore> m_crdtDatastore;
//Default Bootstrap Servers
std::vector<std::string> bootstrapAddresses_ = {
//"/dnsaddr/bootstrap.libp2p.io/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
//"/dnsaddr/bootstrap.libp2p.io/ipfs/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
//"/dnsaddr/bootstrap.libp2p.io/ipfs/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
//"/dnsaddr/bootstrap.libp2p.io/ipfs/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
//"/ip4/64.225.105.42/tcp/4001/p2p/QmPo1ygpngghu5it8u4Mr3ym6SEU2Wp2wA66Z91Y1S1g29",
//"/ip4/3.92.45.153/tcp/4001/ipfs/12D3KooWP6R6XVCBK7t76o8VDwZdxpzAqVeDtHYQNmntP2y8NHvK",
"/ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
"/ip4/104.236.179.241/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM", // pluto.i.ipfs.io
"/ip4/128.199.219.111/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu", // saturn.i.ipfs.io
"/ip4/104.236.76.40/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64", // venus.i.ipfs.io
"/ip4/178.62.158.247/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd", // earth.i.ipfs.io
"/ip6/2604:a880:1:20::203:d001/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM", // pluto.i.ipfs.io
"/ip6/2400:6180:0:d0::151:6001/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu", // saturn.i.ipfs.io
"/ip6/2604:a880:800:10::4a:5001/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64", // venus.i.ipfs.io
"/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd", // earth.i.ipfs.io
//"/dnsaddr/fra1-1.hostnodes.pinata.cloud/ipfs/QmWaik1eJcGHq1ybTWe7sezRfqKNcDRNkeBaLnGwQJz1Cj",
//"/dnsaddr/fra1-2.hostnodes.pinata.cloud/ipfs/QmNfpLrQQZr5Ns9FAJKpyzgnDL2GgC6xBug1yUZozKFgu4",
//"/dnsaddr/fra1-3.hostnodes.pinata.cloud/ipfs/QmPo1ygpngghu5it8u4Mr3ym6SEU2Wp2wA66Z91Y1S1g29",
//"/dnsaddr/nyc1-1.hostnodes.pinata.cloud/ipfs/QmRjLSisUCHVpFa5ELVvX3qVPfdxajxWJEHs9kN3EcxAW6",
//"/dnsaddr/nyc1-2.hostnodes.pinata.cloud/ipfs/QmPySsdmbczdZYBpbi2oq2WMJ8ErbfxtkG8Mo192UHkfGP",
//"/dnsaddr/nyc1-3.hostnodes.pinata.cloud/ipfs/QmSarArpxemsPESa6FNkmuu9iSE1QWqPX2R3Aw6f5jq4D5",
};
sgns::base::Logger m_logger = sgns::base::createLogger( "GlobalDB" );
};
}
OUTCOME_HPP_DECLARE_ERROR_2( sgns::crdt, GlobalDB::Error );
#endif // SUPERGENIUS_CRDT_GLOBALDB_HPP
Updated on 2026-03-04 at 13:10:44 -0800