Skip to content

src/crdt/impl/crdt_set.cpp

Namespaces

Name
sgns
sgns::crdt

Source code

#include "crdt/crdt_set.hpp"
#include <storage/database_error.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/system/error_code.hpp>
#include <boost/lexical_cast.hpp>
#include <fstream>
#include <iostream>
#include <string>
#include <unordered_set>
#include <utility>

namespace sgns::crdt
{

    CrdtSet::CrdtSet( std::shared_ptr<DataStore> aDatastore,
                      const HierarchicalKey     &aNamespace,
                      PutHookPtr                 aPutHookPtr,
                      DeleteHookPtr              aDeleteHookPtr ) :
        dataStore_( std::move( aDatastore ) ),
        namespaceKey_( aNamespace ),
        putHookFunc_( std::move( aPutHookPtr ) ),
        deleteHookFunc_( std::move( aDeleteHookPtr ) )
    {
    }

    /**
     * @brief Copy-constructs a set that shares the datastore, namespace and hooks of @p aSet.
     *
     * Members are initialized directly instead of being default-constructed and then
     * overwritten through operator=. The lock used by PutElems (mutex_) is not copied;
     * the new object gets its own default-constructed one, matching operator=.
     *
     * @param aSet Set to copy from.
     */
    CrdtSet::CrdtSet( const CrdtSet &aSet ) :
        dataStore_( aSet.dataStore_ ),
        namespaceKey_( aSet.namespaceKey_ ),
        putHookFunc_( aSet.putHookFunc_ ),
        deleteHookFunc_( aSet.deleteHookFunc_ )
    {
    }

    /**
     * @brief Compares two sets by backing datastore (pointer identity) and namespace key.
     *
     * The put/delete hooks are not part of the comparison.
     *
     * @param aSet Set to compare with.
     * @return true when both the datastore pointer and the namespace key match.
     */
    bool CrdtSet::operator==( const CrdtSet &aSet ) const
    {
        const bool sameStore = ( dataStore_ == aSet.dataStore_ );
        const bool sameNamespace = ( namespaceKey_ == aSet.namespaceKey_ );
        return sameStore && sameNamespace;
    }

    /**
     * @brief Negation of operator==.
     * @param aSet Set to compare with.
     * @return true when the datastore or the namespace differs.
     */
    bool CrdtSet::operator!=( const CrdtSet &aSet ) const
    {
        const bool equal = this->operator==( aSet );
        return !equal;
    }

    /**
     * @brief Copies datastore, namespace and hooks from @p aSet.
     *
     * The mutex is left untouched; each object keeps its own lock.
     *
     * @param aSet Source set.
     * @return *this.
     */
    CrdtSet &CrdtSet::operator=( const CrdtSet &aSet )
    {
        if ( &aSet == this )
        {
            // Self-assignment: nothing to copy.
            return *this;
        }

        dataStore_      = aSet.dataStore_;
        namespaceKey_   = aSet.namespaceKey_;
        putHookFunc_    = aSet.putHookFunc_;
        deleteHookFunc_ = aSet.deleteHookFunc_;
        return *this;
    }

    /**
     * @brief Reads the raw value stored at @p aKey and returns it as a string.
     * @param aKey Full datastore key to read.
     * @return The stored bytes as a std::string. Fails with a default-constructed error_code
     *         when no datastore is attached. Otherwise forwards the error from the datastore's
     *         get() (callers such as GetPriority expect NOT_FOUND for a missing key).
     */
    outcome::result<std::string> CrdtSet::GetValueFromDatastore( const HierarchicalKey &aKey ) const
    {
        if ( !dataStore_ )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        Buffer lookupKey;
        lookupKey.put( aKey.GetKey() );

        auto fetched = dataStore_->get( lookupKey );
        if ( !fetched.has_failure() )
        {
            return std::string( fetched.value().toString() );
        }
        return outcome::failure( fetched.error() );
    }

    /**
     * @brief Builds a delta that adds a single key/value element.
     *
     * The element's id is left unset; PutElems stamps it when the delta is merged.
     *
     * @param aKey   Element key.
     * @param aValue Element value.
     * @return A new delta containing exactly one element.
     */
    outcome::result<std::shared_ptr<CrdtSet::Delta>> CrdtSet::CreateDeltaToAdd( const std::string &aKey,
                                                                                const std::string &aValue )
    {
        auto addDelta = std::make_shared<Delta>();

        auto *const added = addDelta->add_elements();
        added->set_key( aKey );
        added->set_value( aValue );

        return addDelta;
    }

