crdt_globaldb/globaldb_app.cpp¶
Types¶
| Name | |
|---|---|
| using sgns::base::Buffer | Buffer |
| using sgns::crdt::HierarchicalKey | HierarchicalKey |
| using libp2p::protocol::gossip::Gossip::SubscriptionData | SubscriptionData |
Functions¶
| Name | |
|---|---|
| void | PutHook(const std::string & k, const Buffer & v, const sgns::base::Logger & logger) |
| void | DeleteHook(const std::string & k, const sgns::base::Logger & logger) |
| int | main(int argc, char ** argv) |
Types Documentation¶
using Buffer¶
using HierarchicalKey¶
using SubscriptionData¶
Functions Documentation¶
function PutHook¶
Display key and value added to CRDT datastore
function DeleteHook¶
Display key removed from CRDT datastore
function main¶
Source code¶
#include <iterator>
#include <iostream>
#include <algorithm>
#include <boost/program_options.hpp>
#include <boost/algorithm/string/erase.hpp>
#include <boost/asio/io_context.hpp>
#include <storage/rocksdb/rocksdb.hpp>
#include <boost/filesystem.hpp>
#include <boost/random.hpp>
#include "crdt/globaldb/globaldb.hpp"
#include "crdt/globaldb/keypair_file_storage.hpp"
#include <libp2p/log/configurator.hpp>
#include <libp2p/log/logger.hpp>
#include <libp2p/basic/scheduler/asio_scheduler_backend.hpp>
#include <libp2p/basic/scheduler/scheduler_impl.hpp>
#include <ipfs_lite/ipfs/graphsync/impl/network/network.hpp>
#include <ipfs_lite/ipfs/graphsync/impl/local_requests.hpp>
using Buffer = sgns::base::Buffer;
using HierarchicalKey = sgns::crdt::HierarchicalKey;
using SubscriptionData = libp2p::protocol::gossip::Gossip::SubscriptionData;
namespace po = boost::program_options;
void PutHook( const std::string &k, const Buffer &v, const sgns::base::Logger &logger );
void DeleteHook( const std::string &k, const sgns::base::Logger &logger );
int main( int argc, char **argv )
{
std::string strDatabasePath;
int pubsubListeningPort = 0;
bool daemonMode = false;
bool echoProtocol = false;
std::string multiAddress;
po::options_description desc( "Input arguments:" );
try
{
desc.add_options()( "help,h", "print help" )( "daemon,d", "Running in daemon mode" )(
"echo,e",
"Using echo protocol in daemon mode" )(
"databasePath,db",
po::value<std::string>( &strDatabasePath )->default_value( "CRDT.Datastore" ),
"Path to CRDT datastore" )(
"port, p",
po::value<int>( &pubsubListeningPort )->default_value( 33123 ),
"Port number" )( "pubsub-address", po::value<std::string>( &multiAddress ), "Pubsub server address" );
po::variables_map vm;
po::store( po::parse_command_line( argc, argv, desc ), vm );
po::notify( vm );
if ( vm.count( "daemon" ) )
{
daemonMode = true;
}
if ( vm.count( "echo" ) )
{
echoProtocol = true;
}
if ( vm.count( "help" ) )
{
std::cout << desc << "\n";
return EXIT_FAILURE;
}
}
catch ( std::exception &e )
{
std::cerr << "Error parsing arguments: " << e.what() << "\n";
std::cout << desc << "\n";
return EXIT_FAILURE;
}
const std::string logger_config( R"(
# ----------------
sinks:
- name: console
type: console
color: true
groups:
- name: globaldb_app
sink: console
level: info
children:
- name: libp2p
- name: Gossip
# ----------------
)" );
// prepare log system
auto logging_system = std::make_shared<soralog::LoggingSystem>( std::make_shared<soralog::ConfiguratorFromYAML>(
// Original LibP2P logging config
std::make_shared<libp2p::log::Configurator>(),
// Additional logging config for application
logger_config ) );
logging_system->configure();
libp2p::log::setLoggingSystem( logging_system );
auto io = std::make_shared<boost::asio::io_context>();
auto logger = sgns::base::createLogger( "globaldb" );
std::string broadcastChannel = "globaldb-example";
std::vector<std::string> pubsubBootstrapPeers;
if ( !multiAddress.empty() )
{
pubsubBootstrapPeers.push_back( multiAddress );
}
auto pubsub = std::make_shared<sgns::ipfs_pubsub::GossipPubSub>(
sgns::crdt::KeyPairFileStorage( strDatabasePath + "/pubsub" ).GetKeyPair().value() );
pubsub->Start( pubsubListeningPort, pubsubBootstrapPeers );
auto scheduler = std::make_shared<libp2p::basic::SchedulerImpl>(
std::make_shared<libp2p::basic::AsioSchedulerBackend>( io ),
libp2p::basic::Scheduler::Config{ std::chrono::milliseconds( 100 ) } );
auto graphsyncnetwork = std::make_shared<sgns::ipfs_lite::ipfs::graphsync::Network>( pubsub->GetHost(), scheduler );
auto generator = std::make_shared<sgns::ipfs_lite::ipfs::graphsync::RequestIdGenerator>();
auto crdtOptions = sgns::crdt::CrdtOptions::DefaultOptions();
crdtOptions->logger = logger;
auto globaldb_ret =
sgns::crdt::GlobalDB::New( io, strDatabasePath, pubsub, crdtOptions, graphsyncnetwork, scheduler, generator );
if ( globaldb_ret.has_error() )
{
return -1;
}
auto globalDB = std::move( globaldb_ret.value() );
globalDB->AddListenTopic( broadcastChannel );
globalDB->AddBroadcastTopic( broadcastChannel );
globalDB->Start();
std::ostringstream streamDisplayDetails;
// @todo fix commented output
//streamDisplayDetails << "\n\n\nPeer ID: " << peerID.toBase58() << std::endl;
//streamDisplayDetails << "Listen address: " << pubsub->GetLocalAddress() << std::endl;
//streamDisplayDetails << "DAG syncer address: " << listen_to.getStringAddress() << std::endl;
streamDisplayDetails << "Broadcast channel: " << broadcastChannel << std::endl;
//streamDisplayDetails << "Data folder: " << strDatabasePathAbsolute << std::endl;
streamDisplayDetails << std::endl;
streamDisplayDetails << "Ready!" << std::endl;
streamDisplayDetails << std::endl;
streamDisplayDetails << "Commands: " << std::endl;
streamDisplayDetails << std::endl;
streamDisplayDetails << "> list -> list items in the store" << std::endl;
streamDisplayDetails << "> get <key> -> get value for a key" << std::endl;
streamDisplayDetails << "> put <key> <value> -> store value on a key" << std::endl;
streamDisplayDetails << "> exit -> quit" << std::endl;
streamDisplayDetails << std::endl;
std::cout << streamDisplayDetails.str();
if ( daemonMode )
{
std::cout << "Running in daemon mode" << ( echoProtocol ? " with echo protocol\n" : "\n" ) << std::endl;
boost::asio::signal_set signals( *io, SIGINT, SIGTERM );
signals.async_wait( [&io]( const boost::system::error_code &, int ) { io->stop(); } );
// run event loop
io->run();
}
else
{
std::cout << "> ";
std::string command;
while ( std::getline( std::cin, command ) )
{
if ( command.empty() )
{
std::cout << "> ";
}
else if ( command == "exit" || command == "quit" )
{
break;
}
else if ( command == "list" )
{
auto queryResult = globalDB->QueryKeyValues( "" );
if ( queryResult.has_failure() )
{
std::cout << "Unable list keys from CRDT datastore" << std::endl;
}
else
{
for ( const auto &element : queryResult.value() )
{
// key name: /crdt/s/k/<key>/v
auto strKey = std::string( element.first.toString() );
std::cout << "[" << strKey << "] -> " << element.second.toString() << std::endl;
}
}
}
else if ( command.rfind( "get" ) == 0 )
{
std::string key = command.substr( 3 );
key.erase( std::remove_if( key.begin(), key.end(), ::isspace ), key.end() );
if ( key.empty() )
{
std::cout << "get <key>" << std::endl;
}
else
{
auto getKeyResult = globalDB->Get( HierarchicalKey( key ) );
if ( getKeyResult.has_failure() )
{
std::cout << "Unable to find key in CRDT datastore: " << key << std::endl;
}
else
{
std::cout << "[" << key << "] -> " << getKeyResult.value().toString() << std::endl;
}
}
}
else if ( command.rfind( "put" ) == 0 )
{
size_t pos = 0;
std::vector<std::string> commandList;
std::string commandToParse = command;
while ( ( pos = commandToParse.find( " " ) ) != std::string::npos )
{
commandList.push_back( commandToParse.substr( 0, pos ) );
commandToParse.erase( 0, pos + 1 );
}
if ( !commandToParse.empty() )
{
commandList.push_back( commandToParse );
}
if ( commandList.size() < 3 )
{
std::cout << "put <key> <value>" << std::endl;
}
else
{
auto key = commandList[1];
std::string value = commandList[2];
if ( commandList.size() > 3 )
{
for ( size_t i = 3; i < commandList.size(); ++i )
{
value += " " + commandList[i];
}
}
Buffer valueBuffer;
valueBuffer.put( value );
auto setKeyResult = globalDB->Put( HierarchicalKey( key ), valueBuffer, { "test" } );
if ( setKeyResult.has_failure() )
{
std::cout << "Unable to put key-value to CRDT datastore: " << key << " " << value << std::endl;
}
}
}
std::cout << "> ";
}
}
return EXIT_SUCCESS;
}
void PutHook( const std::string &k, const Buffer &v, const sgns::base::Logger &logger )
{
if ( logger != nullptr )
{
std::string key = k;
if ( !key.empty() && key[0] == '/' )
{
key.erase( 0, 1 );
}
logger->info( "CRDT datastore: Added [" + key + "] -> " + std::string( v.toString() ) );
}
}
void DeleteHook( const std::string &k, const sgns::base::Logger &logger )
{
if ( logger != nullptr )
{
std::string key = k;
if ( !key.empty() && key[0] == '/' )
{
key.erase( 0, 1 );
}
logger->info( "CRDT datastore: Removed [" + key + "]" );
}
}
Updated on 2026-04-13 at 23:22:46 -0700