src/crdt/impl/crdt_data_filter.cpp¶
Source file of the CRDT Filter class. More...
Namespaces¶
| Name |
|---|
| sgns |
| sgns::crdt |
Detailed Description¶
Source file of the CRDT Filter class.
Date: 2025-05-12 Henrique A. Klein ([email protected])
Source code¶
#include "crdt/crdt_data_filter.hpp"
#include <set>
namespace sgns::crdt
{
/**
 * @brief Builds a filter with the given default policy.
 * @param accept_by_default when false, FilterElementsOnDelta removes every
 *        element whose key is matched by no registered pattern; when true such
 *        elements are left in the delta.
 */
CRDTDataFilter::CRDTDataFilter( bool accept_by_default ) : accept_by_default_( accept_by_default ) {}
/**
 * @brief Associates an element filter callback with a key pattern.
 *
 * A callback previously stored under the same pattern is replaced.
 * FilterElementsOnDelta later compiles the pattern as a std::regex and matches
 * it against whole element keys.
 *
 * @param pattern regular expression text used as the registry key
 * @param filter  callback to run for elements whose key matches @p pattern
 * @return always true
 */
bool CRDTDataFilter::RegisterElementFilter( const std::string &pattern, ElementFilterCallback filter )
{
    std::lock_guard registry_guard{ element_registry_mutex_ };
    // Insert a new entry or overwrite the callback of an existing one.
    element_registry_.insert_or_assign( pattern, std::move( filter ) );
    return true;
}
/**
 * @brief Associates a tombstone filter callback with a key pattern.
 *
 * A callback previously stored under the same pattern is replaced. Note that
 * FilterTombstonesOnDelta in this file is not implemented yet, so nothing in
 * this translation unit reads the tombstone registry.
 *
 * @param pattern text used as the registry key
 * @param filter  callback stored for @p pattern
 * @return always true
 */
bool CRDTDataFilter::RegisterTombstoneFilter( const std::string &pattern, ElementFilterCallback filter )
{
    std::lock_guard registry_guard{ tombstone_registry_mutex_ };
    // Insert a new entry or overwrite the callback of an existing one.
    tombstone_registry_.insert_or_assign( pattern, std::move( filter ) );
    return true;
}
/**
 * @brief Removes the element filter registered under @p pattern.
 *
 * Does nothing when no filter is registered for that exact pattern string.
 *
 * @param pattern registry key previously passed to RegisterElementFilter
 */
void CRDTDataFilter::UnregisterElementFilter( const std::string &pattern )
{
    std::lock_guard registry_guard{ element_registry_mutex_ };
    if ( const auto entry = element_registry_.find( pattern ); entry != element_registry_.end() )
    {
        element_registry_.erase( entry );
    }
}
/**
 * @brief Removes the tombstone filter registered under @p pattern.
 *
 * Does nothing when no filter is registered for that exact pattern string.
 *
 * @param pattern registry key previously passed to RegisterTombstoneFilter
 */
void CRDTDataFilter::UnregisterTombstoneFilter( const std::string &pattern )
{
    std::lock_guard registry_guard{ tombstone_registry_mutex_ };
    if ( const auto entry = tombstone_registry_.find( pattern ); entry != tombstone_registry_.end() )
    {
        tombstone_registry_.erase( entry );
    }
}
/**
 * @brief Removes from @p delta the elements rejected by the registered element filters.
 *
 * For each element, the registered patterns are tried until one fully matches
 * the element key (std::regex_match); only that first matching filter is
 * invoked. The registry is an unordered_map, so when several patterns match
 * the same key, which one runs first is unspecified.
 *
 * Outcome per element:
 *  - a filter matched and returned a value: the element is removed, and every
 *    element of the delta whose key equals the key of one of the returned
 *    elements is removed as well;
 *  - a filter matched and returned no value: the element is kept;
 *  - no filter matched: the element is removed only when accept_by_default_
 *    is false.
 *
 * The registry is copied under a shared lock and the callbacks run on the copy,
 * outside the lock.
 *
 * @param delta delta whose elements list is filtered in place
 * @throws std::regex_error if a registered pattern that gets evaluated is not a
 *         valid regular expression (the delta is left untouched in that case)
 */
void CRDTDataFilter::FilterElementsOnDelta( pb::Delta &delta ) const
{
    // Snapshot the registry so user callbacks never run while the lock is held.
    std::unordered_map<std::string, ElementFilterCallback> registry_copy;
    {
        std::shared_lock lock( element_registry_mutex_ );
        registry_copy = element_registry_;
    }

    // One entry per registered filter. The regex is compiled lazily, the first
    // time the entry is actually evaluated, and then reused for every later
    // element instead of being rebuilt for each (element, pattern) pair.
    struct CompiledFilter
    {
        const std::string           *pattern;
        const ElementFilterCallback *filter;
        std::regex                   regex;
        bool                         compiled;
    };

    std::vector<CompiledFilter> compiled_filters;
    compiled_filters.reserve( registry_copy.size() );
    for ( const auto &[pattern, filter] : registry_copy )
    {
        // Pointers stay valid: registry_copy is not modified after this point.
        compiled_filters.push_back( CompiledFilter{ &pattern, &filter, std::regex(), false } );
    }

    std::set<std::string>            additional_keys_to_delete;
    std::set<int, std::greater<int>> elements_to_delete_indices; // descending order

    // First pass: run the first matching filter on every element.
    for ( int i = 0; i < delta.elements_size(); ++i )
    {
        const auto &element        = delta.elements( i );
        bool        filter_matched = false;

        for ( auto &entry : compiled_filters )
        {
            if ( !entry.compiled )
            {
                entry.regex    = std::regex( *entry.pattern );
                entry.compiled = true;
            }
            if ( !std::regex_match( element.key(), entry.regex ) )
            {
                continue;
            }

            filter_matched = true;
            auto result    = ( *entry.filter )( element );
            if ( result.has_value() )
            {
                // A value means "reject": drop this element plus any element
                // sharing a key with the ones the filter returned.
                elements_to_delete_indices.insert( i );
                for ( const auto &additional_element : *result )
                {
                    additional_keys_to_delete.insert( additional_element.key() );
                }
            }
            break; // only the first matching filter is consulted
        }

        if ( !filter_matched && !accept_by_default_ )
        {
            // No filter claimed this element and the default policy is to reject.
            elements_to_delete_indices.insert( i );
        }
    }

    // Second pass: mark the elements whose keys were requested by the filters.
    if ( !additional_keys_to_delete.empty() )
    {
        for ( int i = 0; i < delta.elements_size(); ++i )
        {
            if ( additional_keys_to_delete.count( delta.elements( i ).key() ) != 0 )
            {
                elements_to_delete_indices.insert( i );
            }
        }
    }

    // Erase from the highest index down so the remaining indices stay valid.
    for ( const int index : elements_to_delete_indices )
    {
        delta.mutable_elements()->DeleteSubrange( index, 1 );
    }
}
/**
 * @brief Placeholder for tombstone filtering; not implemented.
 *
 * @param delta unused
 * @throws std::runtime_error always, with the message "Not supported"
 */
void CRDTDataFilter::FilterTombstonesOnDelta( pb::Delta &delta )
{
    // TODO: figure out how to remove tombstones, including ones already recorded.
    static_cast<void>( delta ); // intentionally unused until this is implemented
    throw std::runtime_error{ "Not supported" };
}
}
Updated on 2026-03-04 at 13:10:44 -0800