Skip to content

impl/crdt_callback_manager.cpp

CRDT callback manager implementation, invoked when an element gets added or removed. More...

Namespaces

Name
sgns
sgns::crdt

Detailed Description

Implementation of the CRDT callback manager, which dispatches registered callbacks when an element gets added or removed.

Date: 2025-09-06 Henrique A. Klein ([email protected])

Source code

#include <regex>
#include "crdt/crdt_callback_manager.hpp"
#include "crdt/globaldb/crdt_work_journal.hpp"

namespace sgns::crdt
{
    CRDTCallbackManager::CRDTCallbackManager( std::shared_ptr<CRDTWorkJournal> work_journal ) :
        work_journal_( std::move( work_journal ) )
    {
        logger_->debug( "CRDTCallbackManager constructed" );
    }

    /**
     * @brief Logs the destruction of the manager; no explicit cleanup is performed here.
     */
    CRDTCallbackManager::~CRDTCallbackManager()
    {
        logger_->debug( "CRDTCallbackManager destroyed" );
    }

    /**
     * @brief Registers a callback to run for newly added keys that match a pattern.
     *
     * The pattern text is the registry key; PutDataCallback() later compiles it
     * as a std::regex. A pattern that is already registered is left untouched,
     * so an existing callback is never replaced. The registry mutex is held for
     * the duration of the call.
     *
     * @param pattern  Regular expression (as text) that keys are matched against.
     * @param callback Callable stored for the pattern.
     * @return true if the callback was stored, false if the pattern already existed.
     */
    bool CRDTCallbackManager::RegisterNewDataCallback( const std::string &pattern, NewDataCallback callback )
    {
        std::lock_guard lock( new_data_callback_registry_mutex_ );

        logger_->debug( "Attempting to register new data callback for pattern: '{}'", pattern );

        // try_emplace performs the existence check and the insertion with a single
        // lookup, and only consumes `callback` when the pattern is actually inserted.
        const bool inserted = new_data_callback_registry_.try_emplace( pattern, std::move( callback ) ).second;
        if ( inserted )
        {
            logger_->info( "Successfully registered new data callback for pattern: '{}'", pattern );
        }
        else
        {
            logger_->warn( "Pattern '{}' already exists in new data callback registry", pattern );
        }

        logger_->debug( "Total registered new data callbacks: {}", new_data_callback_registry_.size() );
        return inserted;
    }

    /**
     * @brief Registers a callback to run for deleted keys that match a pattern.
     *
     * The pattern text is the registry key; DeleteDataCallback() later compiles
     * it as a std::regex. A pattern that is already registered is left
     * untouched, so an existing callback is never replaced. The registry mutex
     * is held for the duration of the call.
     *
     * @param pattern  Regular expression (as text) that deleted keys are matched against.
     * @param callback Callable stored for the pattern.
     * @return true if the callback was stored, false if the pattern already existed.
     */
    bool CRDTCallbackManager::RegisterDeletedDataCallback( const std::string &pattern, DeletedDataCallback callback )
    {
        std::lock_guard lock( deleted_data_callback_registry_mutex_ );

        logger_->debug( "Attempting to register deleted data callback for pattern: '{}'", pattern );

        // try_emplace performs the existence check and the insertion with a single
        // lookup, and only consumes `callback` when the pattern is actually inserted.
        const bool inserted =
            deleted_data_callback_registry_.try_emplace( pattern, std::move( callback ) ).second;
        if ( inserted )
        {
            logger_->info( "Successfully registered deleted data callback for pattern: '{}'", pattern );
        }
        else
        {
            logger_->warn( "Pattern '{}' already exists in deleted data callback registry", pattern );
        }

        logger_->debug( "Total registered deleted data callbacks: {}", deleted_data_callback_registry_.size() );
        return inserted;
    }

