src/storage/rocksdb/rocksdb.cpp
Namespaces
Types
Types Documentation
using BlockBasedTableOptions
using sgns::storage::BlockBasedTableOptions = ::ROCKSDB_NAMESPACE::BlockBasedTableOptions;
Source code
#include <memory>
#include <utility>
#include <boost/filesystem.hpp>
#include <rocksdb/table.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/slice_transform.h>
#include <storage/rocksdb/rocksdb.hpp>
#include "storage/database_error.hpp"
#include "storage/rocksdb/rocksdb_cursor.hpp"
#include "storage/rocksdb/rocksdb_batch.hpp"
#include "storage/rocksdb/rocksdb_util.hpp"
namespace sgns::storage
{
using BlockBasedTableOptions = ::ROCKSDB_NAMESPACE::BlockBasedTableOptions;
// Out-of-line destructor: nothing is done here beyond destroying the members.
rocksdb::~rocksdb() = default;
// Opens the database located at `path` through DB::Open and wraps it in a
// shared `rocksdb` instance.
//
// path    - location handed to DB::Open (also stored in path_).
// options - copied into the instance; the copy is then adjusted (table factory
//           with bloom filter, capped prefix extractor, log level, environment)
//           before the database is opened.
//
// Returns the new instance when DB::Open reports ok, otherwise the status
// converted by error_as_result.
outcome::result<std::shared_ptr<rocksdb>> rocksdb::create( std::string_view path, const Options &options )
{
    auto instance = std::make_shared<rocksdb>();

    // Keep an owned copy of the options; all tweaks below apply to that copy only.
    instance->options_ = std::make_shared<Options>( options );
    auto &opts         = *instance->options_;

    // Block-based table factory with a bloom filter policy and whole-key filtering.
    BlockBasedTableOptions table_options;
    table_options.filter_policy.reset( ::ROCKSDB_NAMESPACE::NewBloomFilterPolicy( 10, false ) );
    table_options.whole_key_filtering = true;
    opts.table_factory.reset( NewBlockBasedTableFactory( table_options ) );

    // Prefix extractor capped at 3 bytes; only error-level info logging.
    opts.prefix_extractor.reset( ::ROCKSDB_NAMESPACE::NewCappedPrefixTransform( 3 ) );
    opts.info_log_level = ::ROCKSDB_NAMESPACE::InfoLogLevel::ERROR_LEVEL;

    // Default environment with 4 background threads in the HIGH priority pool.
    opts.env = ::rocksdb::Env::Default();
    opts.env->SetBackgroundThreads( 4, ::rocksdb::Env::Priority::HIGH );

    instance->path_ = std::string( path );

    DB  *raw_db = nullptr;
    auto status = DB::Open( opts, instance->path_, &raw_db );
    if ( !status.ok() )
    {
        // Open failed: release whatever handle may have been produced
        // (deleting a null pointer is a no-op) and report the status.
        delete raw_db;
        return error_as_result<std::shared_ptr<rocksdb>>( status );
    }

    // Hand ownership of the handle to a shared_ptr whose deleter closes the
    // database before freeing it.
    instance->db_ = std::shared_ptr<DB>( raw_db,
                                         []( DB *handle )
                                         {
                                             if ( handle == nullptr )
                                             {
                                                 return;
                                             }
                                             handle->Close();
                                             delete handle;
                                         } );

    instance->logger_ = base::createLogger( "rocksdb" );

    // Writes issued through this instance are synchronous.
    instance->wo_.sync = true;
    instance->setWriteOptions( instance->wo_ );

    return instance;
}
// Wraps an already opened database handle in a `rocksdb` instance.
//
// db - shared handle to use; it is stored as-is, so the returned instance
//      shares ownership with the caller.
//
// Returns the new instance, or a PathNotFound status converted by
// error_as_result when `db` is null.
// Note: only db_ and logger_ are assigned here; options_ and path_ are not
// touched by this overload.
outcome::result<std::shared_ptr<rocksdb>> rocksdb::create( const std::shared_ptr<DB> &db )
{
    if ( db == nullptr )
    {
        return error_as_result<std::shared_ptr<rocksdb>>( rocksdb::Status( rocksdb::Status::PathNotFound() ) );
    }

    // Build the shared_ptr directly (as the path-based overload does) instead of
    // creating a unique_ptr and converting it on return.
    auto l     = std::make_shared<rocksdb>();
    l->db_     = db;
    l->logger_ = base::createLogger( "rocksdb" );
    return l;
}
std::unique_ptr<BufferMapCursor> rocksdb::cursor()
{
auto it = std::unique_ptr<Iterator>( db_->NewIterator( ro_ ) );
return std::make_unique<Cursor>( std::move( it ) );
}
// Creates a new Batch constructed from a reference to this instance and
// returns it to the caller.
std::unique_ptr<BufferBatch> rocksdb::batch()
{
    auto write_batch = std::make_unique<Batch>( *this );
    return write_batch;
}
// Replaces the stored read options (ro_) used by get(), cursor(), empty()
// and as the base for query().
// ro - new options, taken by value and moved into place.
void rocksdb::setReadOptions( ReadOptions ro )
{
    this->ro_ = std::move( ro );
}
// Replaces the stored write options (wo_) used by put() and remove().
// wo - new options, taken by value and moved into place.
void rocksdb::setWriteOptions( WriteOptions wo )
{
    this->wo_ = std::move( wo );
}
// Looks up `key` using the stored read options.
//
// Returns the stored value as a Buffer on success, otherwise the RocksDB
// status converted by error_as_result. A "not found" status is returned
// without being passed to the logger, since a missing key is not always an
// actual error; every other failure is passed along with logger_.
outcome::result<Buffer> rocksdb::get( const Buffer &key ) const
{
    std::string raw_value;
    auto        status = db_->Get( ro_, make_slice( key ), &raw_value );
    if ( status.ok() )
    {
        // Fill a named Buffer so it can be moved into the result; returning the
        // result of put() on a temporary (presumably a reference) forced an
        // additional copy of the buffer.
        // FIXME: is it possible to avoid copying string -> Buffer?
        Buffer value;
        value.put( raw_value );
        return value;
    }

    // not always an actual error so don't log it
    if ( status.IsNotFound() )
    {
        return error_as_result<Buffer>( status );
    }

    return error_as_result<Buffer>( status, logger_ );
}
// Collects every key/value pair whose key starts with `keyPrefix`.
//
// A copy of the stored read options is used with auto_prefix_mode enabled.
// Iteration starts at the first key not less than the prefix and stops at the
// first key that no longer starts with it.
//
// Returns the collected pairs, or the iterator's status code converted by
// error_from_rocksdb if the iterator reports a failure after the scan.
outcome::result<rocksdb::QueryResult> rocksdb::query( const Buffer &keyPrefix ) const
{
    ReadOptions scan_options      = ro_;
    scan_options.auto_prefix_mode = true; // Adaptive Prefix Mode

    auto       iter   = std::unique_ptr<rocksdb::Iterator>( db_->NewIterator( scan_options ) );
    const auto prefix = make_slice( keyPrefix );

    QueryResult matches;
    iter->Seek( prefix );
    while ( iter->Valid() && iter->key().starts_with( prefix ) )
    {
        Buffer found_key;
        found_key.put( iter->key().ToString() );

        Buffer found_value;
        found_value.put( iter->value().ToString() );

        matches.emplace( std::move( found_key ), std::move( found_value ) );
        iter->Next();
    }

    if ( iter->status().ok() )
    {
        return matches;
    }
    return error_from_rocksdb( iter->status().code() );
}
// Collects key/value pairs whose keys are built from three parts.
//
// prefix_base      - leading part every returned key must start with.
// middle_part      - selects the matching mode:
//                      "*"      : wildcard; only prefix_base is used for seeking,
//                      "!<text>": negated; keys containing <text> after
//                                 prefix_base are skipped,
//                      other    : taken literally.
// remainder_prefix - text expected after the middle part.
//
// In the literal mode the scan prefix is prefix_base + middle_part +
// remainder_prefix and every key starting with it is returned. In the other
// two modes the scan prefix is prefix_base alone and a key is kept only if
// remainder_prefix occurs somewhere at or after the end of prefix_base.
//
// Returns the collected pairs, or the iterator's status code converted by
// error_from_rocksdb if the iterator reports a failure after the scan.
outcome::result<rocksdb::QueryResult> rocksdb::query( const std::string &prefix_base,
                                                      const std::string &middle_part,
                                                      const std::string &remainder_prefix ) const
{
    ReadOptions read_options      = ro_;
    read_options.auto_prefix_mode = true; // Adaptive Prefix Mode

    const bool negated_query    = !middle_part.empty() && middle_part[0] == '!';
    const bool simplified_query = middle_part != "*" && !negated_query;

    std::string strKeyPrefix = prefix_base;
    if ( simplified_query )
    {
        strKeyPrefix += middle_part + remainder_prefix;
    }

    // Loop-invariant values: the text to exclude (middle_part without the
    // leading '!') and the offset from which keys are searched.
    const std::string excluded_part = negated_query ? middle_part.substr( 1 ) : std::string();
    const size_t      search_from   = strKeyPrefix.size();

    QueryResult results;
    auto        iter = std::unique_ptr<rocksdb::Iterator>( db_->NewIterator( read_options ) );
    for ( iter->Seek( strKeyPrefix ); iter->Valid() && iter->key().starts_with( strKeyPrefix ); iter->Next() )
    {
        const std::string key_string = iter->key().ToString();

        if ( !simplified_query )
        {
            // The remainder must appear somewhere after the base prefix.
            if ( key_string.find( remainder_prefix, search_from ) == std::string::npos )
            {
                continue;
            }
            // Negated queries drop keys that contain the excluded text.
            if ( negated_query && key_string.find( excluded_part, search_from ) != std::string::npos )
            {
                continue;
            }
        }

        Buffer key;
        key.put( key_string );
        Buffer value;
        value.put( iter->value().ToString() );
        results.emplace( std::move( key ), std::move( value ) );
    }

    if ( !iter->status().ok() )
    {
        return error_from_rocksdb( iter->status().code() );
    }
    return results;
}
// Reports whether get() succeeds for `key`.
// Any failure of get() - not only "not found" - yields false.
// is there a better way?
bool rocksdb::contains( const Buffer &key ) const
{
    const auto lookup = get( key );
    return lookup.has_value();
}
bool rocksdb::empty() const
{
auto it = std::unique_ptr<Iterator>( db_->NewIterator( ro_ ) );
it->SeekToFirst();
return !it->Valid();
}
// Stores `value` under `key` using the stored write options.
// Returns success, or the RocksDB status converted by error_as_result (with
// logger_ passed along) when the write fails.
outcome::result<void> rocksdb::put( const Buffer &key, const Buffer &value )
{
    auto status = db_->Put( wo_, make_slice( key ), make_slice( value ) );
    if ( !status.ok() )
    {
        return error_as_result<void>( status, logger_ );
    }
    return outcome::success();
}
// Rvalue overload: takes over `value` into a local Buffer and forwards to the
// const-reference overload of put().
outcome::result<void> rocksdb::put( const Buffer &key, Buffer &&value )
{
    Buffer        taken( std::move( value ) );
    const Buffer &stored = taken;
    return put( key, stored );
}
// Deletes the entry stored under `key` using the stored write options.
// Returns success, or the RocksDB status converted by error_as_result (with
// logger_ passed along) when the delete fails.
outcome::result<void> rocksdb::remove( const Buffer &key )
{
    auto status = db_->Delete( wo_, make_slice( key ) );
    if ( !status.ok() )
    {
        return error_as_result<void>( status, logger_ );
    }
    return outcome::success();
}
// Returns every key/value pair in the database, in iteration order.
//
// The scan uses default-constructed read options rather than the stored ro_.
// The iterator's status is not inspected, so a failure during iteration simply
// ends the scan with whatever was collected so far.
std::vector<rocksdb::KeyValuePair> rocksdb::GetAll() const
{
    std::vector<KeyValuePair> ret_val;
    auto iter = std::unique_ptr<rocksdb::Iterator>( db_->NewIterator( rocksdb::ReadOptions() ) );
    for ( iter->SeekToFirst(); iter->Valid(); iter->Next() )
    {
        Buffer key;
        Buffer value;
        key.put( iter->key().ToString() );
        value.put( iter->value().ToString() );
        // Move the freshly built buffers into the pair instead of copying them.
        ret_val.push_back( std::make_pair( std::move( key ), std::move( value ) ) );
    }
    return ret_val;
}
} // namespace sgns::storage
Updated on 2026-03-04 at 13:10:45 -0800