Skip to content

src/crdt/impl/crdt_heads.cpp

Namespaces

Name
sgns
sgns::crdt

Source code

#include "crdt/crdt_heads.hpp"

#include <mutex>
#include <shared_mutex>
#include <utility>

#include <storage/database_error.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/system/error_code.hpp>
#include <boost/lexical_cast.hpp>

namespace sgns::crdt
{
    /**
     * @brief Creates a heads tracker backed by @p aDatastore and primes the in-memory cache.
     * @param aDatastore  datastore the heads are persisted in (shared ownership).
     * @param aNamespace  key under which all head entries are stored.
     *
     * Priming is best-effort: a constructor cannot hand the failure back to the caller,
     * so it is logged and the object starts with whatever reached the cache.
     */
    CrdtHeads::CrdtHeads( std::shared_ptr<DataStore> aDatastore, const HierarchicalKey &aNamespace ) :
        dataStore_( std::move( aDatastore ) ), namespaceKey_( aNamespace )
    {
        logger_->debug( "Creating heads" );

        // PrimeCache() queries the datastore; without one there is nothing to load.
        if ( dataStore_ == nullptr )
        {
            logger_->error( "CrdtHeads: no datastore supplied, heads cache left empty" );
            return;
        }

        const auto primeResult = PrimeCache();
        if ( primeResult.has_failure() )
        {
            logger_->error( "CrdtHeads: priming the heads cache failed: {}", primeResult.error().message() );
        }
    }

    /**
     * @brief Copy constructor; delegates to the copy-assignment operator.
     * @param aHeads  instance whose datastore handle, namespace key and cache are copied.
     */
    CrdtHeads::CrdtHeads( const CrdtHeads &aHeads )
    {
        this->operator=( aHeads );
    }

    /**
     * @brief Copies the datastore handle, namespace key and cached heads from @p aHeads.
     * @param aHeads  instance to copy from; self-assignment is a no-op.
     * @return reference to this object.
     *
     * The cache is guarded by mutex_ everywhere else in this class, so the copy holds
     * this object's mutex exclusively and the source's mutex shared while it runs.
     */
    CrdtHeads &CrdtHeads::operator=( const CrdtHeads &aHeads )
    {
        if ( this == &aHeads )
        {
            return *this;
        }

        // Acquire both locks through std::lock so that two threads assigning in
        // opposite directions cannot deadlock.
        std::unique_lock targetLock( mutex_, std::defer_lock );
        std::shared_lock sourceLock( aHeads.mutex_, std::defer_lock );
        std::lock( targetLock, sourceLock );

        dataStore_    = aHeads.dataStore_;
        namespaceKey_ = aHeads.namespaceKey_;
        cache_        = aHeads.cache_;

        return *this;
    }

    /**
     * @brief Two instances are equal when they share the same datastore pointer,
     *        namespace key and cached heads.
     * @param aHeads  instance to compare against.
     * @return true when all three members compare equal.
     */
    bool CrdtHeads::operator==( const CrdtHeads &aHeads ) const
    {
        const bool sameStore     = ( dataStore_ == aHeads.dataStore_ );
        const bool sameNamespace = ( namespaceKey_ == aHeads.namespaceKey_ );
        const bool sameCache     = ( cache_ == aHeads.cache_ );

        return sameStore && sameNamespace && sameCache;
    }

    /**
     * @brief Negation of operator==.
     * @param aHeads  instance to compare against.
     * @return true when the two instances are not equal.
     */
    bool CrdtHeads::operator!=( const CrdtHeads &aHeads ) const
    {
        const bool equal = this->operator==( aHeads );
        return !equal;
    }

    /**
     * @brief Returns a copy of the namespace key the heads are stored under.
     */
    HierarchicalKey CrdtHeads::GetNamespaceKey() const
    {
        return namespaceKey_;
    }

    /**
     * @brief Builds the datastore key of a head.
     * @param topic  topic the head belongs to.
     * @param aCid   CID of the head.
     * @return the key /<namespace>/<topic>/<cid>, or the error reported by
     *         CID::toString() when the CID cannot be rendered as a string.
     */
    outcome::result<HierarchicalKey> CrdtHeads::GetKey( const std::string &topic, const CID &aCid ) const
    {
        // Render the CID first: if that fails there is no point in building the topic key.
        OUTCOME_TRY( auto &&cidText, aCid.toString() );

        // /<namespace>/<topic>/<cid>
        return namespaceKey_.ChildString( topic ).ChildString( cidText );
    }