    /**
     * @brief Builds a delta that tombstones every live version of @p aKey.
     *
     * Every /namespace/s/<key>/<id> entry whose id is a single top-level segment becomes one
     * tombstone, unless that (key, id) pair is already tombstoned.
     *
     * @param aKey Element key to remove.
     * @return The remove delta, which may hold no tombstones. Fails with UNITIALIZED when no
     *         datastore is attached, or forwards query/tombstone-lookup errors.
     */
    outcome::result<std::shared_ptr<CrdtSet::Delta>> CrdtSet::CreateDeltaToRemove( const std::string &aKey ) const
    {
        if ( this->dataStore_ == nullptr )
        {
            // Previously dereferenced unconditionally below.
            return outcome::failure( storage::DatabaseError::UNITIALIZED );
        }

        auto delta = std::make_shared<Delta>();
        // /namespace/s/<key>
        auto prefix         = this->ElemsPrefix( aKey );
        auto strElemsPrefix = prefix.GetKey();

        Buffer keyPrefixBuffer;
        keyPrefixBuffer.put( strElemsPrefix );
        OUTCOME_TRY( auto queryResult, this->dataStore_->query( keyPrefixBuffer ) );

        for ( const auto &[key, _] : queryResult )
        {
            std::string keyWithPrefix( key.toString() );
            std::string id = keyWithPrefix.erase( 0, strElemsPrefix.size() );

            auto hId = HierarchicalKey( id );

            // The prefix also matches entries of longer keys (e.g. "hello/bye/<id>" for "hello").
            // Only a bare id belongs to this key.
            if ( !hId.IsTopLevel() )
            {
                continue;
            }

            // Skip versions that are already tombstoned. A lookup failure is reported
            // instead of silently dropping the version from the delta.
            OUTCOME_TRY( auto isDeleted, this->InTombsKeyID( aKey, hId.GetKey() ) );
            if ( !isDeleted )
            {
                auto tombstone = delta->add_tombstones();
                tombstone->set_key( aKey );
                tombstone->set_id( hId.GetKey() );
            }
        }

        return delta;
    }

    /**
     * @brief Returns the current value of @p aKey if the key is part of the set.
     *
     * A key is part of the set when it has an "elems" entry that is not covered by a
     * "tombstones" entry.
     *
     * The value key (/namespace/k/<key>/v) is checked first:
     * - If it cannot be read (e.g. the key was never added), that error is returned unchanged.
     * - If it exists, the key may still be tombstoned, so the elems/tombs check runs.
     *
     * @param aKey Element key.
     * @return The stored value. Fails with NOT_FOUND when every version is tombstoned;
     *         otherwise forwards lookup errors.
     */
    outcome::result<CrdtSet::Buffer> CrdtSet::GetElement( const std::string &aKey ) const
    {
        auto valueK      = this->ValueKey( aKey );
        auto valueResult = this->GetValueFromDatastore( valueK );

        if ( valueResult.has_failure() )
        {
            // Not found is fine, we just return it.
            return outcome::failure( valueResult.error() );
        }

        // We have an existing element. Check if tombstoned.
        auto inSetResult = this->InElemsNotTombstoned( aKey );
        if ( inSetResult.has_failure() )
        {
            return outcome::failure( inSetResult.error() );
        }

        if ( !inSetResult.value() )
        {
            // Removed element: report it as absent. A default-constructed error_code (the
            // previous behavior) has value 0 ("Success") and tests false, so callers could not
            // recognize it. The stale value entry is left in place; deleting it here could race
            // with a concurrent write of the same key.
            return outcome::failure( storage::DatabaseError::NOT_FOUND );
        }

        Buffer bufferValue;
        bufferValue.put( valueResult.value() );

        return bufferValue;
    }

