Skip to content

src/crdt/impl/atomic_transaction.cpp

Namespaces

Name
sgns
sgns::crdt

Source code

#include <algorithm>
#include <utility>
#include "crdt/atomic_transaction.hpp"
#include "crdt/crdt_datastore.hpp"

namespace sgns::crdt
{
    AtomicTransaction::AtomicTransaction( std::shared_ptr<CrdtDatastore> datastore ) :
        datastore_( std::move( datastore ) ), is_committed_( false )
    {
    }

    AtomicTransaction::~AtomicTransaction()
    {
        if ( !is_committed_ )
        {
            Rollback();
        }
    }

    outcome::result<void> AtomicTransaction::Put( HierarchicalKey key, Buffer value )
    {
        if ( is_committed_ )
        {
            return outcome::failure( boost::system::error_code{} );
        }
        modified_keys_.insert( key.GetKey() ); // Track the key
        operations_.push_back( { Operation::PUT,  std::move(key), std::move( value ) } );
        return outcome::success();
    }

    outcome::result<void> AtomicTransaction::Remove( const HierarchicalKey &key )
    {
        if ( is_committed_ )
        {
            return outcome::failure( boost::system::error_code{} );
        }
        operations_.push_back( { Operation::REMOVE, key, Buffer() } );
        return outcome::success();
    }

    outcome::result<AtomicTransaction::Buffer> AtomicTransaction::Get( const HierarchicalKey &key ) const
    {
        // First, check pending operations in reverse order (most recent first)
        auto latest_op = FindLatestOperation( key );
        if ( latest_op.has_value() )
        {
            if ( latest_op->type == Operation::REMOVE )
            {
                // Key has been removed in this transaction
                return outcome::failure( boost::system::error_code{} );
            }
            if ( latest_op->type == Operation::PUT )
            {
                // Return the value from the pending put operation
                return latest_op->value;
            }
        }

        // Key not found
        return outcome::failure( boost::system::error_code{} );
    }

    outcome::result<void> AtomicTransaction::Erase( const HierarchicalKey &key )
    {
        if ( is_committed_ )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        // Remove all operations for this key from the operations vector
        auto new_end = std::remove_if( operations_.begin(),
                                       operations_.end(),
                                       [&key]( const PendingOperation &op )
                                       { return op.key.GetKey() == key.GetKey(); } );

        // If we removed any operations, erase them and remove from the set
        if ( new_end != operations_.end() )
        {
            operations_.erase( new_end, operations_.end() );
            modified_keys_.erase( key.GetKey() );
        }

        return outcome::success();
    }

    bool AtomicTransaction::HasKey( const HierarchicalKey &key ) const
    {
        return modified_keys_.find( key.GetKey() ) != modified_keys_.end();
    }

    outcome::result<CID> AtomicTransaction::Commit( const std::unordered_set<std::string> &topics )
    {
        if ( is_committed_ )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        // Create a combined delta for all operations
        auto     combined_delta = std::make_shared<Delta>();
        uint64_t max_priority   = 0;

        for ( const auto &op : operations_ )
        {
            std::shared_ptr<Delta> delta;
            if ( op.type == Operation::PUT )
            {
                OUTCOME_TRY( ( auto &&, result ),
                             datastore_->CreateDeltaToAdd( op.key.GetKey(), std::string( op.value.toString() ) ) );
                delta = result;
            }
            else // REMOVE
            {
                OUTCOME_TRY( ( auto &&, result ), datastore_->CreateDeltaToRemove( op.key.GetKey() ) );
                delta = result;
            }

            for ( const auto &elem : delta->elements() )
            {
                auto new_elem = combined_delta->add_elements();
                new_elem->CopyFrom( elem );
            }
            for ( const auto &tomb : delta->tombstones() )
            {
                auto new_tomb = combined_delta->add_tombstones();
                new_tomb->CopyFrom( tomb );
            }
            max_priority = std::max( max_priority, delta->priority() );
        }
        combined_delta->set_priority( max_priority );

        auto result = datastore_->Publish( combined_delta, topics );
        if ( !result.has_failure() )
        {
            is_committed_ = true;
        }

        return result;
    }

    void AtomicTransaction::Rollback()
    {
        operations_.clear();
        modified_keys_.clear();
    }

    std::optional<AtomicTransaction::PendingOperation> AtomicTransaction::FindLatestOperation(
        const HierarchicalKey &key ) const
    {
        // Search from the end (most recent operations first)
        for ( auto it = operations_.rbegin(); it != operations_.rend(); ++it )
        {
            if ( it->key.GetKey() == key.GetKey() )
            {
                return *it;
            }
        }
        return std::nullopt;
    }
}

Updated on 2026-03-04 at 13:10:44 -0800