Skip to content

src/crdt/impl/crdt_callback_manager.cpp

Implementation of the CRDT callback manager, which invokes registered callbacks when an element is added or removed. More...

Namespaces

Name
sgns
sgns::crdt

Detailed Description

Implementation of the CRDT callback manager, which invokes registered callbacks when an element is added or removed.

Date: 2025-09-06 · Author: Henrique A. Klein ([email protected])

Source code

#include <regex>
#include "crdt/crdt_callback_manager.hpp"

namespace sgns::crdt
{
    /**
     * @brief Constructs the callback manager.
     *
     * Performs no work beyond emitting a debug-level log entry.
     */
    CRDTCallbackManager::CRDTCallbackManager()
    {
        logger_->debug("CRDTCallbackManager constructed");
    }

    /**
     * @brief Destroys the callback manager.
     *
     * Performs no work beyond emitting a debug-level log entry.
     */
    CRDTCallbackManager::~CRDTCallbackManager()
    {
        logger_->debug("CRDTCallbackManager destroyed");
    }

    /**
     * @brief Registers a callback to run for newly put keys that match @p pattern.
     *
     * The pattern string itself is the registry key, so each distinct pattern
     * holds at most one callback. The pattern is not compiled or validated here:
     * PutDataCallback() builds a std::regex from it each time a key is tested,
     * and that is where an invalid expression gets reported.
     *
     * The registry is modified while holding an exclusive lock on its mutex.
     *
     * @param pattern  Regular expression text (default std::regex grammar) that a
     *                 key must match for the callback to run.
     * @param callback Callable to store for the pattern; moved into the registry
     *                 when the registration succeeds.
     * @return true if the pattern was not yet registered and the callback was
     *         stored; false if the pattern already exists, in which case the
     *         previously registered callback is left untouched.
     */
    bool CRDTCallbackManager::RegisterNewDataCallback( const std::string &pattern, NewDataCallback callback )
    {
        std::lock_guard lock( new_data_callback_registry_mutex_ );

        logger_->debug("Attempting to register new data callback for pattern: '{}'", pattern);

        // try_emplace inserts only when the key is absent, so a single lookup
        // both checks for a duplicate and performs the insertion.
        const bool inserted = new_data_callback_registry_.try_emplace( pattern, std::move( callback ) ).second;

        if ( inserted )
        {
            logger_->info("Successfully registered new data callback for pattern: '{}'", pattern);
        }
        else
        {
            logger_->warn("Pattern '{}' already exists in new data callback registry", pattern);
        }

        logger_->debug("Total registered new data callbacks: {}", new_data_callback_registry_.size());
        return inserted;
    }

    /**
     * @brief Registers a callback to run for deleted keys that match @p pattern.
     *
     * The pattern string itself is the registry key, so each distinct pattern
     * holds at most one callback. The pattern is not compiled or validated here:
     * DeleteDataCallback() builds a std::regex from it each time a key is tested,
     * and that is where an invalid expression gets reported.
     *
     * The registry is modified while holding an exclusive lock on its mutex.
     *
     * @param pattern  Regular expression text (default std::regex grammar) that a
     *                 deleted key must match for the callback to run.
     * @param callback Callable to store for the pattern; moved into the registry
     *                 when the registration succeeds.
     * @return true if the pattern was not yet registered and the callback was
     *         stored; false if the pattern already exists, in which case the
     *         previously registered callback is left untouched.
     */
    bool CRDTCallbackManager::RegisterDeletedDataCallback( const std::string &pattern, DeletedDataCallback callback )
    {
        std::lock_guard lock( deleted_data_callback_registry_mutex_ );

        logger_->debug("Attempting to register deleted data callback for pattern: '{}'", pattern);

        // try_emplace inserts only when the key is absent, so a single lookup
        // both checks for a duplicate and performs the insertion.
        const bool inserted = deleted_data_callback_registry_.try_emplace( pattern, std::move( callback ) ).second;

        if ( inserted )
        {
            logger_->info("Successfully registered deleted data callback for pattern: '{}'", pattern);
        }
        else
        {
            logger_->warn("Pattern '{}' already exists in deleted data callback registry", pattern);
        }

        logger_->debug("Total registered deleted data callbacks: {}", deleted_data_callback_registry_.size());
        return inserted;
    }

    /**
     * @brief Removes the new-data callback registered under @p pattern, if any.
     *
     * The lookup is by exact pattern string (the registry key), not by regex
     * matching. Unregistering a pattern that is not present only logs a warning.
     *
     * The registry is modified while holding an exclusive lock on its mutex.
     *
     * @param pattern Pattern string that was used when registering the callback.
     */
    void CRDTCallbackManager::UnregisterNewDataCallback( const std::string &pattern )
    {
        std::lock_guard lock( new_data_callback_registry_mutex_ );

        // erase-by-key returns the number of removed entries, so one lookup both
        // removes the entry and tells us whether it existed.
        if ( new_data_callback_registry_.erase( pattern ) > 0 )
        {
            logger_->info("Successfully unregistered new data callback for pattern: '{}'", pattern);
        }
        else
        {
            logger_->warn("Attempted to unregister non-existent pattern: '{}'", pattern);
        }

        logger_->debug("Total registered new data callbacks after unregister: {}", new_data_callback_registry_.size());
    }