    /**
     * @brief Returns stored key entries under /namespace/k/<aPrefix>, optionally filtered by suffix.
     *
     * @param aPrefix Key prefix, relative to the keys namespace.
     * @param aSuffix QUERY_ALL returns every entry. QUERY_PRIORITYSUFFIX / QUERY_VALUESUFFIX
     *                keep only entries ending in "/<priority suffix>" or "/<value suffix>".
     * @return The matching entries. Fails with UNITIALIZED when no datastore is attached,
     *         forwards query errors, and returns invalid_argument for an unknown @p aSuffix
     *         (only checked once there is at least one result).
     */
    outcome::result<CrdtSet::QueryResult> CrdtSet::QueryElements(
        std::string_view   aPrefix,
        const QuerySuffix &aSuffix /*=QuerySuffix::QUERY_ALL*/ ) const
    {
        if ( this->dataStore_ == nullptr )
        {
            return outcome::failure( storage::DatabaseError::UNITIALIZED );
        }

        // /namespace/k/<prefix>
        auto prefixKeysKey = this->KeysKey( aPrefix );

        Buffer keyPrefixBuffer;
        keyPrefixBuffer.put( prefixKeysKey.GetKey() );
        auto queryResult = this->dataStore_->query( keyPrefixBuffer );
        if ( queryResult.has_failure() )
        {
            return outcome::failure( queryResult.error() );
        }

        // Loop-invariant suffix patterns.
        const std::string prioritySuffix = "/" + GetPrioritySuffix();
        const std::string valueSuffix    = "/" + GetValueSuffix();

        QueryResult elements;
        for ( const auto &element : queryResult.value() )
        {
            // NOTE(review): element.first is the full datastore key (e.g. /namespace/k/<key>/v),
            // not the user key. InElemsNotTombstoned() therefore builds /namespace/s/<full key>,
            // which is unlikely to match any entry. It returns true when nothing matches, so this
            // filter probably never hides tombstoned keys. Confirm, and pass the user key instead.
            auto inSetResult = this->InElemsNotTombstoned( std::string( element.first.toString() ) );
            if ( inSetResult.has_failure() || !inSetResult.value() )
            {
                continue;
            }

            std::string key( element.first.toString() );
            switch ( aSuffix )
            {
                case QuerySuffix::QUERY_ALL:
                    elements.insert( element );
                    break;
                case QuerySuffix::QUERY_PRIORITYSUFFIX:
                    if ( boost::algorithm::ends_with( key, prioritySuffix ) )
                    {
                        elements.insert( element );
                    }
                    break;
                case QuerySuffix::QUERY_VALUESUFFIX:
                    if ( boost::algorithm::ends_with( key, valueSuffix ) )
                    {
                        elements.insert( element );
                    }
                    break;
                default:
                    // Previously returned queryResult.error() on a *successful* result, which
                    // is invalid. Report the bad selector instead.
                    return outcome::failure( std::errc::invalid_argument );
            }
        }

        return elements;
    }

    /**
     * @brief Three-part variant of QueryElements that delegates the match to DataStore::query.
     *
     * The datastore is queried with "/namespace/k/<prefix_base>/", @p middle_part and
     * @p remainder_prefix. How these three parts combine is defined by the datastore's query()
     * overload. NOTE(review): semantics not visible here, confirm against DataStore.
     *
     * @param prefix_base      Key prefix, relative to the keys namespace.
     * @param middle_part      Middle component forwarded to the datastore query.
     * @param remainder_prefix Trailing prefix forwarded to the datastore query.
     * @param aSuffix          Suffix filter, as in the single-prefix overload.
     * @return The matching entries. Fails with UNITIALIZED when no datastore is attached,
     *         forwards query errors, and returns invalid_argument for an unknown @p aSuffix
     *         (only checked once there is at least one result).
     */
    outcome::result<CrdtSet::QueryResult> CrdtSet::QueryElements( const std::string &prefix_base,
                                                                  const std::string &middle_part,
                                                                  const std::string &remainder_prefix,
                                                                  const QuerySuffix &aSuffix ) const
    {
        if ( this->dataStore_ == nullptr )
        {
            return outcome::failure( storage::DatabaseError::UNITIALIZED );
        }

        // /namespace/k/<prefix>
        auto prefixKeysKey = this->KeysKey( prefix_base );

        auto queryResult = this->dataStore_->query( prefixKeysKey.GetKey() + "/", middle_part, remainder_prefix );
        if ( queryResult.has_failure() )
        {
            return outcome::failure( queryResult.error() );
        }

        // Loop-invariant suffix patterns.
        const std::string prioritySuffix = "/" + GetPrioritySuffix();
        const std::string valueSuffix    = "/" + GetValueSuffix();

        QueryResult elements;
        for ( const auto &element : queryResult.value() )
        {
            // NOTE(review): element.first is the full datastore key, not the user key, so this
            // tombstone check most likely never matches real element entries (see
            // InElemsNotTombstoned, which returns true when nothing matches). Confirm.
            auto inSetResult = this->InElemsNotTombstoned( std::string( element.first.toString() ) );
            if ( inSetResult.has_failure() || !inSetResult.value() )
            {
                continue;
            }

            std::string key( element.first.toString() );
            switch ( aSuffix )
            {
                case QuerySuffix::QUERY_ALL:
                    elements.insert( element );
                    break;
                case QuerySuffix::QUERY_PRIORITYSUFFIX:
                    if ( boost::algorithm::ends_with( key, prioritySuffix ) )
                    {
                        elements.insert( element );
                    }
                    break;
                case QuerySuffix::QUERY_VALUESUFFIX:
                    if ( boost::algorithm::ends_with( key, valueSuffix ) )
                    {
                        elements.insert( element );
                    }
                    break;
                default:
                    // Previously returned queryResult.error() on a *successful* result, which
                    // is invalid. Report the bad selector instead.
                    return outcome::failure( std::errc::invalid_argument );
            }
        }

        return elements;
    }