    /**
     * @brief Removes the new-data callback registered under an exact pattern string.
     *
     * The lookup is by the literal pattern text used at registration time, not
     * by regex matching. Unregistering an unknown pattern only logs a warning.
     * The registry mutex is held for the duration of the call.
     *
     * @param pattern Pattern text whose callback should be removed.
     */
    void CRDTCallbackManager::UnregisterNewDataCallback( const std::string &pattern )
    {
        std::lock_guard lock( new_data_callback_registry_mutex_ );

        const auto it = new_data_callback_registry_.find( pattern );
        if ( it != new_data_callback_registry_.end() )
        {
            // Erase through the iterator already in hand instead of searching by key again.
            new_data_callback_registry_.erase( it );
            logger_->info( "Successfully unregistered new data callback for pattern: '{}'", pattern );
        }
        else
        {
            logger_->warn( "Attempted to unregister non-existent pattern: '{}'", pattern );
        }

        logger_->debug( "Total registered new data callbacks after unregister: {}",
                        new_data_callback_registry_.size() );
    }

    /**
     * @brief Removes the deleted-data callback registered under an exact pattern string.
     *
     * The lookup is by the literal pattern text used at registration time, not
     * by regex matching. Unregistering an unknown pattern only logs a warning.
     * The registry mutex is held for the duration of the call.
     *
     * @param pattern Pattern text whose callback should be removed.
     */
    void CRDTCallbackManager::UnregisterDeletedDataCallback( const std::string &pattern )
    {
        std::lock_guard lock( deleted_data_callback_registry_mutex_ );

        const auto it = deleted_data_callback_registry_.find( pattern );
        if ( it != deleted_data_callback_registry_.end() )
        {
            // Erase through the iterator already in hand instead of searching by key again.
            deleted_data_callback_registry_.erase( it );
            logger_->info( "Successfully unregistered deleted data callback for pattern: '{}'", pattern );
        }
        else
        {
            logger_->warn( "Attempted to unregister non-existent pattern: '{}'", pattern );
        }

        logger_->debug( "Total registered deleted data callbacks after unregister: {}",
                        deleted_data_callback_registry_.size() );
    }

