#include "globaldb.hpp"
#include "pubsub_broadcaster_ext.hpp"
#include "keypair_file_storage.hpp"
#include "crdt/crdt_datastore.hpp"
#include "crdt/graphsync_dagsyncer.hpp"
#include "crdt/atomic_transaction.hpp"
#include <ipfs_lite/ipfs/impl/datastore_rocksdb.hpp>
#include <ipfs_lite/ipfs/graphsync/impl/graphsync_impl.hpp>
#include <rocksdb/db.h>
#include <libp2p/host/host.hpp>
#include <libp2p/injector/host_injector.hpp>
#include <libp2p/protocol/common/asio/asio_scheduler.hpp>
#include <libp2p/injector/kademlia_injector.hpp>
#include <boost/format.hpp>
#if defined( _WIN32 )
#include <winsock2.h>
#include <iphlpapi.h>
#pragma comment( lib, "iphlpapi.lib" )
#pragma comment( lib, "ws2_32.lib" )
#else
#endif
OUTCOME_CPP_DEFINE_CATEGORY_3( sgns::crdt, GlobalDB::Error, e )
{
using ProofError = sgns::crdt::GlobalDB::Error;
switch ( e )
{
case ProofError::ROCKSDB_IO:
return "RocksDB I/O error";
case ProofError::IPFS_DB_NOT_CREATED:
return "IPFS Database creation error";
case ProofError::DAG_SYNCHER_NOT_LISTENING:
return "DAG Syncher listen error";
case ProofError::CRDT_DATASTORE_NOT_CREATED:
return "CRDT DataStore creation error";
case ProofError::PUBSUB_BROADCASTER_NOT_CREATED:
return "Pubsub Broadcaster creation error";
case ProofError::INVALID_PARAMETERS:
return "Invalid parameters provided";
case ProofError::GLOBALDB_NOT_STARTED:
return "Start method wasn't called";
}
return "Unknown error";
}
namespace sgns::crdt
{
using CrdtOptions = crdt::CrdtOptions;
using CrdtDatastore = crdt::CrdtDatastore;
using HierarchicalKey = crdt::HierarchicalKey;
using GraphsyncDAGSyncer = crdt::GraphsyncDAGSyncer;
using RocksdbDatastore = ipfs_lite::ipfs::RocksdbDatastore;
using IpfsRocksDb = ipfs_lite::rocksdb;
using GossipPubSub = ipfs_pubsub::GossipPubSub;
using GraphsyncImpl = ipfs_lite::ipfs::graphsync::GraphsyncImpl;
outcome::result<std::shared_ptr<GlobalDB>> GlobalDB::New(
std::shared_ptr<boost::asio::io_context> context,
std::string databasePath,
std::shared_ptr<sgns::ipfs_pubsub::GossipPubSub> pubsub,
std::shared_ptr<CrdtOptions> crdtOptions,
std::shared_ptr<sgns::ipfs_lite::ipfs::graphsync::Network> graphsyncnetwork,
std::shared_ptr<libp2p::protocol::Scheduler> scheduler,
std::shared_ptr<sgns::ipfs_lite::ipfs::graphsync::RequestIdGenerator> generator,
std::shared_ptr<RocksDB> datastore )
{
if ( ( !context ) || ( !generator ) || ( !pubsub ) || ( !graphsyncnetwork ) )
{
return outcome::failure( Error::INVALID_PARAMETERS );
}
auto new_instance = std::shared_ptr<GlobalDB>(
new GlobalDB( std::move( context ), std::move( databasePath ), std::move( pubsub ) ) );
BOOST_OUTCOME_TRYV2( auto &&,
new_instance->Init( std::move( crdtOptions ),
std::move( graphsyncnetwork ),
std::move( scheduler ),
std::move( generator ),
std::move( datastore ) ) );
return new_instance;
}
GlobalDB::GlobalDB( std::shared_ptr<boost::asio::io_context> context,
std::string databasePath,
std::shared_ptr<sgns::ipfs_pubsub::GossipPubSub> pubsub ) :
m_context( std::move( context ) ),
m_databasePath( std::move( databasePath ) ),
m_pubsub( std::move( pubsub ) ),
started_{ false }
{
}
GlobalDB::~GlobalDB()
{
m_logger->debug( "~GlobalDB CALLED" );
if ( m_broadcaster )
{
m_broadcaster->Stop();
}
if ( m_crdtDatastore )
{
m_crdtDatastore->Close();
}
}
outcome::result<void> GlobalDB::Init(
std::shared_ptr<CrdtOptions> crdtOptions,
std::shared_ptr<sgns::ipfs_lite::ipfs::graphsync::Network> graphsyncnetwork,
std::shared_ptr<libp2p::protocol::Scheduler> scheduler,
std::shared_ptr<sgns::ipfs_lite::ipfs::graphsync::RequestIdGenerator> generator,
std::shared_ptr<RocksDB> datastore )
{
std::shared_ptr<RocksDB> dataStore = std::move( datastore );
if ( dataStore == nullptr )
{
auto databasePathAbsolute = boost::filesystem::absolute( m_databasePath ).string();
// Create new database
m_logger->info( "Opening database " + databasePathAbsolute );
RocksDB::Options options;
options.create_if_missing = true; // intentionally
options.target_file_size_base = 32 * 1024 * 1024;
options.max_compaction_bytes = 32 * 1024 * 1024;
options.write_buffer_size = 32 * 1024 * 1024;
options.level0_file_num_compaction_trigger = 1;
options.target_file_size_multiplier = 1;
options.level_compaction_dynamic_level_bytes = false;
try
{
auto dataStoreResult = RocksDB::create( databasePathAbsolute, options );
// If database open fails with corruption, try to repair it
if ( !dataStoreResult.has_value() )
{
std::string errorMsg = dataStoreResult.error().message();
if ( errorMsg.find( "corruption" ) != std::string::npos ||
errorMsg.find( "Corruption" ) != std::string::npos )
{
m_logger->warn( "Database corruption detected, attempting repair: {}", databasePathAbsolute );
auto repairStatus = ::ROCKSDB_NAMESPACE::RepairDB( databasePathAbsolute, options );
if ( repairStatus.ok() )
{
m_logger->info( "Database repair successful, retrying open" );
dataStoreResult = RocksDB::create( databasePathAbsolute, options );
}
else
{
m_logger->error( "Database repair failed: {}", repairStatus.ToString() );
}
}
}
if ( dataStoreResult.has_value() )
{
dataStore = std::move( dataStoreResult.value() );
}
else
{
m_logger->error( "Unable to open database: " + std::string( dataStoreResult.error().message() ) );
return outcome::failure( boost::system::error_code{} );
}
}
catch ( std::exception &e )
{
m_logger->error( "Unable to open database: " + std::string( e.what() ) );
return Error::ROCKSDB_IO;
}
}
m_datastore = std::move( dataStore );
IpfsRocksDb::Options rdbOptions;
rdbOptions.create_if_missing = true; // intentionally
auto ipfsDBResult = IpfsRocksDb::create( m_datastore->getDB() );
if ( ipfsDBResult.has_error() )
{
m_logger->error( "Unable to create database for IPFS datastore" );
return Error::IPFS_DB_NOT_CREATED;
}
auto ipfsDataStore = std::make_shared<RocksdbDatastore>( ipfsDBResult.value() );
if ( !m_pubsub )
{
m_logger->error( "pubsub not initialized." );
return outcome::failure( Error::DAG_SYNCHER_NOT_LISTENING );
}
std::shared_ptr<libp2p::Host> host = m_pubsub->GetHost();
auto graphsync = std::make_shared<GraphsyncImpl>( host,
std::move( scheduler ),
graphsyncnetwork,
generator,
m_context );
auto dagSyncer = std::make_shared<GraphsyncDAGSyncer>( ipfsDataStore, graphsync, host );
// Start DagSyner listener
auto startResult = dagSyncer->StartSync();
if ( startResult.has_failure() )
{
m_logger->error( "DAG Syncher not listening" );
return startResult.error();
}
m_broadcaster = PubSubBroadcasterExt::New( dagSyncer, m_pubsub );
if ( m_broadcaster == nullptr )
{
m_logger->error( "Unable to create PubSub broadcaster" );
return Error::PUBSUB_BROADCASTER_NOT_CREATED;
}
m_crdtDatastore = CrdtDatastore::New( m_datastore,
HierarchicalKey( "crdt" ),
dagSyncer,
m_broadcaster,
crdtOptions );
if ( m_crdtDatastore == nullptr )
{
m_logger->error( "Unable to create CRDT datastore" );
return Error::CRDT_DATASTORE_NOT_CREATED;
}
return outcome::success();
}
void GlobalDB::Start()
{
if ( !started_ )
{
started_ = true;
m_crdtDatastore->Start();
m_broadcaster->Start();
}
}
outcome::result<CID> GlobalDB::Put( const HierarchicalKey &key,
const Buffer &value,
const std::unordered_set<std::string> &topics )
{
if ( !started_ )
{
m_logger->error( "{}: GlobalDB Not Started", __func__ );
return outcome::failure( Error::GLOBALDB_NOT_STARTED );
}
return m_crdtDatastore->PutKey( key, value, topics );
}
outcome::result<CID> GlobalDB::Put( const std::vector<DataPair> &data_vector,
const std::unordered_set<std::string> &topics )
{
if ( !started_ )
{
m_logger->error( "{}: GlobalDB Not Started", __func__ );
return outcome::failure( Error::GLOBALDB_NOT_STARTED );
}
AtomicTransaction batch( m_crdtDatastore );
for ( auto &data : data_vector )
{
BOOST_OUTCOME_TRYV2( auto &&, batch.Put( std::get<0>( data ), std::get<1>( data ) ) );
}
return batch.Commit( topics );
}
outcome::result<GlobalDB::Buffer> GlobalDB::Get( const HierarchicalKey &key )
{
return m_crdtDatastore->GetKey( key );
}
outcome::result<CID> GlobalDB::Remove( const HierarchicalKey &key, const std::unordered_set<std::string> &topics )
{
if ( !started_ )
{
m_logger->error( "{}: GlobalDB Not Started", __func__ );
return outcome::failure( Error::GLOBALDB_NOT_STARTED );
}
return m_crdtDatastore->DeleteKey( key, topics );
}
outcome::result<GlobalDB::QueryResult> GlobalDB::QueryKeyValues( std::string_view keyPrefix )
{
return m_crdtDatastore->QueryKeyValues( keyPrefix );
}
outcome::result<GlobalDB::QueryResult> GlobalDB::QueryKeyValues( const std::string &prefix_base,
const std::string &middle_part,
const std::string &remainder_prefix )
{
return m_crdtDatastore->QueryKeyValues( prefix_base, middle_part, remainder_prefix );
}
outcome::result<std::string> GlobalDB::KeyToString( const Buffer &key ) const
{
// @todo cache the prefix and suffix
auto keysPrefix = m_crdtDatastore->GetKeysPrefix();
auto valueSuffix = m_crdtDatastore->GetValueSuffix();
auto sKey = std::string( key.toString() );
if ( auto prefixPos = keysPrefix.empty() ? 0 : sKey.find( keysPrefix, 0 ); prefixPos != 0 )
{
return outcome::failure( std::errc::invalid_argument );
}
size_t keyPos = keysPrefix.size();
auto suffixPos = valueSuffix.empty() ? sKey.size() : sKey.rfind( valueSuffix, std::string::npos );
if ( suffixPos == std::string::npos || suffixPos < keyPos )
{
return outcome::failure( std::errc::invalid_argument );
}
return sKey.substr( keyPos, suffixPos - keyPos );
}
void GlobalDB::PrintDataStore()
{
m_crdtDatastore->PrintDataStore();
}
std::shared_ptr<AtomicTransaction> GlobalDB::BeginTransaction()
{
return std::make_shared<AtomicTransaction>( m_crdtDatastore );
}
outcome::result<void> GlobalDB::AddBroadcastTopic( const std::string &topicName )
{
return m_broadcaster->AddBroadcastTopic( topicName );
}
void GlobalDB::AddListenTopic( const std::string &topicName )
{
m_broadcaster->AddListenTopic( topicName );
m_crdtDatastore->AddTopicName( topicName );
}
bool GlobalDB::RegisterElementFilter( const std::string &pattern, GlobalDBFilterCallback filter )
{
return m_crdtDatastore->RegisterElementFilter( pattern, std::move( filter ) );
}
bool GlobalDB::RegisterNewElementCallback( const std::string &pattern, GlobalDBNewElementCallback callback )
{
return m_crdtDatastore->RegisterNewElementCallback( pattern, std::move( callback ) );
}
bool GlobalDB::RegisterDeletedElementCallback( const std::string &pattern, GlobalDBDeletedElementCallback callback )
{
return m_crdtDatastore->RegisterDeletedElementCallback( pattern, std::move( callback ) );
}
std::shared_ptr<GlobalDB::RocksDB> GlobalDB::GetDataStore()
{
return m_datastore;
}
outcome::result<GlobalDB::CRDTHeadListResult> GlobalDB::GetCRDTHeadList()
{
return m_crdtDatastore->GetHeadList();
}
outcome::result<uint64_t> GlobalDB::GetCRDTHeadHeight( const CID &aCid, const std::string &topic )
{
return m_crdtDatastore->GetHeadHeight( aCid, topic );
}
outcome::result<void> GlobalDB::CRDTHeadRemove( const CID &aCid, const std::string &topic )
{
return m_crdtDatastore->RemoveHead( aCid, topic );
}
outcome::result<void> GlobalDB::CRDTHeadAdd( const CID &aCid, const std::string &topic, uint64_t priority )
{
return m_crdtDatastore->AddHead( aCid, topic, priority );
}
std::shared_ptr<PubSubBroadcasterExt> GlobalDB::GetBroadcaster()
{
return m_broadcaster;
}
outcome::result<CrdtDatastore::JobStatus> GlobalDB::GetCIDJobStatus( const CID &cid ) const
{
if ( !m_crdtDatastore )
{
return outcome::failure( Error::CRDT_DATASTORE_NOT_CREATED );
}
return m_crdtDatastore->GetJobStatus( cid );
}
outcome::result<void> GlobalDB::RequestHeadBroadcast( const std::set<std::string> &topics )
{
if ( !m_crdtDatastore )
{
m_logger->error( "RequestHeadBroadcast: CRDT datastore not initialized" );
return outcome::failure( Error::CRDT_DATASTORE_NOT_CREATED );
}
if ( !started_.load() )
{
m_logger->error( "RequestHeadBroadcast: GlobalDB not started" );
return outcome::failure( Error::GLOBALDB_NOT_STARTED );
}
m_logger->debug( "RequestHeadBroadcast: Forwarding request for {} topics", topics.size() );
return m_crdtDatastore->BroadcastHeadsForTopics( topics );
}
outcome::result<std::unordered_set<std::string>> GlobalDB::GetMonitoredTopics() const
{
if ( !m_crdtDatastore )
{
m_logger->error( "{}: CRDT datastore not initialized", __func__ );
return outcome::failure( Error::CRDT_DATASTORE_NOT_CREATED );
}
m_logger->debug( "{}: Forwarding request for {} topics", __func__ );
return m_crdtDatastore->GetTopicNames();
}
std::shared_ptr<crdt::CrdtDatastore> GlobalDB::GetCRDTDataStore()
{
return m_crdtDatastore;
}
}