    /**
     * @brief Tells whether @p aKey currently belongs to the set.
     * @param aKey Element key.
     * @return false when no value key exists. Otherwise the result of InElemsNotTombstoned().
     *         Fails with a default-constructed error_code when no datastore is attached.
     */
    outcome::result<bool> CrdtSet::IsValueInSet( const std::string &aKey ) const
    {
        if ( !dataStore_ )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        auto valueLocation = this->ValueKey( aKey );
        Buffer probe;
        probe.put( valueLocation.GetKey() );

        if ( dataStore_->contains( probe ) )
        {
            // A value exists, but every version may have been tombstoned since.
            auto liveResult = this->InElemsNotTombstoned( aKey );
            if ( liveResult.has_error() )
            {
                return outcome::failure( liveResult.error() );
            }
            return liveResult.value();
        }

        // Fast path: without a value the key was never added.
        return false;
    }

    /**
     * @brief Checks whether @p aKey has at least one element version that is not tombstoned.
     *
     * Scans /namespace/s/<key>/<id> entries. It returns true as soon as one top-level id
     * has no matching /namespace/t/<key>/<id> entry.
     *
     * @param aKey Element key.
     * @return true if a live version exists, or if there are no element entries at all
     *         (see note below). false if every version is tombstoned. Fails with UNITIALIZED
     *         when no datastore is attached; forwards query and tombstone-lookup errors.
     */
    outcome::result<bool> CrdtSet::InElemsNotTombstoned( const std::string &aKey ) const
    {
        if ( this->dataStore_ == nullptr )
        {
            // Previously dereferenced unconditionally below.
            return outcome::failure( storage::DatabaseError::UNITIALIZED );
        }

        // /namespace/s/<key>
        auto prefix         = this->ElemsPrefix( aKey );
        auto strElemsPrefix = prefix.GetKey();

        Buffer keyPrefixBuffer;
        keyPrefixBuffer.put( strElemsPrefix );
        auto queryResult = this->dataStore_->query( keyPrefixBuffer );
        if ( queryResult.has_failure() )
        {
            return outcome::failure( queryResult.error() );
        }

        if ( queryResult.value().empty() )
        {
            // No element entries: reported as "present", not absent. Presumably this keeps
            // values visible that were written without an elems entry. For example,
            // DataStoreSync -> SetValue writes only the value and priority keys.
            // NOTE(review): confirm this is intended.
            return true;
        }

        for ( const auto &[key, _] : queryResult.value() )
        {
            std::string keyWithPrefix( key.toString() );
            std::string id  = keyWithPrefix.erase( 0, strElemsPrefix.size() );
            auto        hId = HierarchicalKey( id );
            if ( !hId.IsTopLevel() )
            {
                // Our prefix also matches entries of other keys. For example, prefix "hello"
                // matches key "hello/bye", giving an id like "bye/<block>". For the right key,
                // the id is the block id only.
                continue;
            }
            // If not tombstoned, we have it. Lookup failures are reported, not skipped.
            OUTCOME_TRY( auto inTomb, this->InTombsKeyID( aKey, hId.GetKey() ) );
            if ( !inTomb )
            {
                return true;
            }
        }

        return false;
    }