    /**
     * @brief Queues a head entry (key -> decimal height string) on a batch.
     * @param aDataStore  batch that receives the put; it is not committed here.
     * @param aCid        CID of the head.
     * @param aHeight     height stored as the entry's value.
     * @param topic       topic the head belongs to.
     * @return the result of the batch put, or the GetKey() error.
     */
    outcome::result<void> CrdtHeads::Write( storage::BufferBatch &aDataStore,
                                            const CID            &aCid,
                                            uint64_t              aHeight,
                                            const std::string    &topic ) const
    {
        OUTCOME_TRY( auto &&headKey, GetKey( topic, aCid ) );

        Buffer serializedKey;
        serializedKey.put( headKey.GetKey() );

        // The height is persisted in its decimal text form.
        const std::string heightText = std::to_string( aHeight );
        Buffer            serializedHeight;
        serializedHeight.put( heightText );

        return aDataStore.put( serializedKey, serializedHeight );
    }

    /**
     * @brief Queues the removal of a head entry on a batch.
     * @param aDataStore  batch that receives the remove; it is not committed here.
     * @param aCid        CID of the head to remove.
     * @param topic       topic the head belongs to.
     * @return the result of the batch remove, the GetKey() error, or
     *         errc::invalid_argument when @p aDataStore is null.
     */
    outcome::result<void> CrdtHeads::Delete( const std::unique_ptr<storage::BufferBatch> &aDataStore,
                                             const CID                                   &aCid,
                                             const std::string                           &topic ) const
    {
        if ( aDataStore == nullptr )
        {
            // A default-constructed error_code has value 0 ("success"); report a real
            // error so the failure is distinguishable when logged or inspected.
            return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::invalid_argument ) );
        }

        OUTCOME_TRY( auto &&headKey, GetKey( topic, aCid ) );

        Buffer keyBuffer;
        keyBuffer.put( headKey.GetKey() );
        return aDataStore->remove( keyBuffer );
    }

    /**
     * @brief Removes a head from the datastore and then from the cache.
     * @param aCid   CID of the head to remove.
     * @param topic  topic the head belongs to.
     * @return success, or the first error from CID::toString(), GetKey() or the
     *         datastore remove; errc::invalid_argument when no datastore is set.
     *
     * The cache is only touched after the datastore remove succeeded.
     */
    outcome::result<void> CrdtHeads::Remove( const CID &aCid, const std::string &topic )
    {
        if ( dataStore_ == nullptr )
        {
            return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::invalid_argument ) );
        }

        // Render the CID once and propagate a failure; calling .value() on a failed
        // result inside the log statement would throw instead of returning an error.
        OUTCOME_TRY( auto &&cidText, aCid.toString() );
        logger_->debug( "{}: Removing {} as head for topic {}", __func__, cidText, topic );

        OUTCOME_TRY( auto &&head_key, GetKey( topic, aCid ) );
        Buffer keyBuffer;
        keyBuffer.put( head_key.GetKey() );
        OUTCOME_TRY( dataStore_->remove( keyBuffer ) );

        logger_->debug( "{}: Removed {} as head for topic {}", __func__, cidText, topic );

        std::unique_lock lock( mutex_ );
        this->cache_[topic].erase( aCid );

        return outcome::success();
    }

    /**
     * @brief Tells whether @p aCid is currently cached as a head.
     * @param aCid   CID to look for.
     * @param topic  topic to search; an empty string searches every topic.
     * @return true when the CID is present in the searched topic(s).
     */
    bool CrdtHeads::IsHead( const CID &aCid, const std::string &topic ) const
    {
        std::shared_lock guard( mutex_ );

        if ( !topic.empty() )
        {
            // Single-topic lookup.
            const auto bucket = cache_.find( topic );
            return bucket != cache_.end() && bucket->second.count( aCid ) != 0;
        }

        // No topic given: the CID counts as a head if any topic holds it.
        for ( const auto &[_, heads] : cache_ )
        {
            if ( heads.count( aCid ) != 0 )
            {
                return true;
            }
        }
        return false;
    }

    /**
     * @brief Returns the cached height of a head.
     * @param aCid   CID of the head.
     * @param topic  topic to search; an empty string searches every topic and returns
     *               the height of the first match in cache iteration order.
     * @return the stored height, or 0 when the CID is not a head in the searched topic(s).
     *
     * The lookup is done once under a single shared lock, so the answer reflects one
     * consistent view of the cache.
     */
    outcome::result<uint64_t> CrdtHeads::GetHeadHeight( const CID &aCid, const std::string &topic ) const
    {
        constexpr uint64_t kNotAHead = 0;

        std::shared_lock lock( mutex_ );

        if ( topic.empty() )
        {
            for ( const auto &[_, cid_map] : cache_ )
            {
                if ( const auto it = cid_map.find( aCid ); it != cid_map.end() )
                {
                    return it->second;
                }
            }
            return kNotAHead;
        }

        const auto topicIt = cache_.find( topic );
        if ( topicIt == cache_.end() )
        {
            return kNotAHead;
        }

        const auto headIt = topicIt->second.find( aCid );
        if ( headIt == topicIt->second.end() )
        {
            return kNotAHead;
        }
        return headIt->second;
    }

    /**
     * @brief Counts the cached heads.
     * @param topic  topic to count; an empty string counts the heads of every topic.
     * @return number of heads; 0 for a topic that has no cache entry.
     */
    outcome::result<size_t> CrdtHeads::GetLength( const std::string &topic ) const
    {
        std::shared_lock lock( mutex_ );

        if ( topic.empty() )
        {
            size_t total = 0;
            for ( const auto &[_, cid_map] : cache_ )
            {
                total += cid_map.size();
            }
            return total;
        }

        // find() instead of at(): an unknown topic simply has no heads, and this
        // function reports through outcome::result rather than by throwing.
        const auto topicIt = cache_.find( topic );
        if ( topicIt == cache_.end() )
        {
            return size_t{ 0 };
        }
        return topicIt->second.size();
    }

    /**
     * @brief Persists a new head and records it in the cache.
     * @param aCid     CID of the new head.
     * @param aHeight  height of the new head.
     * @param topic    topic the head belongs to.
     * @return success, or the first error from CID::toString(), the batch write or
     *         the commit; errc::invalid_argument when no datastore is set and
     *         errc::io_error when the datastore returns no batch.
     *
     * The cache is only updated after the batch was committed.
     */
    outcome::result<void> CrdtHeads::Add( const CID &aCid, uint64_t aHeight, const std::string &topic )
    {
        // Failures carry a non-zero code: a default-constructed error_code reads as "success".
        if ( this->dataStore_ == nullptr )
        {
            return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::invalid_argument ) );
        }

        // Rendered up front so the log line below cannot throw on a bad CID.
        OUTCOME_TRY( auto &&cidText, aCid.toString() );

        auto batchDatastore = this->dataStore_->batch();
        if ( batchDatastore == nullptr )
        {
            return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::io_error ) );
        }

        OUTCOME_TRY( this->Write( *batchDatastore, aCid, aHeight, topic ) );
        OUTCOME_TRY( batchDatastore->commit() );

        logger_->debug( "Add: Inserting {} with topic {} as head", cidText, topic );

        std::unique_lock lock( mutex_ );
        this->cache_[topic][aCid] = aHeight;

        return outcome::success();
    }

    /**
     * @brief Replaces one head with another in a single datastore batch, then updates the cache.
     * @param aCidHead     CID of the head being replaced.
     * @param aNewHeadCid  CID of the new head.
     * @param aHeight      height of the new head.
     * @param topic        topic both heads belong to.
     * @return success, or the first error from CID::toString(), the batch operations or
     *         the commit; errc::invalid_argument when no datastore is set and
     *         errc::io_error when the datastore returns no batch.
     *
     * The cache is only updated after the batch was committed.
     */
    outcome::result<void> CrdtHeads::Replace( const CID         &aCidHead,
                                              const CID         &aNewHeadCid,
                                              uint64_t           aHeight,
                                              const std::string &topic )
    {
        // Failures carry a non-zero code: a default-constructed error_code reads as "success".
        if ( this->dataStore_ == nullptr )
        {
            return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::invalid_argument ) );
        }

        // Rendered up front so the log line below cannot throw on a bad CID.
        OUTCOME_TRY( auto &&oldCidText, aCidHead.toString() );
        OUTCOME_TRY( auto &&newCidText, aNewHeadCid.toString() );

        auto batchDatastore = this->dataStore_->batch();
        if ( batchDatastore == nullptr )
        {
            // Checked here because the batch is dereferenced for Write() below.
            return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::io_error ) );
        }

        // Remove first, then put: when both CIDs are equal the put must win so the
        // datastore agrees with the cache update below (erase, then insert).
        // NOTE(review): presumes the batch applies same-key operations in call order - confirm.
        OUTCOME_TRY( Delete( batchDatastore, aCidHead, topic ) );
        OUTCOME_TRY( this->Write( *batchDatastore, aNewHeadCid, aHeight, topic ) );
        OUTCOME_TRY( batchDatastore->commit() );

        logger_->debug( "Replace: Replacing {} with {} as head for topic {}", oldCidText, newCidText, topic );

        std::unique_lock lock( mutex_ );
        auto            &topicHeads = cache_[topic];
        topicHeads.erase( aCidHead );
        topicHeads[aNewHeadCid] = aHeight;

        return outcome::success();
    }

    /**
     * @brief Collects the cached heads, grouped by topic, together with the largest height.
     * @param topics  topics to include; an empty set includes every topic.
     * @return the heads per topic and the maximum height among them (0 when there are none).
     *         Topics without any head do not appear in the result.
     */
    outcome::result<CrdtHeads::CRDTListResult> CrdtHeads::GetList( const std::unordered_set<std::string> &topics ) const
    {
        CRDTHeadList result_heads;
        uint64_t     max_value = 0;
        logger_->trace( "GetList: Getting list of CIDs" );

        // Evaluated once: rendering every CID is only worth it when trace output is on.
        const bool traceEnabled = logger_->level() <= spdlog::level::trace;

        std::shared_lock lock( mutex_ );
        for ( const auto &[current_topic, cid_map] : cache_ )
        {
            const bool wanted = topics.empty() || topics.find( current_topic ) != topics.end();
            // Skipping empty maps keeps head-less topics out of the result.
            if ( !wanted || cid_map.empty() )
            {
                continue;
            }

            auto &topicHeads = result_heads[current_topic];
            for ( const auto &[cid, value] : cid_map )
            {
                if ( traceEnabled )
                {
                    logger_->trace( "GetList: Returning CID: {}", cid.toString().value() );
                }
                topicHeads.insert( cid );
                max_value = std::max( max_value, value );
            }
        }

        // Move the collected map into the result instead of deep-copying it.
        return outcome::success( CRDTListResult{ std::move( result_heads ), max_value } );
    }

    /**
     * @brief Fills the heads cache from the entries stored under the namespace key.
     * @return success once the query result was processed; the query error when the
     *         datastore query fails; errc::invalid_argument when no datastore is set.
     *
     * Entries that cannot be parsed (short or malformed key, invalid CID, non-numeric
     * height) are logged and skipped; they do not fail the call.
     */
    outcome::result<void> CrdtHeads::PrimeCache()
    {
        if ( this->dataStore_ == nullptr )
        {
            logger_->error( "PrimeCache: no datastore available" );
            return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::invalid_argument ) );
        }

        // builds the heads cache based on what's in storage
        const auto strNamespace = this->namespaceKey_.GetKey();
        logger_->debug( "PrimeCache: starting for namespace '{}'", strNamespace );

        Buffer keyPrefixBuffer;
        keyPrefixBuffer.put( strNamespace );

        auto queryResult = this->dataStore_->query( keyPrefixBuffer );
        if ( queryResult.has_failure() )
        {
            logger_->error( "PrimeCache: query failed: {}", queryResult.error().message() );
            return outcome::failure( queryResult.error() );
        }
        logger_->debug( "PrimeCache: retrieved {} entries from datastore", queryResult.value().size() );

        // cache_ is guarded by mutex_ in every other method; hold it exclusively while
        // loading so readers never see a half-primed cache.
        std::unique_lock lock( mutex_ );

        size_t loadedCount = 0;
        for ( const auto &bufferKeyAndValue : queryResult.value() )
        {
            // full key is "/<namespace>/<topic>/<cid>"
            auto        keyView = bufferKeyAndValue.first.toString();
            std::string full( keyView.data(), keyView.size() );

            // "+ 1" accounts for the separator that follows the namespace.
            if ( full.size() <= strNamespace.size() + 1 )
            {
                logger_->debug( "PrimeCache: skipping too-short key '{}'", full );
                continue;
            }

            std::string rel = full.substr( strNamespace.size() + 1 );

            // NOTE(review): the split is on the first '/', so a topic that itself
            // contains '/' would not round-trip - confirm topics never contain one.
            auto sepPos = rel.find( '/' );
            if ( sepPos == std::string::npos )
            {
                logger_->warn( "PrimeCache: malformed key '{}'", rel );
                continue;
            }

            std::string topic  = rel.substr( 0, sepPos );
            std::string strCid = rel.substr( sepPos + 1 );

            if ( topic.empty() || strCid.empty() )
            {
                logger_->warn( "PrimeCache: empty topic or CID in '{}'", rel );
                continue;
            }

            auto cidResult = CID::fromString( strCid );
            if ( cidResult.has_failure() )
            {
                logger_->warn( "PrimeCache: invalid CID '{}' in key '{}'", strCid, full );
                continue;
            }
            CID cid( cidResult.value() );

            // The value holds the height as text (Write() stores std::to_string output).
            uint64_t height = 0;
            try
            {
                height = boost::lexical_cast<uint64_t>( bufferKeyAndValue.second.toString() );
            }
            catch ( const boost::bad_lexical_cast & )
            {
                logger_->warn( "PrimeCache: could not parse height '{}' for CID '{}'",
                               bufferKeyAndValue.second.toString(),
                               strCid );
                continue;
            }

            this->cache_[topic][cid] = height;
            ++loadedCount;
            logger_->trace( "PrimeCache: loaded head [topic='{}', cid='{}', height={}]",
                            topic,
                            cid.toString().value(),
                            height );
        }

        logger_->debug( "PrimeCache: completed, loaded {} entries into cache", loadedCount );
        return outcome::success();
    }

}

Updated on 2026-03-04 at 13:10:44 -0800