    /**
     * @brief Removes the deleted-data callback registered under @p pattern, if any.
     *
     * The lookup is by exact pattern string (the registry key), not by regex
     * matching. Unregistering a pattern that is not present only logs a warning.
     *
     * The registry is modified while holding an exclusive lock on its mutex.
     *
     * @param pattern Pattern string that was used when registering the callback.
     */
    void CRDTCallbackManager::UnregisterDeletedDataCallback( const std::string &pattern )
    {
        std::lock_guard lock( deleted_data_callback_registry_mutex_ );

        // erase-by-key returns the number of removed entries, so one lookup both
        // removes the entry and tells us whether it existed.
        if ( deleted_data_callback_registry_.erase( pattern ) > 0 )
        {
            logger_->info("Successfully unregistered deleted data callback for pattern: '{}'", pattern);
        }
        else
        {
            logger_->warn("Attempted to unregister non-existent pattern: '{}'", pattern);
        }

        logger_->debug("Total registered deleted data callbacks after unregister: {}", deleted_data_callback_registry_.size());
    }

    /**
     * @brief Runs every registered new-data callback whose pattern matches @p key.
     *
     * The registry is copied under a shared lock and the lock is released before
     * any pattern is tested, so callbacks execute without the registry mutex held.
     * Each pattern is compiled into a std::regex and must match the whole key
     * (std::regex_match). A std::regex_error raised while handling one pattern is
     * logged and the remaining patterns are still processed.
     *
     * @param key   Key of the element that was put.
     * @param value Value stored under the key; handed to callbacks paired with the key.
     * @param cid   CID string forwarded unchanged to each callback.
     */
    void CRDTCallbackManager::PutDataCallback( const std::string  &key,
                                               const base::Buffer &value,
                                               const std::string  &cid )
    {
        logger_->debug("PutDataCallback triggered for key: '{}', cid: '{}', value size: {} bytes", 
                      key, cid, value.size());

        // Take the snapshot inside a scoped helper so the shared lock is dropped
        // as soon as the copy exists.
        const auto snapshot = [this]
        {
            std::shared_lock guard( new_data_callback_registry_mutex_ );
            auto             copied = new_data_callback_registry_;
            logger_->debug("Copied {} registered patterns for matching", copied.size());
            return copied;
        }();

        if ( snapshot.empty() )
        {
            logger_->warn("No new data callbacks registered - key '{}' will not trigger any callbacks", key);
            return;
        }

        bool any_fired = false;
        for ( const auto &[pattern, callback] : snapshot )
        {
            logger_->debug("Testing key '{}' against pattern '{}'", key, pattern);

            // NOTE(review): the callback runs inside this try block, so a
            // std::regex_error thrown by the callback itself is also caught and
            // logged as a pattern error - confirm that this is intended.
            try
            {
                const std::regex compiled( pattern );
                const bool       is_match = std::regex_match( key, compiled );

                logger_->debug("Regex match result for key '{}' vs pattern '{}': {}", 
                              key, pattern, is_match ? "MATCH" : "NO MATCH");

                if ( !is_match )
                {
                    continue;
                }

                logger_->info("Executing callback for key '{}' matching pattern '{}'", key, pattern);
                callback( std::make_pair( key, value ), cid );
                any_fired = true;
            }
            catch ( const std::regex_error &err )
            {
                logger_->error("Regex error for pattern '{}': {}", pattern, err.what());
            }
        }

        if ( any_fired )
        {
            logger_->debug("Successfully triggered callbacks for key '{}'", key);
        }
        else
        {
            logger_->warn("No callbacks were triggered for key '{}' - no pattern matches found", key);
        }
    }

    void CRDTCallbackManager::DeleteDataCallback( const std::string &deleted_key, const std::string &cid )
    {
        logger_->debug("DeleteDataCallback triggered for key: '{}', cid: '{}'", deleted_key, cid);

        DeletedDataCallbackRegistry registry_copy;
        {
            std::shared_lock lock( deleted_data_callback_registry_mutex_ );
            registry_copy = deleted_data_callback_registry_;
            logger_->debug("Copied {} registered delete patterns for matching", registry_copy.size());
        }

        if (registry_copy.empty())
        {
            logger_->warn("No deleted data callbacks registered - key '{}' will not trigger any callbacks", deleted_key);
            return;
        }

        bool callback_triggered = false;
        for ( const auto &[pattern, callback] : registry_copy )
        {
            logger_->debug("Testing deleted key '{}' against pattern '{}'", deleted_key, pattern);

            try 
            {
                std::regex regex( pattern );
                bool matches = std::regex_match( deleted_key, regex );

                logger_->debug("Regex match result for deleted key '{}' vs pattern '{}': {}", 
                              deleted_key, pattern, matches ? "MATCH" : "NO MATCH");

                if ( matches )
                {
                    logger_->info("Executing delete callback for key '{}' matching pattern '{}'", deleted_key, pattern);
                    callback( deleted_key, cid );
                    callback_triggered = true;
                }
            }
            catch (const std::regex_error& e)
            {
                logger_->error("Regex error for delete pattern '{}': {}", pattern, e.what());
            }
        }

        if (!callback_triggered)
        {
            logger_->warn("No delete callbacks were triggered for key '{}' - no pattern matches found", deleted_key);
        }
        else
        {
            logger_->debug("Successfully triggered delete callbacks for key '{}'", deleted_key);
        }
    }

}

Updated on 2026-03-04 at 13:10:44 -0800