    /**
     * @brief Builds /namespace/<aKey>.
     * @param aKey Child segment to append to the namespace key.
     * @return The namespaced key.
     */
    HierarchicalKey CrdtSet::KeyPrefix( const std::string &aKey ) const
    {
        const HierarchicalKey &root = namespaceKey_;
        return root.ChildString( aKey );
    }

    /**
     * @brief Builds the elements prefix /namespace/s/<aKey>.
     * @param aKey Element key.
     * @return The key under which element ids for @p aKey are stored.
     */
    HierarchicalKey CrdtSet::ElemsPrefix( const std::string &aKey ) const
    {
        const std::string elemsSegment( elemsNamespace_ );
        return KeyPrefix( elemsSegment ).ChildString( aKey );
    }

    /**
     * @brief Builds the tombstones prefix /namespace/t/<aKey>.
     * @param aKey Element key.
     * @return The key under which tombstoned ids for @p aKey are stored.
     */
    HierarchicalKey CrdtSet::TombsPrefix( const std::string &aKey ) const
    {
        const std::string tombsSegment( tombsNamespace_ );
        return KeyPrefix( tombsSegment ).ChildString( aKey );
    }

    /**
     * @brief Builds the keys prefix /namespace/k/<aKey>.
     * @param aKey Element key (or key prefix).
     * @return The base key for the value/priority entries of @p aKey.
     */
    HierarchicalKey CrdtSet::KeysKey( std::string_view aKey ) const
    {
        const std::string keysSegment( keysNamespace_ );
        return KeyPrefix( keysSegment ).ChildString( aKey );
    }

    /**
     * @brief Builds the value key /namespace/k/<aKey>/<value suffix>.
     * @param aKey Element key.
     * @return Key holding the current value of @p aKey.
     */
    HierarchicalKey CrdtSet::ValueKey( const std::string &aKey ) const
    {
        const auto keyBase = this->KeysKey( aKey );
        return keyBase.ChildString( GetValueSuffix() );
    }

    /**
     * @brief Builds the priority key /namespace/k/<aKey>/<priority suffix>.
     * @param aKey Element key.
     * @return Key holding the stored priority of @p aKey.
     */
    HierarchicalKey CrdtSet::PriorityKey( const std::string &aKey ) const
    {
        const auto keyBase = this->KeysKey( aKey );
        return keyBase.ChildString( GetPrioritySuffix() );
    }

    /**
     * @brief Reads the stored priority of @p aKey.
     *
     * Priorities are stored as decimal text offset by +1 (see SetPriority), so 1 is subtracted
     * after parsing. A missing priority entry (NOT_FOUND) reads as priority 0.
     *
     * @param aKey Element key.
     * @return The priority. Fails with a default-constructed error_code if the stored text is
     *         not a number; forwards any non-NOT_FOUND read error.
     */
    outcome::result<uint64_t> CrdtSet::GetPriority( const std::string &aKey ) const
    {
        auto stored = this->GetValueFromDatastore( this->PriorityKey( aKey ) );

        if ( stored.has_failure() )
        {
            if ( stored.error() != storage::DatabaseError::NOT_FOUND )
            {
                // Only errors other than NOT_FOUND are reported.
                return outcome::failure( stored.error() );
            }
            return uint64_t{ 0 };
        }

        try
        {
            return boost::lexical_cast<uint64_t>( stored.value() ) - 1;
        }
        catch ( const boost::bad_lexical_cast & )
        {
            return outcome::failure( boost::system::error_code{} );
        }
    }

    /**
     * @brief Writes @p aPriority for @p aKey directly to the datastore (not through a batch).
     *
     * The value is stored as decimal text of aPriority + 1; GetPriority undoes the offset.
     *
     * @param aKey      Element key.
     * @param aPriority Priority to record.
     * @return The datastore put() result. Fails with a default-constructed error_code when
     *         no datastore is attached.
     */
    outcome::result<void> CrdtSet::SetPriority( const std::string &aKey, uint64_t aPriority )
    {
        if ( !dataStore_ )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        const auto        location    = this->PriorityKey( aKey );
        const std::string encodedPrio = std::to_string( aPriority + 1 );

        Buffer keyBuffer;
        keyBuffer.put( location.GetKey() );
        Buffer valueBuffer;
        valueBuffer.put( encodedPrio );

        return dataStore_->put( keyBuffer, valueBuffer );
    }