    /**
     * @brief Dispatches a newly stored key/value pair to every matching new-data callback.
     *
     * Steps performed:
     *  1. The callback registry is copied under a shared lock, so callbacks run
     *     without the registry mutex being held.
     *  2. The key is marked as processing in the work journal (when one is attached).
     *  3. Each registered pattern is compiled as a std::regex and tested against the
     *     whole key with std::regex_match; matching callbacks are invoked in registry
     *     iteration order.
     *  4. After a callback returns, if the journal entry for the key is still in the
     *     Processing state, it is marked done automatically.
     *
     * A std::regex_error raised inside the per-pattern try block (including one thrown
     * by the callback itself) is logged and the loop continues with the next pattern.
     * Any other exception thrown by a callback propagates to the caller.
     *
     * @param key   Key that was written.
     * @param value Payload stored under the key; forwarded to callbacks paired with the key.
     * @param cid   Identifier forwarded unchanged to the callbacks.
     */
    void CRDTCallbackManager::PutDataCallback( const std::string  &key,
                                               const base::Buffer &value,
                                               const std::string  &cid )
    {
        logger_->debug( "PutDataCallback triggered for key: '{}', cid: '{}', value size: {} bytes",
                        key,
                        cid,
                        value.size() );

        NewDataCallbackRegistry registry_copy;
        {
            std::shared_lock lock( new_data_callback_registry_mutex_ );
            registry_copy = new_data_callback_registry_;
            logger_->debug( "Copied {} registered patterns for matching", registry_copy.size() );
        }

        // The constructor accepts any shared_ptr, including an empty one, so the
        // journal must be checked before it is dereferenced.
        if ( work_journal_ )
        {
            // NOTE(review): the key is marked before we know whether any callback will
            // run; when none does, the entry is left in Processing by this function.
            // Confirm that this is intended.
            work_journal_->MarkProcessing( key );
        }
        else
        {
            logger_->warn( "No work journal attached - processing state for key '{}' will not be tracked", key );
        }

        if ( registry_copy.empty() )
        {
            logger_->warn( "No new data callbacks registered - key '{}' will not trigger any callbacks", key );
            return;
        }

        bool callback_triggered = false;
        for ( const auto &[pattern, callback] : registry_copy )
        {
            logger_->debug( "Testing key '{}' against pattern '{}'", key, pattern );

            try
            {
                const std::regex regex( pattern );
                const bool       matches = std::regex_match( key, regex );

                logger_->debug( "Regex match result for key '{}' vs pattern '{}': {}",
                                key,
                                pattern,
                                matches ? "MATCH" : "NO MATCH" );

                if ( matches )
                {
                    logger_->info( "Executing callback for key '{}' matching pattern '{}'", key, pattern );
                    callback( std::make_pair( key, value ), cid );

                    // Auto-complete only when the entry is still Processing after the
                    // callback returned.
                    // NOTE(review): this runs after the first matching callback, so later
                    // matching callbacks execute with the entry already marked done.
                    if ( work_journal_ )
                    {
                        if ( auto entry = work_journal_->GetEntry( key );
                             entry.has_value() && entry->state == CRDTWorkJournal::State::Processing )
                        {
                            if ( !work_journal_->MarkDone( key ) )
                            {
                                logger_->error( "Failed to auto-complete CRDT work for key '{}'", key );
                            }
                        }
                    }
                    callback_triggered = true;
                }
            }
            catch ( const std::regex_error &e )
            {
                logger_->error( "Regex error for pattern '{}': {}", pattern, e.what() );
            }
        }

        if ( !callback_triggered )
        {
            logger_->warn( "No callbacks were triggered for key '{}' - no pattern matches found", key );
        }
        else
        {
            logger_->debug( "Successfully triggered callbacks for key '{}'", key );
        }
    }

    void CRDTCallbackManager::DeleteDataCallback( const std::string &deleted_key, const std::string &cid )
    {
        logger_->debug( "DeleteDataCallback triggered for key: '{}', cid: '{}'", deleted_key, cid );

        DeletedDataCallbackRegistry registry_copy;
        {
            std::shared_lock lock( deleted_data_callback_registry_mutex_ );
            registry_copy = deleted_data_callback_registry_;
            logger_->debug( "Copied {} registered delete patterns for matching", registry_copy.size() );
        }

        if ( registry_copy.empty() )
        {
            logger_->warn( "No deleted data callbacks registered - key '{}' will not trigger any callbacks",
                           deleted_key );
            return;
        }

        bool callback_triggered = false;
        for ( const auto &[pattern, callback] : registry_copy )
        {
            logger_->debug( "Testing deleted key '{}' against pattern '{}'", deleted_key, pattern );

            try
            {
                std::regex regex( pattern );
                bool       matches = std::regex_match( deleted_key, regex );

                logger_->debug( "Regex match result for deleted key '{}' vs pattern '{}': {}",
                                deleted_key,
                                pattern,
                                matches ? "MATCH" : "NO MATCH" );

                if ( matches )
                {
                    logger_->info( "Executing delete callback for key '{}' matching pattern '{}'",
                                   deleted_key,
                                   pattern );
                    callback( deleted_key, cid );
                    callback_triggered = true;
                }
            }
            catch ( const std::regex_error &e )
            {
                logger_->error( "Regex error for delete pattern '{}': {}", pattern, e.what() );
            }
        }

        if ( !callback_triggered )
        {
            logger_->warn( "No delete callbacks were triggered for key '{}' - no pattern matches found", deleted_key );
        }
        else
        {
            logger_->debug( "Successfully triggered delete callbacks for key '{}'", deleted_key );
        }
    }

}

Updated on 2026-06-05 at 17:22:19 -0700