blockchain/ValidatorRegistry.cpp¶
Validator registry and quorum logic for governance. More...
Namespaces¶
| Name |
|---|
| sgns |
Detailed Description¶
Validator registry and quorum logic for governance.
Date: 2025-10-16 Henrique A. Klein ([email protected])
Source code¶
#include "blockchain/ValidatorRegistry.hpp"
#include <algorithm>
#include <atomic>
#include <chrono>
#include <limits>
#include <set>
#include <system_error>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <gsl/span>
#include "account/GeniusAccount.hpp"
#include "base/hexutil.hpp"
#include "blockchain/Consensus.hpp"
#include "blockchain/ConsensusAuth.hpp"
#include "blockchain/impl/proto/ValidatorRegistry.pb.h"
#include "crypto/hasher/hasher_impl.hpp"
#include "crdt/graphsync_dagsyncer.hpp"
namespace sgns
{
namespace
{
base::Logger ValidatorRegistryLogger()
{
return base::createLogger( "ValidatorRegistry" );
}
outcome::result<std::string> ExtractPrevRegistryCid( const ipfs_lite::ipld::IPLDNode &node )
{
auto buffer = node.content();
crdt::pb::Delta delta;
if ( !delta.ParseFromArray( buffer.data(), buffer.size() ) )
{
ValidatorRegistryLogger()->error( "{}: Failed to parse Delta from IPLD node", __func__ );
return outcome::failure( std::errc::invalid_argument );
}
const std::string registry_key = std::string( ValidatorRegistry::RegistryKey() );
for ( const auto &element : delta.elements() )
{
validator::RegistryUpdate update;
if ( !update.ParseFromString( element.value() ) )
{
ValidatorRegistryLogger()->error( "{}: Can't parse the registry update {}",
__func__,
element.key() );
return outcome::failure( std::errc::invalid_argument );
}
return update.prev_registry_hash();
}
ValidatorRegistryLogger()->error( "{}: NO SUCH FILE ", __func__ );
return outcome::failure( std::errc::no_such_file_or_directory );
}
outcome::result<std::string> ExtractConsensusSubjectHash( const ConsensusSubject &subject )
{
auto nonce_payload = ConsensusManager::DecodeNonceSubject( subject );
if ( nonce_payload.has_value() )
{
if ( nonce_payload.value().tx_hash().empty() )
{
return outcome::failure( std::errc::invalid_argument );
}
return nonce_payload.value().tx_hash();
}
auto task_payload = ConsensusManager::DecodeTaskResultSubject( subject );
if ( task_payload.has_value() )
{
if ( task_payload.value().task_result_hash().empty() )
{
return outcome::failure( std::errc::invalid_argument );
}
return task_payload.value().task_result_hash();
}
auto batch_payload = ConsensusManager::DecodeRegistryBatchSubject( subject );
if ( batch_payload.has_value() )
{
if ( batch_payload.value().batch_root().empty() )
{
return outcome::failure( std::errc::invalid_argument );
}
return std::string( batch_payload.value().batch_root() );
}
return outcome::failure( std::errc::invalid_argument );
}
}
ValidatorRegistry::ValidatorRegistry( std::shared_ptr<crdt::GlobalDB> db,
uint64_t quorum_numerator,
uint64_t quorum_denominator,
WeightConfig weight_config,
std::string genesis_authority,
BlockRequestMethod block_request_method,
InitCallback init_callback ) :
db_( std::move( db ) ),
quorum_numerator_( quorum_numerator ),
quorum_denominator_( quorum_denominator ),
weight_config_( std::move( weight_config ) ),
genesis_authority_( std::move( genesis_authority ) ),
init_callback_( std::move( init_callback ) ),
request_block_by_cid_( std::move( block_request_method ) )
{
logger_->trace( "{}: constructed", __func__ );
}
ValidatorRegistry::~ValidatorRegistry()
{
const std::string pattern = "/?" + std::string( RegistryKey() );
if ( db_ )
{
db_->UnregisterNewElementCallback( pattern );
db_->UnregisterElementFilter( pattern );
}
logger_->trace( "{}: destroyed", __func__ );
}
std::shared_ptr<ValidatorRegistry> ValidatorRegistry::New( std::shared_ptr<crdt::GlobalDB> db,
uint64_t quorum_numerator,
uint64_t quorum_denominator,
WeightConfig weight_config,
std::string genesis_authority,
BlockRequestMethod block_request_method,
InitCallback init_callback )
{
if ( !db )
{
return nullptr;
}
if ( genesis_authority.empty() )
{
return nullptr;
}
if ( block_request_method == nullptr )
{
return nullptr;
}
if ( quorum_denominator == 0 )
{
quorum_denominator = 1;
}
auto instance = std::shared_ptr<ValidatorRegistry>( new ValidatorRegistry( std::move( db ),
quorum_numerator,
quorum_denominator,
std::move( weight_config ),
std::move( genesis_authority ),
std::move( block_request_method ),
std::move( init_callback ) ) );
instance->logger_->trace( "{}: instance created", __func__ );
instance->InitializeCache();
if ( !instance->RegisterFilter() )
{
instance->logger_->error( "{}: failed to register filters", __func__ );
return nullptr;
}
instance->logger_->info( "{}: instance ready", __func__ );
return instance;
}
outcome::result<void> ValidatorRegistry::MigrateCids( const std::shared_ptr<crdt::GlobalDB> &old_db,
const std::shared_ptr<crdt::GlobalDB> &new_db )
{
if ( !old_db || !new_db )
{
return outcome::failure( std::errc::invalid_argument );
}
auto old_syncer = std::static_pointer_cast<crdt::GraphsyncDAGSyncer>(
old_db->GetBroadcaster()->GetDagSyncer() );
auto new_crdt = new_db->GetCRDTDataStore();
if ( !new_crdt )
{
ValidatorRegistryLogger()->error( "{}: Missing broadcaster while migrating Validator CIDs", __func__ );
return outcome::failure( std::errc::no_such_device );
}
if ( !old_syncer )
{
ValidatorRegistryLogger()->error( "{}: Missing DAG syncer while migrating Validator CIDs", __func__ );
return outcome::failure( std::errc::no_such_device );
}
auto old_store = old_db->GetDataStore();
auto new_store = new_db->GetDataStore();
ValidatorRegistryLogger()->debug( "{}: Getting the registry CID from the datastore", __func__ );
crdt::GlobalDB::Buffer registry_cid_key;
registry_cid_key.put( std::string( RegistryCidKey() ) );
auto registry_cid = old_store->get( registry_cid_key );
if ( registry_cid.has_value() )
{
ValidatorRegistryLogger()->debug( "{}: Latest Validator CID: {}",
__func__,
registry_cid.value().toString() );
std::vector<std::string> registry_chain;
std::vector<std::shared_ptr<ipfs_lite::ipld::IPLDNode>> nodes;
auto current_cid = std::string( registry_cid.value().toString() );
while ( !current_cid.empty() )
{
registry_chain.push_back( current_cid );
BOOST_OUTCOME_TRY( auto cid, CID::fromString( current_cid ) );
BOOST_OUTCOME_TRY( auto node, old_syncer->GetNodeFromMerkleDAG( cid ) );
auto prev_result = ExtractPrevRegistryCid( *node );
nodes.push_back( std::move( node ) );
if ( prev_result.has_error() )
{
ValidatorRegistryLogger()->error( "{}: Failed to extract previous registry CID from {}",
__func__,
current_cid );
break;
}
current_cid = prev_result.value();
}
for ( size_t i = registry_chain.size(); i-- > 0; )
{
const auto &cid_string = registry_chain[i];
const auto &node = nodes[i];
if ( cid_string.empty() )
{
continue;
}
ValidatorRegistryLogger()->debug( "{}: Adding Validator CID: {}",
__func__,
registry_cid.value().toString() );
crdt::GlobalDB::Buffer registry_cid_value;
registry_cid_value.put( cid_string );
(void)new_store->put( registry_cid_key, std::move( registry_cid_value ) );
BOOST_OUTCOME_TRY( new_crdt->AddDAGNode( node ) );
}
}
ValidatorRegistryLogger()->debug( "{}: Finished migrating validator registry: ", __func__ );
return outcome::success();
}
uint64_t ValidatorRegistry::ComputeWeight( Role role ) const
{
logger_->trace( "{}: entry role={}", __func__, static_cast<int>( role ) );
uint64_t weight = weight_config_.regular_weight_;
uint64_t cap = weight_config_.regular_max_weight_;
switch ( role )
{
case Role::GENESIS:
weight = weight_config_.genesis_weight_;
cap = weight_config_.genesis_max_weight_;
break;
case Role::FULL:
weight = weight_config_.full_weight_;
cap = weight_config_.full_max_weight_;
break;
case Role::SHARDED:
weight = weight_config_.sharded_weight_;
cap = weight_config_.sharded_max_weight_;
break;
case Role::REGULAR:
default:
break;
}
if ( weight == 0 )
{
logger_->debug( "{}: weight is zero", __func__ );
return 0;
}
if ( weight > cap )
{
logger_->debug( "{}: weight clamped to max {}", __func__, cap );
return cap;
}
logger_->debug( "{}: computed weight={}", __func__, weight );
return weight;
}
uint64_t ValidatorRegistry::TotalWeight( const Registry ®istry )
{
ValidatorRegistryLogger()->trace( "{}: entry validators={}", __func__, registry.validators().size() );
uint64_t total_weight = 0;
for ( const auto &entry : registry.validators() )
{
if ( entry.status() != Status::ACTIVE )
{
continue;
}
total_weight += entry.weight();
}
ValidatorRegistryLogger()->debug( "{}: total_weight={}", __func__, total_weight );
return total_weight;
}
uint64_t ValidatorRegistry::QuorumThreshold( uint64_t total_weight ) const
{
ValidatorRegistryLogger()->trace( "{}: entry total_weight={}", __func__, total_weight );
if ( total_weight == 0 )
{
ValidatorRegistryLogger()->debug( "{}: total_weight is zero, threshold=0", __func__ );
return 0;
}
const uint64_t numerator = total_weight * quorum_numerator_;
const uint64_t threshold = ( numerator + quorum_denominator_ - 1 ) / quorum_denominator_;
ValidatorRegistryLogger()->debug( "{}: threshold={}", __func__, threshold );
return threshold;
}
bool ValidatorRegistry::IsQuorum( uint64_t accumulated_weight, uint64_t total_weight ) const
{
ValidatorRegistryLogger()->trace( "{}: entry accumulated={} total={}",
__func__,
accumulated_weight,
total_weight );
const bool is_quorum = accumulated_weight >= QuorumThreshold( total_weight );
ValidatorRegistryLogger()->debug( "{}: is_quorum={}", __func__, is_quorum );
return is_quorum;
}
ValidatorRegistry::Registry ValidatorRegistry::CreateGenesisRegistry(
const std::string &genesis_validator_id ) const
{
logger_->trace( "{}: entry genesis_id={}", __func__, genesis_validator_id.substr( 0, 8 ) );
Registry registry;
registry.set_epoch( 0 );
auto *entry = registry.add_validators();
entry->set_validator_id( genesis_validator_id );
entry->set_role( Role::GENESIS );
entry->set_status( Status::ACTIVE );
entry->set_weight( ComputeWeight( entry->role() ) );
entry->set_penalty_score( 0 );
entry->set_missed_epochs( 0 );
logger_->debug( "{}: registry created with weight={}", __func__, entry->weight() );
return registry;
}
outcome::result<std::vector<uint8_t>> ValidatorRegistry::SerializeRegistry( const Registry ®istry ) const
{
logger_->trace( "{}: entry validators={}", __func__, registry.validators().size() );
std::string serialized;
if ( !registry.SerializeToString( &serialized ) )
{
logger_->error( "{}: serialization failed", __func__ );
return outcome::failure( std::errc::invalid_argument );
}
logger_->debug( "{}: serialized size={}", __func__, serialized.size() );
return std::vector<uint8_t>( serialized.begin(), serialized.end() );
}
outcome::result<ValidatorRegistry::Registry> ValidatorRegistry::DeserializeRegistry(
const std::vector<uint8_t> &buffer ) const
{
logger_->trace( "{}: entry size={}", __func__, buffer.size() );
Registry proto;
if ( !proto.ParseFromArray( buffer.data(), static_cast<int>( buffer.size() ) ) )
{
logger_->error( "{}: parse failed", __func__ );
return outcome::failure( std::errc::invalid_argument );
}
logger_->debug( "{}: parsed validators={}", __func__, proto.validators().size() );
return proto;
}
outcome::result<std::vector<uint8_t>> ValidatorRegistry::SerializeRegistryUpdate(
const RegistryUpdate &update ) const
{
logger_->trace( "{}: entry validators={}", __func__, update.registry().validators().size() );
std::string serialized;
if ( !update.SerializeToString( &serialized ) )
{
logger_->error( "{}: serialization failed", __func__ );
return outcome::failure( std::errc::invalid_argument );
}
logger_->debug( "{}: serialized size={}", __func__, serialized.size() );
return std::vector<uint8_t>( serialized.begin(), serialized.end() );
}
outcome::result<ValidatorRegistry::RegistryUpdate> ValidatorRegistry::DeserializeRegistryUpdate(
const std::vector<uint8_t> &buffer ) const
{
logger_->trace( "{}: entry size={}", __func__, buffer.size() );
RegistryUpdate proto;
if ( !proto.ParseFromArray( buffer.data(), static_cast<int>( buffer.size() ) ) )
{
logger_->error( "{}: parse failed", __func__ );
return outcome::failure( std::errc::invalid_argument );
}
logger_->debug( "{}: parsed validators={}", __func__, proto.registry().validators().size() );
return proto;
}
outcome::result<void> ValidatorRegistry::StoreGenesisRegistry(
const std::string &genesis_validator_id,
std::function<std::vector<uint8_t>( std::vector<uint8_t> )> sign )
{
logger_->trace( "{}: entry genesis_id={}", __func__, genesis_validator_id.substr( 0, 8 ) );
{
std::shared_lock lock( cache_mutex_ );
if ( cache_initialized_ && cached_registry_ && !cached_registry_->validators().empty() )
{
logger_->info( "{}: registry already initialized, skipping", __func__ );
return outcome::success();
}
}
if ( !sign )
{
logger_->error( "{}: missing sign callback", __func__ );
return outcome::failure( std::errc::invalid_argument );
}
logger_->debug( "{}: creating genesis registry", __func__ );
RegistryUpdate update;
*update.mutable_registry() = CreateGenesisRegistry( genesis_validator_id );
update.clear_prev_registry_hash();
auto signing_bytes = ComputeUpdateSigningBytes( update );
if ( signing_bytes.has_error() )
{
logger_->error( "{}: failed to compute signing bytes", __func__ );
return outcome::failure( signing_bytes.error() );
}
SignatureEntry signature_entry;
signature_entry.set_validator_id( genesis_validator_id );
auto signature = sign( signing_bytes.value() );
signature_entry.set_signature( signature.data(), signature.size() );
*update.add_signatures() = signature_entry;
auto serialized_update = SerializeRegistryUpdate( update );
if ( serialized_update.has_error() )
{
logger_->error( "{}: failed to serialize registry update", __func__ );
return outcome::failure( serialized_update.error() );
}
base::Buffer update_buffer(
gsl::span<const uint8_t>( serialized_update.value().data(), serialized_update.value().size() ) );
crdt::HierarchicalKey registry_key{ std::string( RegistryKey() ) };
auto registry_put = db_->Put( registry_key, update_buffer, { std::string( ValidatorTopic() ) } );
if ( registry_put.has_error() )
{
logger_->error( "{}: failed to store registry in CRDT", __func__ );
return outcome::failure( registry_put.error() );
}
auto cid_string = registry_put.value().toString();
if ( cid_string.has_value() )
{
logger_->info( "{}: stored genesis registry CID {}", __func__, cid_string.value() );
}
else
{
logger_->error( "{}: registry stored but CID missing", __func__ );
}
logger_->info( "{}: success", __func__ );
return outcome::success();
}
outcome::result<ValidatorRegistry::Registry> ValidatorRegistry::LoadRegistry() const
{
{
std::shared_lock<std::shared_mutex> lock( cache_mutex_ );
if ( cached_registry_ )
{
return cached_registry_.value();
}
}
auto update_result = LoadRegistryUpdate();
if ( update_result.has_error() )
{
logger_->error( "{}: failed to load registry update", __func__ );
return outcome::failure( update_result.error() );
}
logger_->debug( "{}: returning registry from update", __func__ );
return update_result.value().registry();
}
outcome::result<ValidatorRegistry::Registry> ValidatorRegistry::LoadRegistry( const std::string &cid ) const
{
ValidatorRegistryLogger()->trace( "{}: entry cid={}", __func__, cid );
BOOST_OUTCOME_TRY( auto cid_content, db_->GetCIDContent( cid ) );
ValidatorRegistryLogger()->trace( "{}: Got CID content with {} entries ", __func__, cid_content.size() );
crdt::HierarchicalKey registry_key{ std::string( RegistryKey() ) };
for ( auto &[key, registry_content] : cid_content )
{
ValidatorRegistryLogger()->trace( "{}: Processing CID content key={}", __func__, key );
if ( key != registry_key.GetKey() )
{
ValidatorRegistryLogger()->debug( "{}: Skipping non-registry content key={}, registry_key={}",
__func__,
key,
registry_key.GetKey() );
continue;
}
std::vector<uint8_t> bytes( registry_content.begin(), registry_content.end() );
auto decoded = DeserializeRegistryUpdate( bytes );
if ( decoded.has_error() )
{
ValidatorRegistryLogger()->error( "{}: failed to parse registry update ", __func__ );
continue;
}
ValidatorRegistryLogger()->debug( "{}: Grabbing registry from cid {} and key={}", __func__, cid, key );
return decoded.value().registry();
}
return outcome::failure( std::errc::no_such_file_or_directory );
}
outcome::result<ValidatorRegistry::RegistryUpdate> ValidatorRegistry::LoadRegistryUpdate() const
{
logger_->trace( "{}: entry", __func__ );
{
std::shared_lock<std::shared_mutex> lock( cache_mutex_ );
if ( cached_update_ )
{
logger_->debug( "{}: returning cached update", __func__ );
return cached_update_.value();
}
}
logger_->error( "{}: registry update not available", __func__ );
return outcome::failure( std::errc::no_such_file_or_directory );
}
outcome::result<ValidatorRegistry::RegistryUpdate> ValidatorRegistry::CreateUpdateFromCertificate(
const sgns::ConsensusCertificate &certificate )
{
logger_->trace( "{}: entry proposal_id={}", __func__, certificate.proposal_id() );
auto registry_result = LoadRegistry();
if ( registry_result.has_error() )
{
logger_->error( "{}: failed to load registry: {}", __func__, registry_result.error().message() );
return outcome::failure( registry_result.error() );
}
auto current_registry = registry_result.value();
if ( !ValidateCertificateForUpdate( certificate, current_registry ) )
{
logger_->error( "{}: invalid certificate", __func__ );
return outcome::failure( std::errc::invalid_argument );
}
auto votes = ExtractCertificateVotes( certificate, current_registry );
RegistryUpdate update;
update.set_prev_registry_hash( GetRegistryCid() );
*update.mutable_registry() = BuildRegistryFromCertificate( current_registry,
certificate,
votes.registered_votes,
votes.unregistered_votes );
std::string serialized_cert;
if ( !certificate.SerializeToString( &serialized_cert ) )
{
logger_->error( "{}: failed to serialize certificate", __func__ );
return outcome::failure( std::errc::invalid_argument );
}
update.set_certificate( serialized_cert );
logger_->debug( "{}: update created epoch={}", __func__, update.registry().epoch() );
return update;
}
outcome::result<void> ValidatorRegistry::StoreRegistryUpdate( const RegistryUpdate &update )
{
logger_->trace( "{}: entry epoch={}", __func__, update.registry().epoch() );
auto serialized_update = SerializeRegistryUpdate( update );
if ( serialized_update.has_error() )
{
logger_->error( "{}: failed to serialize registry update", __func__ );
return outcome::failure( serialized_update.error() );
}
base::Buffer update_buffer(
gsl::span<const uint8_t>( serialized_update.value().data(), serialized_update.value().size() ) );
crdt::HierarchicalKey registry_key{ std::string( RegistryKey() ) };
auto registry_put = db_->Put( registry_key, update_buffer, { std::string( ValidatorTopic() ) } );
if ( registry_put.has_error() )
{
logger_->error( "{}: failed to store registry update in CRDT", __func__ );
return outcome::failure( registry_put.error() );
}
auto cid_string = registry_put.value().toString();
if ( cid_string.has_value() )
{
logger_->info( "{}: stored registry update CID {}", __func__, cid_string.value() );
}
else
{
logger_->error( "{}: registry update stored but CID missing", __func__ );
}
logger_->info( "{}: success", __func__ );
return outcome::success();
}
outcome::result<std::shared_ptr<crdt::AtomicTransaction>> ValidatorRegistry::BeginRegistryUpdateTransaction(
const RegistryUpdate &update )
{
logger_->trace( "{}: entry epoch={}", __func__, update.registry().epoch() );
auto serialized_update = SerializeRegistryUpdate( update );
if ( serialized_update.has_error() )
{
logger_->error( "{}: failed to serialize registry update", __func__ );
return outcome::failure( serialized_update.error() );
}
base::Buffer update_buffer(
gsl::span<const uint8_t>( serialized_update.value().data(), serialized_update.value().size() ) );
auto tx = db_->BeginTransaction();
if ( !tx )
{
logger_->error( "{}: failed to begin atomic transaction", __func__ );
return outcome::failure( std::errc::not_enough_memory );
}
crdt::HierarchicalKey registry_key{ std::string( RegistryKey() ) };
auto registry_put = tx->Put( registry_key, update_buffer );
if ( registry_put.has_error() )
{
logger_->error( "{}: failed to stage registry update in transaction", __func__ );
return outcome::failure( registry_put.error() );
}
logger_->debug( "{}: staged registry update in transaction", __func__ );
return tx;
}
void ValidatorRegistry::SetMaxNewValidatorsPerUpdate( size_t max_new )
{
logger_->trace( "{}: entry max_new={}", __func__, max_new );
max_new_validators_per_update_ = max_new;
}
std::string ValidatorRegistry::GetRegistryCid() const
{
std::shared_lock<std::shared_mutex> lock( cache_mutex_ );
return cached_registry_id_;
}
uint64_t ValidatorRegistry::GetRegistryEpoch() const
{
std::shared_lock<std::shared_mutex> lock( cache_mutex_ );
if ( cached_registry_ )
{
return cached_registry_->epoch();
}
return 0;
}
void ValidatorRegistry::SetCertificatesPerBatch( size_t batch_size )
{
if ( batch_size == 0 )
{
logger_->warn( "{}: ignored zero batch size", __func__ );
return;
}
std::lock_guard<std::mutex> lock( batch_mutex_ );
certificates_per_batch_ = batch_size;
}
void ValidatorRegistry::SetBatchSubjectSubmitter(
std::function<outcome::result<void>( const ConsensusSubject &subject )> submitter )
{
std::lock_guard<std::mutex> lock( batch_mutex_ );
submit_batch_subject_ = std::move( submitter );
}
outcome::result<std::string> ValidatorRegistry::ComputeBatchRoot(
const std::vector<std::string> &subject_hashes ) const
{
if ( subject_hashes.empty() )
{
return outcome::failure( std::errc::invalid_argument );
}
std::string payload;
payload += subject_hashes[0];
for ( size_t i = 1; i < subject_hashes.size(); ++i )
{
payload.push_back( '\n' );
payload += subject_hashes[i];
}
sgns::crypto::HasherImpl hasher;
auto hash = hasher.sha2_256( payload.data(), payload.size() );
return base::hex_lower( gsl::span<const uint8_t>( hash.data(), hash.size() ) );
}
outcome::result<std::vector<std::string>> ValidatorRegistry::SelectBatchSubjects(
const std::string &base_registry_cid,
uint64_t base_registry_epoch,
uint32_t certificate_count,
std::optional<std::string> expected_root ) const
{
if ( certificate_count == 0 )
{
return outcome::failure( std::errc::invalid_argument );
}
std::vector<std::string> selected;
{
std::lock_guard<std::mutex> lock( batch_mutex_ );
const auto key = BuildBatchKey( base_registry_cid, base_registry_epoch );
auto it = pending_certificate_subjects_by_base_.find( key );
if ( it == pending_certificate_subjects_by_base_.end() ||
it->second.size() < static_cast<size_t>( certificate_count ) )
{
return outcome::failure( std::errc::resource_unavailable_try_again );
}
selected.assign( it->second.begin(), it->second.end() );
}
if ( selected.size() > static_cast<size_t>( certificate_count ) )
{
selected.resize( certificate_count );
}
auto root_result = ComputeBatchRoot( selected );
if ( root_result.has_error() )
{
return outcome::failure( root_result.error() );
}
if ( expected_root.has_value() && root_result.value() != expected_root.value() )
{
return outcome::failure( std::errc::invalid_argument );
}
return selected;
}
outcome::result<sgns::ConsensusCertificate> ValidatorRegistry::LoadCertificateBySubjectHash(
const std::string &subject_hash ) const
{
const auto cert_key = std::string( "/cert/" ) + subject_hash;
auto cert_get = db_->Get( crdt::HierarchicalKey( cert_key ) );
if ( cert_get.has_error() )
{
return outcome::failure( cert_get.error() );
}
sgns::ConsensusCertificate certificate;
std::string serialized = std::string( cert_get.value().toString() );
if ( !certificate.ParseFromString( serialized ) )
{
return outcome::failure( std::errc::invalid_argument );
}
return certificate;
}
void ValidatorRegistry::OnFinalizedCertificate( const sgns::ConsensusCertificate &certificate )
{
if ( !certificate.has_proposal() )
{
return;
}
if ( ConsensusManager::DecodeRegistryBatchSubject( certificate.proposal().subject() ).has_value() )
{
return;
}
auto subject_hash_result = ExtractConsensusSubjectHash( certificate.proposal().subject() );
if ( subject_hash_result.has_error() )
{
return;
}
const auto key = BuildBatchKey( certificate.registry_cid(), certificate.registry_epoch() );
{
std::lock_guard<std::mutex> lock( batch_mutex_ );
pending_certificate_subjects_by_base_[key].insert( subject_hash_result.value() );
}
(void)TryCreateAndSubmitBatchProposal( certificate.registry_cid(), certificate.registry_epoch() );
}
outcome::result<void> ValidatorRegistry::TryCreateAndSubmitBatchProposal( const std::string &base_registry_cid,
uint64_t base_registry_epoch )
{
std::function<outcome::result<void>( const ConsensusSubject &subject )> submitter;
size_t threshold = 0;
{
std::lock_guard<std::mutex> lock( batch_mutex_ );
submitter = submit_batch_subject_;
threshold = certificates_per_batch_;
}
if ( !submitter || threshold == 0 )
{
return outcome::success();
}
if ( GetRegistryCid() != base_registry_cid || GetRegistryEpoch() != base_registry_epoch )
{
return outcome::failure( std::errc::operation_canceled );
}
auto selected_result = SelectBatchSubjects( base_registry_cid,
base_registry_epoch,
static_cast<uint32_t>( threshold ),
std::nullopt );
if ( selected_result.has_error() )
{
return outcome::failure( selected_result.error() );
}
auto root_result = ComputeBatchRoot( selected_result.value() );
if ( root_result.has_error() )
{
return outcome::failure( root_result.error() );
}
auto subject_result = ConsensusManager::CreateRegistryBatchSubject( genesis_authority_,
base_registry_cid,
base_registry_epoch,
base_registry_epoch + 1,
static_cast<uint32_t>( threshold ),
root_result.value() );
if ( subject_result.has_error() )
{
return outcome::failure( subject_result.error() );
}
{
std::lock_guard<std::mutex> lock( batch_mutex_ );
auto batch_hash_result = ExtractConsensusSubjectHash( subject_result.value() );
if ( batch_hash_result.has_error() )
{
return outcome::failure( batch_hash_result.error() );
}
if ( pending_batch_subject_ids_.find( batch_hash_result.value() ) != pending_batch_subject_ids_.end() )
{
return outcome::success();
}
pending_batch_subject_ids_.insert( batch_hash_result.value() );
}
return submitter( subject_result.value() );
}
outcome::result<ValidatorRegistry::BatchSubjectDecision> ValidatorRegistry::EvaluateBatchSubject(
const ConsensusSubject &subject )
{
auto payload_result = ConsensusManager::DecodeRegistryBatchSubject( subject );
if ( payload_result.has_error() )
{
return outcome::success( BatchSubjectDecision::Reject );
}
const auto &payload = payload_result.value();
auto selected_result = SelectBatchSubjects( payload.base_registry_cid(),
payload.base_registry_epoch(),
payload.certificate_count(),
std::string( payload.batch_root() ) );
if ( selected_result.has_error() )
{
if ( selected_result.error() == std::errc::resource_unavailable_try_again )
{
return outcome::success( BatchSubjectDecision::Pending );
}
return outcome::success( BatchSubjectDecision::Reject );
}
auto registry_result = LoadRegistry();
if ( registry_result.has_error() )
{
return outcome::success( BatchSubjectDecision::Pending );
}
if ( registry_result.value().epoch() != payload.base_registry_epoch() ||
GetRegistryCid() != payload.base_registry_cid() )
{
return outcome::success( BatchSubjectDecision::Reject );
}
return outcome::success( BatchSubjectDecision::Approve );
}
outcome::result<ValidatorRegistry::BatchCertificateDecision> ValidatorRegistry::HandleBatchCertificate(
const std::string &subject_hash,
const sgns::ConsensusCertificate &certificate )
{
{
std::lock_guard<std::mutex> lock( batch_mutex_ );
if ( finalized_batch_subject_ids_.find( subject_hash ) != finalized_batch_subject_ids_.end() ||
applying_batch_subject_ids_.find( subject_hash ) != applying_batch_subject_ids_.end() )
{
return BatchCertificateDecision::Approve;
}
}
auto current_registry_result = LoadRegistry();
if ( current_registry_result.has_error() ||
!ValidateCertificate( certificate, current_registry_result.value() ) )
{
std::lock_guard<std::mutex> lock( batch_mutex_ );
applying_batch_subject_ids_.erase( subject_hash );
return BatchCertificateDecision::Reject;
}
auto payload_result = ConsensusManager::DecodeRegistryBatchSubject( certificate.proposal().subject() );
if ( payload_result.has_error() )
{
std::lock_guard<std::mutex> lock( batch_mutex_ );
applying_batch_subject_ids_.erase( subject_hash );
return BatchCertificateDecision::Reject;
}
const auto &payload = payload_result.value();
auto selected_result = SelectBatchSubjects( payload.base_registry_cid(),
payload.base_registry_epoch(),
payload.certificate_count(),
std::string( payload.batch_root() ) );
if ( selected_result.has_error() )
{
std::lock_guard<std::mutex> lock( batch_mutex_ );
applying_batch_subject_ids_.erase( subject_hash );
return BatchCertificateDecision::Reject;
}
auto base_registry_result = LoadRegistry( payload.base_registry_cid() );
if ( base_registry_result.has_error() )
{
std::lock_guard<std::mutex> lock( batch_mutex_ );
applying_batch_subject_ids_.erase( subject_hash );
return BatchCertificateDecision::Stalled;
}
std::vector<sgns::ConsensusCertificate> certificates;
certificates.reserve( selected_result.value().size() );
for ( const auto &tx_subject_hash : selected_result.value() )
{
auto cert_result = LoadCertificateBySubjectHash( tx_subject_hash );
if ( cert_result.has_error() )
{
std::lock_guard<std::mutex> lock( batch_mutex_ );
applying_batch_subject_ids_.erase( subject_hash );
return BatchCertificateDecision::Reject;
}
certificates.push_back( cert_result.value() );
}
std::unordered_map<std::string, int64_t> registered_scores;
std::unordered_map<std::string, int64_t> unregistered_scores;
for ( const auto &tx_cert : certificates )
{
auto votes = ExtractCertificateVotes( tx_cert, base_registry_result.value() );
for ( const auto &[validator_id, approve] : votes.registered_votes )
{
registered_scores[validator_id] += approve ? 1 : -1;
}
for ( const auto &[validator_id, approve] : votes.unregistered_votes )
{
unregistered_scores[validator_id] += approve ? 1 : -1;
}
}
std::unordered_map<std::string, bool> registered_votes;
std::unordered_map<std::string, bool> unregistered_votes;
for ( const auto &[validator_id, score] : registered_scores )
{
registered_votes[validator_id] = score >= 0;
}
for ( const auto &[validator_id, score] : unregistered_scores )
{
unregistered_votes[validator_id] = score >= 0;
}
RegistryUpdate update;
update.set_prev_registry_hash( payload.base_registry_cid() );
*update.mutable_registry() = BuildRegistryFromAggregatedVotes( base_registry_result.value(),
registered_votes,
unregistered_votes );
std::string serialized_cert;
if ( !certificate.SerializeToString( &serialized_cert ) )
{
std::lock_guard<std::mutex> lock( batch_mutex_ );
applying_batch_subject_ids_.erase( subject_hash );
return outcome::failure( std::errc::invalid_argument );
}
update.set_certificate( serialized_cert );
for ( const auto &tx_subject_hash : selected_result.value() )
{
update.add_batch_certificate_subject_hashes( tx_subject_hash );
}
std::thread(
[weak_self = weak_from_this(), subject_hash, update = std::move( update )]() mutable
{
auto self = weak_self.lock();
if ( !self )
{
return;
}
auto store_result = self->StoreRegistryUpdate( update );
std::lock_guard<std::mutex> lock( self->batch_mutex_ );
self->applying_batch_subject_ids_.erase( subject_hash );
if ( store_result.has_error() )
{
self->logger_->error( "{}: failed storing batch registry update subject_hash={} error={}",
__func__,
subject_hash.substr( 0, 8 ),
store_result.error().message() );
return;
}
self->pending_batch_subject_ids_.erase( subject_hash );
self->finalized_batch_subject_ids_.insert( subject_hash );
} )
.detach();
return BatchCertificateDecision::Approve;
}
outcome::result<std::optional<uint64_t>> ValidatorRegistry::GetValidatorWeight(
const std::string &validator_id ) const
{
std::shared_lock<std::shared_mutex> lock( cache_mutex_ );
if ( !cache_initialized_ || !cached_registry_ )
{
return outcome::success( std::optional<uint64_t>{} );
}
const auto *validator = FindValidator( cached_registry_.value(), validator_id );
if ( !validator || validator->status() != Status::ACTIVE )
{
return outcome::success( std::optional<uint64_t>{} );
}
return outcome::success( std::optional<uint64_t>{ validator->weight() } );
}
bool ValidatorRegistry::RegisterFilter()
{
logger_->trace( "{}: entry", __func__ );
const std::string pattern = "/?" + std::string( RegistryKey() );
auto weak_self = weak_from_this();
const bool filter_registered = db_->RegisterElementFilter(
pattern,
[weak_self]( const crdt::pb::Element &element ) -> std::optional<std::vector<crdt::pb::Element>>
{
if ( auto strong = weak_self.lock() )
{
return strong->FilterRegistryUpdate( element );
}
return std::nullopt;
} );
const bool callback_registered = db_->RegisterNewElementCallback(
pattern,
[weak_self]( crdt::CRDTCallbackManager::NewDataPair new_data, const std::string &cid )
{
if ( auto strong = weak_self.lock() )
{
strong->RegistryUpdateReceived( std::move( new_data ), cid );
}
} );
db_->AddListenTopic( std::string( ValidatorTopic() ) );
const bool result = filter_registered && callback_registered;
logger_->info( "{}: result={}", __func__, result );
return result;
}
std::optional<std::vector<crdt::pb::Element>> ValidatorRegistry::FilterRegistryUpdate(
const crdt::pb::Element &element )
{
logger_->trace( "{}: entry key={}", __func__, element.key() );
std::vector<uint8_t> bytes( element.value().begin(), element.value().end() );
auto decoded_update = DeserializeRegistryUpdate( bytes );
if ( decoded_update.has_error() )
{
logger_->error( "{}: parse failed, rejecting: {}", __func__, element.key() );
return std::vector<crdt::pb::Element>{};
}
RegistryUpdate update = decoded_update.value();
const Registry *current_ptr = nullptr;
{
std::shared_lock<std::shared_mutex> lock( cache_mutex_ );
if ( cached_registry_ )
{
current_ptr = &cached_registry_.value();
}
}
if ( !VerifyUpdate( update, current_ptr, false ) )
{
logger_->error( "{}: verification failed, rejecting: {}", __func__, element.key() );
return std::vector<crdt::pb::Element>{};
}
logger_->debug( "{}: update accepted", __func__ );
return std::nullopt;
}
void ValidatorRegistry::RegistryUpdateReceived( const crdt::CRDTCallbackManager::NewDataPair &new_data,
const std::string &cid )
{
logger_->trace( "{}: entry cid={}", __func__, cid );
const auto &buffer = new_data.second;
auto decoded = DeserializeRegistryUpdate( buffer.toVector() );
if ( decoded.has_error() )
{
logger_->error( "{}: failed to parse registry update for cache refresh", __func__ );
return;
}
{
std::unique_lock<std::shared_mutex> lock( cache_mutex_ );
cached_update_ = decoded.value();
cached_registry_ = decoded.value().registry();
cached_registry_id_ = cid;
cache_initialized_ = true;
}
PersistLocalState( cid );
NotifyInitialized( true );
logger_->info( "{}: cache updated and initialized", __func__ );
}
outcome::result<std::vector<uint8_t>> ValidatorRegistry::ComputeUpdateSigningBytes(
const RegistryUpdate &update ) const
{
logger_->trace( "{}: entry validators={}", __func__, update.registry().validators().size() );
validator::RegistrySigningPayload payload;
*payload.mutable_registry() = update.registry();
payload.set_prev_registry_hash( update.prev_registry_hash() );
std::string serialized;
if ( !payload.SerializeToString( &serialized ) )
{
logger_->error( "{}: serialization failed", __func__ );
return outcome::failure( std::errc::invalid_argument );
}
logger_->debug( "{}: payload size={}", __func__, serialized.size() );
return std::vector<uint8_t>( serialized.begin(), serialized.end() );
}
bool ValidatorRegistry::VerifyUpdate( const RegistryUpdate &update,
const Registry *current_registry,
bool enforce_time_window ) const
{
logger_->trace( "{}: entry validators={}", __func__, update.registry().validators().size() );
if ( update.registry().validators().empty() )
{
logger_->error( "{}: empty registry update", __func__ );
return false;
}
auto signing_bytes = ComputeUpdateSigningBytes( update );
if ( signing_bytes.has_error() )
{
logger_->error( "{}: signing bytes computation failed", __func__ );
return false;
}
if ( !current_registry )
{
logger_->debug( "{}: verifying genesis update", __func__ );
if ( update.prev_registry_hash().empty() )
{
for ( const auto &signature : update.signatures() )
{
if ( signature.validator_id() != genesis_authority_ )
{
continue;
}
if ( GeniusAccount::VerifySignature( signature.validator_id(),
signature.signature(),
signing_bytes.value() ) )
{
logger_->info( "{}: genesis update verified", __func__ );
return true;
}
}
}
logger_->error( "{}: genesis update verification failed", __func__ );
return false;
}
if ( !update.certificate().empty() )
{
sgns::ConsensusCertificate certificate;
if ( !certificate.ParseFromString( update.certificate() ) )
{
logger_->error( "{}: invalid certificate payload", __func__ );
return false;
}
if ( enforce_time_window )
{
if ( !ValidateCertificateForUpdate( certificate, *current_registry ) )
{
logger_->error( "{}: certificate verification failed", __func__ );
return false;
}
}
else
{
if ( !ValidateCertificate( certificate, *current_registry ) )
{
logger_->error( "{}: certificate verification failed", __func__ );
return false;
}
}
Registry expected;
auto batch_payload = certificate.has_proposal() && certificate.proposal().has_subject()
? ConsensusManager::DecodeRegistryBatchSubject( certificate.proposal().subject() )
: outcome::failure( std::errc::invalid_argument );
if ( batch_payload.has_value() )
{
const auto &payload = batch_payload.value();
if ( payload.base_registry_cid() != update.prev_registry_hash() ||
payload.base_registry_epoch() != current_registry->epoch() ||
payload.target_registry_epoch() != current_registry->epoch() + 1 )
{
logger_->error( "{}: batch subject metadata mismatch", __func__ );
return false;
}
if ( update.batch_certificate_subject_hashes_size() != static_cast<int>( payload.certificate_count() ) )
{
logger_->error( "{}: batch subject certificate count mismatch", __func__ );
return false;
}
std::vector<std::string> subject_hashes;
subject_hashes.reserve( static_cast<size_t>( update.batch_certificate_subject_hashes_size() ) );
for ( const auto &subject_hash : update.batch_certificate_subject_hashes() )
{
subject_hashes.push_back( subject_hash );
}
std::sort( subject_hashes.begin(), subject_hashes.end() );
auto root_result = ComputeBatchRoot( subject_hashes );
if ( root_result.has_error() )
{
return false;
}
const auto payload_root = std::string( payload.batch_root() );
if ( payload_root != root_result.value() )
{
logger_->error( "{}: batch root mismatch", __func__ );
return false;
}
std::unordered_map<std::string, int64_t> registered_scores;
std::unordered_map<std::string, int64_t> unregistered_scores;
for ( const auto &subject_hash : subject_hashes )
{
auto certificate_result = LoadCertificateBySubjectHash( subject_hash );
if ( certificate_result.has_error() )
{
logger_->error( "{}: missing certificate for batch hash={}",
__func__,
subject_hash.substr( 0, 8 ) );
return false;
}
const auto &tx_cert = certificate_result.value();
if ( tx_cert.registry_cid() != payload.base_registry_cid() ||
tx_cert.registry_epoch() != payload.base_registry_epoch() )
{
logger_->error( "{}: batch certificate registry mismatch", __func__ );
return false;
}
auto votes = ExtractCertificateVotes( tx_cert, *current_registry );
for ( const auto &[validator_id, approve] : votes.registered_votes )
{
registered_scores[validator_id] += approve ? 1 : -1;
}
for ( const auto &[validator_id, approve] : votes.unregistered_votes )
{
unregistered_scores[validator_id] += approve ? 1 : -1;
}
}
std::unordered_map<std::string, bool> registered_votes;
std::unordered_map<std::string, bool> unregistered_votes;
for ( const auto &[validator_id, score] : registered_scores )
{
registered_votes[validator_id] = score >= 0;
}
for ( const auto &[validator_id, score] : unregistered_scores )
{
unregistered_votes[validator_id] = score >= 0;
}
expected = BuildRegistryFromAggregatedVotes( *current_registry, registered_votes, unregistered_votes );
}
else
{
auto votes = ExtractCertificateVotes( certificate, *current_registry );
expected = BuildRegistryFromCertificate( *current_registry,
certificate,
votes.registered_votes,
votes.unregistered_votes );
}
Registry provided = update.registry();
NormalizeRegistry( provided );
NormalizeRegistry( expected );
if ( provided.epoch() != current_registry->epoch() + 1 )
{
logger_->error( "{}: epoch not next expected", __func__ );
return false;
}
if ( provided.SerializeAsString() != expected.SerializeAsString() )
{
logger_->error( "{}: registry mismatch against certificate", __func__ );
return false;
}
const std::string prev_registry_cid = update.prev_registry_hash();
std::string current_id;
{
std::shared_lock<std::shared_mutex> lock( cache_mutex_ );
current_id = cached_registry_id_;
}
if ( current_id.empty() || prev_registry_cid != current_id )
{
logger_->error( "{}: prev registry CID mismatch", __func__ );
return false;
}
logger_->info( "{}: certificate-based update verified", __func__ );
return true;
}
const std::string prev_registry_cid = update.prev_registry_hash();
std::string current_id;
{
std::shared_lock lock( cache_mutex_ );
current_id = cached_registry_id_;
}
if ( current_id.empty() || prev_registry_cid != current_id )
{
//TODO - Check if the CID checking is necessary, because we could receive out-of-order updates
logger_->error( "{}: prev registry CID mismatch", __func__ );
return false;
}
if ( update.registry().epoch() != current_registry->epoch() + 1 )
{
logger_->error( "{}: epoch not next expected", __func__ );
return false;
}
uint64_t total_weight = TotalWeight( *current_registry );
uint64_t accumulated_weight = 0;
std::set<std::string> seen;
for ( const auto &signature : update.signatures() )
{
if ( !seen.insert( signature.validator_id() ).second )
{
continue;
}
const auto *validator = FindValidator( *current_registry, signature.validator_id() );
if ( !validator || validator->status() != Status::ACTIVE )
{
continue;
}
if ( !GeniusAccount::VerifySignature( signature.validator_id(),
signature.signature(),
signing_bytes.value() ) )
{
continue;
}
accumulated_weight += validator->weight();
if ( IsQuorum( accumulated_weight, total_weight ) )
{
logger_->info( "{}: quorum reached", __func__ );
return true;
}
}
logger_->error( "{}: quorum not reached", __func__ );
return false;
}
bool ValidatorRegistry::ValidateCertificate( const sgns::ConsensusCertificate &certificate,
const Registry ¤t_registry ) const
{
logger_->trace( "{}: entry proposal_id={}", __func__, certificate.proposal_id() );
if ( !certificate.has_proposal() )
{
logger_->error( "{}: missing proposal in certificate", __func__ );
return false;
}
const auto &proposal = certificate.proposal();
if ( !ValidateProposal( proposal ) )
{
logger_->error( "{}: invalid proposal signature", __func__ );
return false;
}
if ( proposal.proposal_id() != certificate.proposal_id() )
{
logger_->error( "{}: proposal_id mismatch cert={} proposal={}",
__func__,
certificate.proposal_id(),
proposal.proposal_id() );
return false;
}
if ( proposal.registry_epoch() != certificate.registry_epoch() ||
proposal.registry_cid() != certificate.registry_cid() )
{
logger_->error( "{}: registry metadata mismatch proposal_id={}", __func__, proposal.proposal_id() );
return false;
}
if ( proposal.registry_epoch() != current_registry.epoch() )
{
logger_->error( "{}: registry epoch mismatch cert={} registry={}",
__func__,
proposal.registry_epoch(),
current_registry.epoch() );
return false;
}
const std::string current_id = GetRegistryCid();
if ( !current_id.empty() && !proposal.registry_cid().empty() && proposal.registry_cid() != current_id )
{
logger_->error( "{}: registry CID mismatch cert={} registry={}",
__func__,
proposal.registry_cid(),
current_id );
return false;
}
if ( certificate.proposal_id().empty() )
{
logger_->error( "{}: empty proposal_id", __func__ );
return false;
}
return true;
}
bool ValidatorRegistry::ValidateCertificateForUpdate( const sgns::ConsensusCertificate &certificate,
const Registry ¤t_registry ) const
{
const uint64_t window_ms = weight_config_.certificate_timestamp_window_ms_;
if ( window_ms > 0 )
{
const auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch() )
.count();
const auto cert_ms = static_cast<int64_t>( certificate.timestamp() );
const auto diff = std::llabs( now_ms - cert_ms );
if ( cert_ms == 0 || static_cast<uint64_t>( diff ) > window_ms )
{
logger_->error( "{}: certificate timestamp outside window", __func__ );
return false;
}
}
return ValidateCertificate( certificate, current_registry );
}
ValidatorRegistry::CertificateVotes ValidatorRegistry::ExtractCertificateVotes(
const sgns::ConsensusCertificate &certificate,
const Registry ¤t_registry ) const
{
CertificateVotes result;
uint64_t total_weight = TotalWeight( current_registry );
uint64_t approved_weight = 0;
std::unordered_set<std::string> seen;
for ( const auto &vote : certificate.votes() )
{
if ( vote.proposal_id() != certificate.proposal_id() )
{
continue;
}
if ( !seen.insert( vote.voter_id() ).second )
{
continue;
}
auto signing_bytes = VoteSigningBytes( vote );
if ( signing_bytes.has_error() )
{
continue;
}
if ( !GeniusAccount::VerifySignature( vote.voter_id(), vote.signature(), signing_bytes.value() ) )
{
continue;
}
const auto *validator = FindValidator( current_registry, vote.voter_id() );
if ( !validator )
{
result.unregistered.insert( vote.voter_id() );
result.unregistered_votes[vote.voter_id()] = vote.approve();
continue;
}
result.registered_votes[vote.voter_id()] = vote.approve();
if ( vote.approve() && validator->status() == Status::ACTIVE )
{
approved_weight += validator->weight();
result.approved.insert( vote.voter_id() );
}
}
if ( !IsQuorum( approved_weight, total_weight ) )
{
logger_->error( "{}: quorum not reached approved={} total={}", __func__, approved_weight, total_weight );
return {};
}
logger_->debug( "{}: quorum verified approved={} total={}", __func__, approved_weight, total_weight );
return result;
}
ValidatorRegistry::Registry ValidatorRegistry::BuildRegistryFromCertificate(
const Registry ¤t_registry,
const sgns::ConsensusCertificate &certificate,
const std::unordered_map<std::string, bool> ®istered_votes,
const std::unordered_map<std::string, bool> &unregistered_votes ) const
{
logger_->debug(
"{}: building registry update proposal_id={} epoch={} current_validators={} registered_votes={} unregistered_votes={}",
__func__,
certificate.proposal_id().substr( 0, 8 ),
current_registry.epoch(),
current_registry.validators_size(),
registered_votes.size(),
unregistered_votes.size() );
if ( !unregistered_votes.empty() )
{
std::vector<std::string> unregistered_ids;
unregistered_ids.reserve( unregistered_votes.size() );
for ( const auto &pair : unregistered_votes )
{
unregistered_ids.push_back( pair.first.substr( 0, 8 ) );
}
std::sort( unregistered_ids.begin(), unregistered_ids.end() );
logger_->debug( "{}: unregistered voter ids (prefixes)={}", __func__, fmt::join( unregistered_ids, "," ) );
}
Registry next = current_registry;
next.set_epoch( current_registry.epoch() + 1 );
const int before_count = next.validators_size();
InsertNewValidators( next, unregistered_votes );
const int after_insert = next.validators_size();
if ( after_insert > before_count )
{
std::vector<std::string> new_ids;
new_ids.reserve( static_cast<size_t>( after_insert - before_count ) );
for ( const auto &entry : next.validators() )
{
if ( !FindValidator( current_registry, entry.validator_id() ) )
{
new_ids.push_back( entry.validator_id().substr( 0, 8 ) );
}
}
std::sort( new_ids.begin(), new_ids.end() );
logger_->debug( "{}: inserted {} new validators (prefixes)={}",
__func__,
new_ids.size(),
fmt::join( new_ids, "," ) );
}
std::vector<ValidatorEntry> entries;
entries.reserve( static_cast<size_t>( next.validators_size() ) );
for ( const auto &entry : next.validators() )
{
entries.push_back( entry );
}
ApplyVoteEffects( entries, registered_votes );
std::unordered_set<std::string> participants;
participants.reserve( registered_votes.size() + unregistered_votes.size() );
for ( const auto &pair : registered_votes )
{
participants.insert( pair.first );
}
for ( const auto &pair : unregistered_votes )
{
participants.insert( pair.first );
}
ApplyInactivityDecay( entries, participants );
ApplyTotalWeightCap( entries );
std::sort( entries.begin(),
entries.end(),
[]( const ValidatorEntry &a, const ValidatorEntry &b )
{ return a.validator_id() < b.validator_id(); } );
next.clear_validators();
for ( const auto &entry : entries )
{
*next.add_validators() = entry;
}
logger_->debug( "{}: built registry from certificate proposal_id={} epoch={} validators={}",
__func__,
certificate.proposal_id().substr( 0, 8 ),
next.epoch(),
next.validators_size() );
return next;
}
ValidatorRegistry::Registry ValidatorRegistry::BuildRegistryFromAggregatedVotes(
const Registry ¤t_registry,
const std::unordered_map<std::string, bool> ®istered_votes,
const std::unordered_map<std::string, bool> &unregistered_votes ) const
{
Registry next = current_registry;
next.set_epoch( current_registry.epoch() + 1 );
InsertNewValidators( next, unregistered_votes );
std::vector<ValidatorEntry> entries;
entries.reserve( static_cast<size_t>( next.validators_size() ) );
for ( const auto &entry : next.validators() )
{
entries.push_back( entry );
}
ApplyVoteEffects( entries, registered_votes );
std::unordered_set<std::string> participants;
participants.reserve( registered_votes.size() + unregistered_votes.size() );
for ( const auto &pair : registered_votes )
{
participants.insert( pair.first );
}
for ( const auto &pair : unregistered_votes )
{
participants.insert( pair.first );
}
ApplyInactivityDecay( entries, participants );
ApplyTotalWeightCap( entries );
std::sort( entries.begin(),
entries.end(),
[]( const ValidatorEntry &a, const ValidatorEntry &b )
{ return a.validator_id() < b.validator_id(); } );
next.clear_validators();
for ( const auto &entry : entries )
{
*next.add_validators() = entry;
}
return next;
}
void ValidatorRegistry::InsertNewValidators( Registry ®istry,
const std::unordered_map<std::string, bool> &unregistered_votes ) const
{
std::vector<std::string> new_ids;
new_ids.reserve( unregistered_votes.size() );
for ( const auto &pair : unregistered_votes )
{
new_ids.push_back( pair.first );
}
std::sort( new_ids.begin(), new_ids.end() );
size_t added = 0;
for ( const auto &validator_id : new_ids )
{
if ( added >= max_new_validators_per_update_ )
{
logger_->debug( "{}: new validator cap reached {}", __func__, max_new_validators_per_update_ );
break;
}
if ( FindValidator( registry, validator_id ) )
{
continue;
}
auto *entry = registry.add_validators();
entry->set_validator_id( validator_id );
entry->set_role( Role::REGULAR );
entry->set_status( Status::ACTIVE );
entry->set_weight( ComputeWeight( entry->role() ) );
auto it = unregistered_votes.find( validator_id );
const bool approve = ( it != unregistered_votes.end() ) ? it->second : true;
entry->set_penalty_score( approve ? 0 : 1 );
entry->set_missed_epochs( 0 );
logger_->debug( "{}: added validator id={} weight={} approve={} penalty={} status={}",
__func__,
validator_id.substr( 0, 8 ),
entry->weight(),
approve,
entry->penalty_score(),
static_cast<int>( entry->status() ) );
++added;
}
}
void ValidatorRegistry::ApplyVoteEffects( std::vector<ValidatorEntry> &entries,
const std::unordered_map<std::string, bool> ®istered_votes ) const
{
for ( auto &entry : entries )
{
auto vote_it = registered_votes.find( entry.validator_id() );
if ( vote_it == registered_votes.end() )
{
continue;
}
const bool approve = vote_it->second;
uint32_t penalty = static_cast<uint32_t>( entry.penalty_score() );
const uint32_t cap = weight_config_.penalty_cap_;
const uint64_t old_weight = entry.weight();
const uint32_t old_penalty = penalty;
const auto old_status = entry.status();
entry.set_missed_epochs( 0 );
if ( approve )
{
if ( penalty > 0 )
{
penalty -= 1;
}
entry.set_penalty_score( penalty );
if ( entry.status() == Status::ACTIVE )
{
const uint64_t increment = weight_config_.approval_increment_;
if ( increment > 0 )
{
uint64_t role_cap = weight_config_.regular_max_weight_;
switch ( entry.role() )
{
case Role::GENESIS:
role_cap = weight_config_.genesis_max_weight_;
break;
case Role::FULL:
role_cap = weight_config_.full_max_weight_;
break;
case Role::SHARDED:
role_cap = weight_config_.sharded_max_weight_;
break;
case Role::REGULAR:
default:
role_cap = weight_config_.regular_max_weight_;
break;
}
const uint64_t clamped = std::min( entry.weight() + increment, role_cap );
entry.set_weight( clamped );
}
}
else if ( penalty == 0 )
{
entry.set_status( Status::ACTIVE );
}
}
else
{
if ( entry.status() == Status::BLACKLISTED )
{
const uint32_t bumped = std::min(
cap,
static_cast<uint32_t>( penalty + weight_config_.blacklist_bump_ ) );
penalty = bumped;
}
else
{
if ( penalty < cap )
{
penalty += 1;
}
if ( penalty >= weight_config_.penalty_threshold_ )
{
entry.set_status( Status::BLACKLISTED );
const uint32_t bumped = std::min(
cap,
static_cast<uint32_t>( penalty + weight_config_.blacklist_bump_ ) );
penalty = bumped;
}
}
entry.set_penalty_score( penalty );
}
logger_->debug( "{}: vote effect id={} approve={} weight {}->{} penalty {}->{} status {}->{}",
__func__,
entry.validator_id().substr( 0, 8 ),
approve,
old_weight,
entry.weight(),
old_penalty,
entry.penalty_score(),
static_cast<int>( old_status ),
static_cast<int>( entry.status() ) );
}
}
void ValidatorRegistry::ApplyInactivityDecay( std::vector<ValidatorEntry> &entries,
const std::unordered_set<std::string> &participants ) const
{
for ( auto &entry : entries )
{
if ( entry.status() != Status::ACTIVE )
{
continue;
}
if ( participants.find( entry.validator_id() ) != participants.end() )
{
continue;
}
uint32_t missed = static_cast<uint32_t>( entry.missed_epochs() );
if ( missed < std::numeric_limits<uint32_t>::max() )
{
missed += 1;
}
entry.set_missed_epochs( missed );
if ( missed >= weight_config_.missed_epoch_threshold_ )
{
const uint32_t dec = weight_config_.inactivity_decrement_;
if ( dec > 0 && entry.weight() > 0 )
{
const uint64_t old_weight = entry.weight();
const uint64_t new_weight = ( entry.weight() > dec ) ? ( entry.weight() - dec ) : 0;
entry.set_weight( new_weight );
if ( new_weight == 0 )
{
entry.set_status( Status::SUSPENDED );
}
logger_->debug( "{}: inactivity decay id={} missed={} weight {}->{} status={}",
__func__,
entry.validator_id().substr( 0, 8 ),
missed,
old_weight,
new_weight,
static_cast<int>( entry.status() ) );
}
}
}
}
void ValidatorRegistry::ApplyTotalWeightCap( std::vector<ValidatorEntry> &entries ) const
{
uint64_t total_active = 0;
for ( const auto &entry : entries )
{
if ( entry.status() == Status::ACTIVE )
{
total_active += entry.weight();
}
}
const uint64_t weight_cap = weight_config_.genesis_weight_ * weight_config_.total_weight_cap_multiplier_;
if ( weight_cap == 0 || total_active <= weight_cap )
{
return;
}
logger_->debug( "{}: applying total weight cap total_active={} cap={}", __func__, total_active, weight_cap );
uint64_t scaled_sum = 0;
std::vector<size_t> active_indices;
active_indices.reserve( entries.size() );
for ( size_t i = 0; i < entries.size(); ++i )
{
if ( entries[i].status() != Status::ACTIVE )
{
continue;
}
const uint64_t old_weight = entries[i].weight();
const uint64_t scaled = ( entries[i].weight() * weight_cap ) / total_active;
entries[i].set_weight( scaled );
scaled_sum += scaled;
active_indices.push_back( i );
logger_->debug( "{}: cap scale id={} weight {}->{}",
__func__,
entries[i].validator_id().substr( 0, 8 ),
old_weight,
scaled );
}
uint64_t remainder = ( scaled_sum <= weight_cap ) ? ( weight_cap - scaled_sum ) : 0;
if ( remainder == 0 || active_indices.empty() )
{
return;
}
std::sort( active_indices.begin(),
active_indices.end(),
[&entries]( size_t a, size_t b )
{
if ( entries[a].weight() != entries[b].weight() )
{
return entries[a].weight() > entries[b].weight();
}
return entries[a].validator_id() < entries[b].validator_id();
} );
size_t idx = 0;
while ( remainder > 0 )
{
entries[active_indices[idx]].set_weight( entries[active_indices[idx]].weight() + 1 );
remainder -= 1;
idx = ( idx + 1 ) % active_indices.size();
}
for ( const auto active_idx : active_indices )
{
logger_->debug( "{}: cap final id={} weight={}",
__func__,
entries[active_idx].validator_id().substr( 0, 8 ),
entries[active_idx].weight() );
}
}
void ValidatorRegistry::NormalizeRegistry( Registry ®istry )
{
std::vector<ValidatorEntry> entries;
entries.reserve( static_cast<size_t>( registry.validators_size() ) );
for ( const auto &entry : registry.validators() )
{
entries.push_back( entry );
}
std::sort( entries.begin(),
entries.end(),
[]( const ValidatorEntry &a, const ValidatorEntry &b )
{ return a.validator_id() < b.validator_id(); } );
registry.clear_validators();
for ( const auto &entry : entries )
{
*registry.add_validators() = entry;
}
}
const ValidatorRegistry::ValidatorEntry *ValidatorRegistry::FindValidator( const Registry ®istry,
const std::string &validator_id )
{
ValidatorRegistryLogger()->trace( "{}: entry id={}", __func__, validator_id.substr( 0, 8 ) );
for ( const auto &validator : registry.validators() )
{
if ( validator.validator_id() == validator_id )
{
ValidatorRegistryLogger()->debug( "{}: validator found", __func__ );
return &validator;
}
}
ValidatorRegistryLogger()->debug( "{}: validator not found", __func__ );
return nullptr;
}
void ValidatorRegistry::InitializeCache()
{
logger_->trace( "{}: entry", __func__ );
std::unique_lock<std::shared_mutex> lock( cache_mutex_ );
if ( cache_initialized_ )
{
logger_->error( "{}: cache already initialized", __func__ );
return;
}
logger_->trace( "{}: grabbing validator registry from CRDT", __func__ );
crdt::HierarchicalKey registry_key{ std::string( RegistryKey() ) };
auto registry_get = db_->Get( registry_key );
bool content_present = registry_get.has_value();
if ( !content_present )
{
logger_->error( "{}: registry content not found during cache init", __func__ );
return;
}
const auto &buffer = registry_get.value();
auto decoded = DeserializeRegistryUpdate( buffer.toVector() );
if ( !decoded.has_value() )
{
logger_->error( "{}: failed to parse registry content during cache init", __func__ );
return;
}
cached_update_ = decoded.value();
cached_registry_ = decoded.value().registry();
logger_->debug( "{}: cache populated validators={}", __func__, cached_registry_->validators().size() );
cache_initialized_ = true;
sgns::crdt::GlobalDB::Buffer registry_cid_key;
registry_cid_key.put( std::string( RegistryCidKey() ) );
auto registry_cid = db_->GetDataStore()->get( registry_cid_key );
if ( registry_cid.has_value() )
{
cached_registry_id_ = registry_cid.value().toString();
logger_->info( "{}: cache initialized with CID {}", __func__, cached_registry_id_ );
NotifyInitialized( true );
return;
}
std::set<CID> heads_to_request;
logger_->error( "{}: registry content found but CID missing, requesting heads", __func__ );
auto heads_result = db_->GetCRDTHeadList();
if ( heads_result.has_value() )
{
const auto &heads_map = heads_result.value().first;
auto it = heads_map.find( std::string( ValidatorTopic() ) );
if ( it != heads_map.end() )
{
heads_to_request = it->second;
}
}
logger_->debug( "{}: heads to request={}", __func__, heads_to_request.size() );
lock.unlock();
if ( !heads_to_request.empty() )
{
RequestHeadCids( heads_to_request );
}
else
{
logger_->error( "{}: no heads available to request", __func__ );
}
}
void ValidatorRegistry::PersistLocalState( const std::string &cid ) const
{
logger_->trace( "{}: entry cid={}", __func__, cid );
crdt::GlobalDB::Buffer registry_cid_key;
registry_cid_key.put( std::string( RegistryCidKey() ) );
crdt::GlobalDB::Buffer registry_cid;
registry_cid.put( cid );
(void)db_->GetDataStore()->put( registry_cid_key, registry_cid );
logger_->debug( "{}: persisted CID", __func__ );
}
void ValidatorRegistry::RequestHeadCids( const std::set<CID> &cids )
{
logger_->trace( "{}: entry count={}", __func__, cids.size() );
const char *func = __func__;
if ( cids.empty() )
{
logger_->error( "{}: empty CID set", __func__ );
return;
}
struct RequestState
{
std::atomic<size_t> remaining;
std::atomic<bool> success_reported{ false };
explicit RequestState( size_t remaining_count ) : remaining( remaining_count ) {}
};
auto state = std::make_shared<RequestState>( cids.size() );
for ( const auto &cid : cids )
{
auto cid_string = cid.toString();
if ( !cid_string.has_value() )
{
logger_->error( "{}: failed to convert CID to string", __func__ );
if ( state->remaining.fetch_sub( 1 ) == 1 && !state->success_reported.load() )
{
NotifyInitialized( false );
}
continue;
}
logger_->debug( "{}: requesting CID {}", __func__, cid_string.value() );
request_block_by_cid_(
cid_string.value(),
[weak_self = weak_from_this(), state, func]( outcome::result<std::string> result )
{
if ( auto self = weak_self.lock() )
{
if ( !result.has_error() )
{
if ( !state->success_reported.exchange( true ) )
{
self->logger_->info( "{}: head request succeeded", func );
self->NotifyInitialized( true );
}
}
if ( state->remaining.fetch_sub( 1 ) == 1 && !state->success_reported.load() )
{
self->logger_->error( "{}: all head requests failed", func );
self->NotifyInitialized( false );
}
}
} );
}
}
void ValidatorRegistry::NotifyInitialized( bool success ) const
{
logger_->trace( "{}: entry success={}", __func__, success );
if ( init_callback_ )
{
init_callback_( success );
logger_->debug( "{}: callback dispatched", __func__ );
}
else
{
logger_->debug( "{}: no callback registered", __func__ );
}
}
}
Updated on 2026-06-05 at 17:22:19 -0700