    /**
     * @brief Convenience overload: runs the batched SetValue in a fresh batch and commits it.
     * @param aKey      Element key.
     * @param aID       Id of the element version being written.
     * @param aValue    Value to store.
     * @param aPriority Priority of the write.
     * @return success, or the first error from the batched SetValue or commit. Fails with a
     *         default-constructed error_code when no datastore is attached.
     */
    outcome::result<void> CrdtSet::SetValue( const std::string &aKey,
                                             const std::string &aID,
                                             const Buffer      &aValue,
                                             uint64_t           aPriority )
    {
        if ( !dataStore_ )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        auto batch = dataStore_->batch();

        if ( auto staged = this->SetValue( batch, aKey, aID, aValue, aPriority ); staged.has_failure() )
        {
            return outcome::failure( staged.error() );
        }

        if ( auto committed = batch->commit(); committed.has_failure() )
        {
            return outcome::failure( committed.error() );
        }

        return outcome::success();
    }

    /**
     * @brief Stages a value update for @p aKey into @p aDataStore if the write should win.
     *
     * The write is skipped when:
     * - (aKey, aID) is already tombstoned,
     * - @p aPriority is lower than the stored priority, or
     * - the priorities are equal and the stored value is byte-equal to @p aValue.
     *
     * Otherwise the value and the priority are both staged into the *same* batch, so they
     * become visible together on commit, and the put hook is invoked.
     *
     * @param aDataStore Batch to stage writes into.
     * @param aKey       Element key.
     * @param aID        Id of the element version being written.
     * @param aValue     Value to store.
     * @param aPriority  Priority of the write.
     * @return success (including when skipped). Forwards tombstone/priority/value lookup and
     *         put errors. Fails with a default-constructed error_code when @p aDataStore is null.
     */
    outcome::result<void> CrdtSet::SetValue( const std::unique_ptr<storage::BufferBatch> &aDataStore,
                                             const std::string                           &aKey,
                                             const std::string                           &aID,
                                             const Buffer                                &aValue,
                                             uint64_t                                     aPriority )
    {
        if ( aDataStore == nullptr )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        // If this key was tombstoned already, do not store/update the value at all.
        auto isDeletedResult = this->InTombsKeyID( aKey, aID );
        if ( isDeletedResult.has_failure() )
        {
            // Forward the real error instead of an empty error_code.
            return outcome::failure( isDeletedResult.error() );
        }
        if ( isDeletedResult.value() )
        {
            // Tombstoned: silently ignore the write.
            return outcome::success();
        }

        auto priorityResult = this->GetPriority( aKey );
        if ( priorityResult.has_failure() )
        {
            return outcome::failure( priorityResult.error() );
        }
        const uint64_t currentPriority = priorityResult.value();

        if ( aPriority < currentPriority )
        {
            return outcome::success();
        }

        auto valueK = this->ValueKey( aKey );

        if ( aPriority == currentPriority )
        {
            auto valueResult = this->GetValueFromDatastore( valueK );
            if ( valueResult.has_failure() )
            {
                // No stored value yet (e.g. first write at the default priority 0): fall through
                // and store it. Only other errors abort.
                if ( valueResult.error() != storage::DatabaseError::NOT_FOUND )
                {
                    return outcome::failure( valueResult.error() );
                }
            }
            else if ( valueResult.value() == std::string( aValue.toString() ) )
            {
                return outcome::success();
            }
            // NOTE(review): on equal priority a different value always overwrites. Replicas that
            // apply concurrent same-priority deltas in different orders can therefore diverge.
            // A deterministic tie-break (e.g. keep the byte-wise larger value) would converge,
            // but it is a protocol change that must be rolled out to all nodes together.
        }

        // Store the value.
        Buffer valueKeyBuffer;
        valueKeyBuffer.put( valueK.GetKey() );
        OUTCOME_TRY( aDataStore->put( valueKeyBuffer, aValue ) );

        // Store the priority in the same batch. Writing it directly to the datastore (the
        // previous SetPriority call) committed it even if this batch later failed. The encoding
        // matches SetPriority: decimal text of aPriority + 1.
        const auto        priorityK       = this->PriorityKey( aKey );
        const std::string encodedPriority = std::to_string( aPriority + 1 );

        Buffer priorityKeyBuffer;
        priorityKeyBuffer.put( priorityK.GetKey() );
        Buffer priorityValueBuffer;
        priorityValueBuffer.put( encodedPriority );
        OUTCOME_TRY( aDataStore->put( priorityKeyBuffer, priorityValueBuffer ) );

        // Trigger the add hook.
        if ( putHookFunc_ != nullptr )
        {
            putHookFunc_( aKey, aValue, aID );
        }

        return outcome::success();
    }

    /**
     * @brief Records @p aElems as new versions with id @p aID and updates their values, in one batch.
     *
     * Holds mutex_ for the whole operation. Each element gets its id overwritten with @p aID.
     * An empty /namespace/s/<key>/<id> entry is staged for it, and SetValue decides whether
     * its value replaces the current one.
     *
     * @param aElems    Elements to add; their ids are modified in place.
     * @param aID       Id stamped on every element (the delta's id).
     * @param aPriority Priority of the delta.
     * @return success, or the first put/SetValue/commit error. Fails with a default-constructed
     *         error_code when no datastore is attached.
     */
    outcome::result<void> CrdtSet::PutElems( std::vector<Element> &aElems, const std::string &aID, uint64_t aPriority )
    {
        if ( aElems.empty() )
        {
            return outcome::success();
        }
        if ( !dataStore_ )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        std::lock_guard guard( mutex_ );

        auto batch = dataStore_->batch();

        for ( auto &element : aElems )
        {
            // Incoming elements arrive without an id; stamp the delta's id on them.
            element.set_id( aID );
            const auto elementKey = element.key();

            // /namespace/s/<key>/<id>
            Buffer versionEntry;
            versionEntry.put( this->ElemsPrefix( elementKey ).ChildString( aID ).GetKey() );
            OUTCOME_TRY( batch->put( std::move( versionEntry ), Buffer() ) );

            // SetValue applies the value only if it is not tombstoned and not outranked.
            Buffer elementValue;
            elementValue.put( element.value() );
            auto staged = this->SetValue( batch, elementKey, aID, elementValue, aPriority );
            if ( staged.has_failure() )
            {
                return outcome::failure( staged.error() );
            }
        }

        auto committed = batch->commit();
        if ( committed.has_failure() )
        {
            return outcome::failure( committed.error() );
        }
        return outcome::success();
    }

    /**
     * @brief Writes tombstone entries for @p aTombs in a single batch, then runs the delete hook.
     *
     * Each tombstone is stored as an empty value at /namespace/t/<key>/<id>. A tombstone with
     * an empty id is recorded under @p aID instead. After a successful commit, the delete hook
     * runs once per distinct key, in first-seen order.
     *
     * @param aTombs Tombstones to record.
     * @param aID    Fallback id for tombstones without one; also the id passed to the delete hook.
     * @return success, or the first put/commit error. Fails with a default-constructed
     *         error_code when no datastore is attached.
     */
    outcome::result<void> CrdtSet::PutTombs( const std::vector<Element> &aTombs, const std::string &aID ) const
    {
        if ( aTombs.empty() )
        {
            return outcome::success();
        }

        if ( this->dataStore_ == nullptr )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        auto batchDatastore = this->dataStore_->batch();

        // Several versions of the same element can be tombstoned in one delta. seenKeys makes
        // sure each key is recorded, and therefore hooked, only once; the original code pushed
        // duplicates despite intending otherwise.
        std::vector<std::string>        deletedKeys;
        std::unordered_set<std::string> seenKeys;
        for ( const auto &tomb : aTombs )
        {
            // /namespace/t/<key>/<id>
            const std::string tombId     = tomb.id().empty() ? aID : std::string( tomb.id() );
            const auto       &key        = tomb.key();
            auto              kNamespace = this->TombsPrefix( key ).ChildString( tombId );

            Buffer keyBuffer;
            keyBuffer.put( kNamespace.GetKey() );

            OUTCOME_TRY( batchDatastore->put( std::move( keyBuffer ), Buffer() ) );

            if ( seenKeys.insert( key ).second )
            {
                deletedKeys.push_back( key );
            }
        }

        OUTCOME_TRY( batchDatastore->commit() );

        if ( deleteHookFunc_ )
        {
            for ( const auto &key : deletedKeys )
            {
                deleteHookFunc_( key, aID );
            }
        }

        return outcome::success();
    }

    /**
     * @brief Applies a delta: its tombstones first, then its elements at the delta's priority.
     * @param aDelta Delta to merge.
     * @param aID    Id of the delta (stamped on its elements and tombstones).
     * @return success, or the first error from PutTombs or PutElems.
     */
    outcome::result<void> CrdtSet::Merge( const Delta &aDelta, const std::string &aID )
    {
        const std::vector tombstones( aDelta.tombstones().cbegin(), aDelta.tombstones().cend() );
        OUTCOME_TRY( this->PutTombs( tombstones, aID ) );

        std::vector additions( aDelta.elements().cbegin(), aDelta.elements().cend() );
        return this->PutElems( additions, aID, aDelta.priority() );
    }

    /**
     * @brief Tells whether version @p aID of @p aKey has a tombstone entry.
     * @param aKey Element key.
     * @param aID  Element version id.
     * @return Whether /namespace/t/<key>/<id> exists. Fails with a default-constructed
     *         error_code when no datastore is attached.
     */
    outcome::result<bool> CrdtSet::InTombsKeyID( const std::string &aKey, const std::string &aID ) const
    {
        if ( !dataStore_ )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        const auto tombLocation = this->TombsPrefix( aKey ).ChildString( aID );
        Buffer     probe;
        probe.put( tombLocation.GetKey() );
        return dataStore_->contains( probe );
    }

    void CrdtSet::SetPutHook( const PutHookPtr &putHookPtr )
    {
        this->putHookFunc_ = putHookPtr;
    }

    void CrdtSet::SetDeleteHook( const DeleteHookPtr &deleteHookPtr )
    {
        this->deleteHookFunc_ = deleteHookPtr;
    }

    /**
     * @brief Stores a value described by a 4-entry key list via SetValue.
     *
     * Entries are: [0] element key, [1] version id, [2] value,
     * [3] key whose *stored* priority is looked up with GetPriority.
     * NOTE(review): entry 3 is treated as a key to look up, not as a priority literal; confirm
     * this is intended.
     *
     * @param aKeyList Exactly four keys as described above.
     * @return The SetValue result. Fails with invalid_argument when the list does not have four
     *         entries, and with a default-constructed error_code when no datastore is attached.
     *         Priority lookup errors are forwarded instead of throwing from `.value()`.
     */
    outcome::result<void> CrdtSet::DataStoreSync( const std::vector<HierarchicalKey> &aKeyList )
    {
        if ( this->dataStore_ == nullptr )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        if ( aKeyList.size() != 4 )
        {
            // The key list must contain exactly four entries.
            return outcome::failure( std::errc::invalid_argument );
        }

        const std::string key = aKeyList.at( 0 ).GetKey();
        const std::string id  = aKeyList.at( 1 ).GetKey();
        Buffer            value;
        value.put( aKeyList.at( 2 ).GetKey() );

        // Previously `.value()` was called unchecked, which throws on a failed lookup.
        OUTCOME_TRY( auto priority, this->GetPriority( aKeyList.at( 3 ).GetKey() ) );
        return this->SetValue( key, id, value, priority );
    }

    /**
     * @brief Debug helper: dumps every datastore entry to "crdt_data.log" in the working directory.
     *
     * The file is truncated on each call. Does nothing when no datastore is attached. Prints an
     * error to stderr if the file cannot be opened, and a confirmation to stdout otherwise.
     */
    void CrdtSet::PrintDataStore() const
    {
        if ( !dataStore_ )
        {
            return;
        }

        // Overwrites the file each time
        std::ofstream out( "crdt_data.log", std::ios::out | std::ios::trunc );
        if ( !out )
        {
            std::cerr << "Failed to open log file for writing!" << std::endl;
            return;
        }

        const auto entries = dataStore_->GetAll();
        for ( const auto &[entryKey, entryValue] : entries )
        {
            out << "[" << entryKey.toString() << "] " << entryValue << std::endl;
        }

        out.close();
        std::cout << "Data successfully written to crdt_data.log" << std::endl;
    }

    void CrdtSet::PrintTombs( const std::vector<Element> &aTombs )
    {
        std::cout << "Tombs" << std::endl;
        for ( const auto &tomb : aTombs )
        {
            // /namespace/tombs/<key>/<id>
            std::cout << tomb.key() << ", " << tomb.id() << std::endl;
        }
    }
}

Updated on 2026-03-04 at 13:10:44 -0800