Skip to content

blockchain/Consensus.cpp

Consensus proposal/vote/certificate helpers. More...

Namespaces

Name
sgns

Functions

Name
base::Logger ConsensusManagerLogger()

Detailed Description

Consensus proposal/vote/certificate helpers.

Date: 2025-10-16 Henrique A. Klein ([email protected])

Functions Documentation

function ConsensusManagerLogger

base::Logger ConsensusManagerLogger()

Source code

#include "blockchain/Consensus.hpp"

#include <algorithm>
#include <chrono>
#include <set>
#include <system_error>
#include <boost/format.hpp>

#include <gsl/span>

#include "base/hexutil.hpp"
#include "base/sgns_version.hpp"
#include "crypto/hasher/hasher_impl.hpp"
#include "account/GeniusAccount.hpp"
#include "blockchain/ConsensusAuth.hpp"

namespace sgns
{

    base::Logger ConsensusManagerLogger()
    {
        // Always call base::createLogger to get the current logger
        // This will return existing logger or create new one as needed
        return base::createLogger( "ConsensusManager" );
    }

    std::shared_ptr<ConsensusManager> ConsensusManager::New( std::shared_ptr<ValidatorRegistry>         registry,
                                                             std::shared_ptr<crdt::GlobalDB>            db,
                                                             std::shared_ptr<ipfs_pubsub::GossipPubSub> pubsub,
                                                             Signer                                     signer,
                                                             std::string                                address,
                                                             std::string consensus_topic )
    {
        if ( !registry )
        {
            ConsensusManagerLogger()->error( "{}: Failed to create ConsensusManager: registry is null", __func__ );
            return nullptr;
        }
        if ( !db )
        {
            ConsensusManagerLogger()->error( "{}: Failed to create ConsensusManager: db is null", __func__ );
            return nullptr;
        }
        if ( !pubsub )
        {
            ConsensusManagerLogger()->error( "{}: Failed to create ConsensusManager: pubsub is null", __func__ );
            return nullptr;
        }
        if ( !signer )
        {
            ConsensusManagerLogger()->error( "{}: Failed to create ConsensusManager: signer is null", __func__ );
            return nullptr;
        }
        if ( address.empty() )
        {
            ConsensusManagerLogger()->error( "{}: Failed to create ConsensusManager: address is empty", __func__ );
            return nullptr;
        }

        auto instance = std::shared_ptr<ConsensusManager>( new ConsensusManager( std::move( registry ),
                                                                                 std::move( db ),
                                                                                 std::move( pubsub ),
                                                                                 std::move( signer ),
                                                                                 address,
                                                                                 consensus_topic ) );
        instance->certificate_work_journal_ = instance->db_->GetWorkJournal();

        if ( !instance->certificate_work_journal_ )
        {
            ConsensusManagerLogger()->error( "{}: Failed to create ConsensusManager: crdt work journal is empty",
                                             __func__ );
            return nullptr;
        }

        instance->consensus_subs_future_ = std::move( instance->pubsub_->Subscribe(
            instance->consensus_messages_topic_,
            [weakptr( std::weak_ptr<ConsensusManager>( instance ) )](
                boost::optional<const ipfs_pubsub::GossipPubSub::Message &> message )
            {
                if ( auto self = weakptr.lock() )
                {
                    ConsensusManagerLogger()->trace( "{}: Received Consensus Message on topic {}",
                                                     __func__,
                                                     self->consensus_messages_topic_ );
                    self->OnConsensusMessage( message );
                }
            } ) );
        ConsensusManagerLogger()->debug( "{}: Subscribed to Consensus topic {}",
                                         __func__,
                                         instance->consensus_messages_topic_ );
        instance->StartRoundTimer();
        if ( !instance->RegisterCertificateFilter() )
        {
            ConsensusManagerLogger()->error( "{}: Failed to register certificate filter", __func__ );
        }
        instance->RecoverPendingCertificateWork();

        return instance;
    }

    ConsensusManager::ConsensusManager( std::shared_ptr<ValidatorRegistry>         registry,
                                        std::shared_ptr<crdt::GlobalDB>            db,
                                        std::shared_ptr<ipfs_pubsub::GossipPubSub> pubsub,
                                        Signer                                     signer,
                                        std::string                                address,
                                        std::string                                consensus_topic ) :
        registry_( std::move( registry ) ), //
        db_( std::move( db ) ),             //
        pubsub_( std::move( pubsub ) ),     //
        signer_( std::move( signer ) ),     //
        account_address_( address ),        //
        consensus_messages_topic_( std::string( CONSENSUS_CHANNEL_PREFIX ) + sgns::version::GetNetAndVersionAppendix() +
                                   consensus_topic ),
        consensus_datastore_topic_( consensus_messages_topic_ + "#datastore" )
    {
    }

    ConsensusManager::~ConsensusManager()
    {
        stop_timer_.store( true );
        timer_cv_.notify_all();
        ConsensusManagerLogger()->debug( "{}: Finished shutting down ConsensusManager", __func__ );
    }

    void ConsensusManager::Close()
    {
        stop_timer_.store( true );
        timer_cv_.notify_all();
        if ( round_timer_.joinable() )
        {
            round_timer_.join();
        }
    }

    void ConsensusManager::StartRoundTimer()
    {
        if ( round_timer_.joinable() )
        {
            return;
        }
        if ( stop_timer_.load() )
        {
            return;
        }

        std::weak_ptr<ConsensusManager> weak_self = shared_from_this();
        round_timer_                              = std::thread(
            [weak_self]()
            {
                constexpr auto min_interval = std::chrono::milliseconds( 500 );
                while ( true )
                {
                    auto self = weak_self.lock();
                    if ( !self )
                    {
                        return;
                    }

                    std::unique_lock<std::mutex> lock( self->timer_mutex_ );
                    auto                         interval = self->round_duration_ / 2;
                    if ( interval.count() <= 0 )
                    {
                        interval = DEFAULT_ROUND_DURATION / 2;
                    }
                    if ( interval < min_interval )
                    {
                        interval = min_interval;
                    }
                    if ( self->certificates_pending_.load() )
                    {
                        // Work is pending: run on cadence, only interrupt for shutdown.
                        self->timer_cv_.wait_for( lock, interval, [self]() { return self->stop_timer_.load(); } );
                    }
                    else
                    {
                        // No pending work: wait up to interval, but wake immediately when new work appears.
                        self->timer_cv_.wait_for(
                            lock,
                            interval,
                            [self]() { return self->stop_timer_.load() || self->certificates_pending_.load(); } );
                    }
                    if ( self->stop_timer_.load() )
                    {
                        return;
                    }
                    lock.unlock();
                    if ( self->certificates_pending_.load() )
                    {
                        self->ProcessCertificates();
                        self->UpdateCertificatesPending();
                    }
                    // Keep replaying unfinished certificate work while the node is running.
                    self->RecoverPendingCertificateWork();
                }
            } );
    }

    outcome::result<void> ConsensusManager::Publish( const ConsensusMessage &message )
    {
        std::vector<uint8_t> serialized_proto( message.ByteSizeLong() );
        if ( !message.SerializeToArray( serialized_proto.data(), serialized_proto.size() ) )
        {
            ConsensusManagerLogger()->error( "{}: Failed to serialize consensus message", __func__ );
            return outcome::failure( std::errc::invalid_argument );
        }

        ConsensusManagerLogger()->debug( "{}: Sending consensus packet to {}", __func__, consensus_messages_topic_ );
        pubsub_->Publish( consensus_messages_topic_, serialized_proto );
        ConsensusManagerLogger()->debug( "{}: Consensus packet published (bytes={})",
                                         __func__,
                                         serialized_proto.size() );

        return outcome::success();
    }

    bool ConsensusManager::RegisterSubjectHandler( std::string_view subject_type, SubjectHandler handler )
    {
        if ( !handler )
        {
            ConsensusManagerLogger()->error( "{}: ignored empty handler subject_type={}", __func__, subject_type );
            return false;
        }
        auto type_hash = ComputeSubjectTypeHash( subject_type );
        if ( type_hash.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: ignored invalid handler subject_type={}", __func__, subject_type );
            return false;
        }
        ConsensusManagerLogger()->debug( "{}: Registering subject handler subject_type={}", __func__, subject_type );
        std::unique_lock lock( subject_handlers_mutex_ );
        subject_handlers_[type_hash.value()] = std::move( handler );
        return true;
    }

    void ConsensusManager::UnregisterSubjectHandler( std::string_view subject_type )
    {
        ConsensusManagerLogger()->debug( "{}: Removing Subject handler with subject_type={}", __func__, subject_type );
        auto type_hash = ComputeSubjectTypeHash( subject_type );
        if ( type_hash.has_error() )
        {
            return;
        }
        std::unique_lock lock( subject_handlers_mutex_ );
        subject_handlers_.erase( type_hash.value() );
    }

    bool ConsensusManager::RegisterCertificateHandler( std::string_view          subject_type,
                                                       CertificateSubjectHandler handler )
    {
        if ( !handler )
        {
            ConsensusManagerLogger()->error( "{}: ignored empty certificate handler subject_type={}",
                                             __func__,
                                             subject_type );
            return false;
        }
        auto type_hash = ComputeSubjectTypeHash( subject_type );
        if ( type_hash.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: ignored invalid certificate handler subject_type={}",
                                             __func__,
                                             subject_type );
            return false;
        }
        ConsensusManagerLogger()->debug( "{}: Registering certificate handler subject_type={}",
                                         __func__,
                                         subject_type );
        std::unique_lock lock( certificate_handlers_mutex_ );
        certificate_subject_handlers_[type_hash.value()] = std::move( handler );
        return true;
    }

    void ConsensusManager::UnregisterCertificateHandler( std::string_view subject_type )
    {
        ConsensusManagerLogger()->debug( "{}: Removing Certificate handler with subject_type={}",
                                         __func__,
                                         subject_type );
        auto type_hash = ComputeSubjectTypeHash( subject_type );
        if ( type_hash.has_error() )
        {
            return;
        }
        std::unique_lock lock( certificate_handlers_mutex_ );
        certificate_subject_handlers_.erase( type_hash.value() );
    }

    bool ConsensusManager::RegisterProposalCleanupHandler( std::string_view       subject_type,
                                                           ProposalCleanupHandler handler )
    {
        if ( !handler )
        {
            ConsensusManagerLogger()->error( "{}: ignored empty cleanup handler subject_type={}",
                                             __func__,
                                             subject_type );
            return false;
        }
        auto type_hash = ComputeSubjectTypeHash( subject_type );
        if ( type_hash.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: ignored invalid cleanup handler subject_type={}",
                                             __func__,
                                             subject_type );
            return false;
        }
        ConsensusManagerLogger()->debug( "{}: Registering cleanup handler subject_type={}", __func__, subject_type );
        std::unique_lock lock( cleanup_handlers_mutex_ );
        proposal_cleanup_handlers_[type_hash.value()].push_back( std::move( handler ) );
        return true;
    }

    void ConsensusManager::UnregisterProposalCleanupHandler( std::string_view subject_type )
    {
        ConsensusManagerLogger()->debug( "{}: Removing cleanup handler with subject_type={}", __func__, subject_type );
        auto type_hash = ComputeSubjectTypeHash( subject_type );
        if ( type_hash.has_error() )
        {
            return;
        }
        std::unique_lock lock( cleanup_handlers_mutex_ );
        proposal_cleanup_handlers_.erase( type_hash.value() );
    }

    void ConsensusManager::RegisterSlotKeyHandler( std::string_view subject_type, SlotKeyHandler handler )
    {
        if ( !handler )
        {
            ConsensusManagerLogger()->error( "{}: ignored empty slot key handler subject_type={}",
                                             __func__, subject_type );
            return;
        }
        auto type_hash = ComputeSubjectTypeHash( subject_type );
        if ( type_hash.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: ignored invalid slot key handler subject_type={}",
                                             __func__, subject_type );
            return;
        }
        ConsensusManagerLogger()->debug( "{}: Registering slot key handler subject_type={}", __func__, subject_type );
        std::unique_lock lock( slot_key_handlers_mutex_ );
        slot_key_handlers_[type_hash.value()] = std::move( handler );
    }

    void ConsensusManager::UnregisterSlotKeyHandler( std::string_view subject_type )
    {
        ConsensusManagerLogger()->debug( "{}: Removing slot key handler subject_type={}", __func__, subject_type );
        auto type_hash = ComputeSubjectTypeHash( subject_type );
        if ( type_hash.has_error() )
        {
            return;
        }
        std::unique_lock lock( slot_key_handlers_mutex_ );
        slot_key_handlers_.erase( type_hash.value() );
    }

    void ConsensusManager::FireProposalCleanupCallbacks( const Proposal &proposal )
    {
        auto subject_hash = GetSubjectHash( proposal.subject() );
        if ( subject_hash.has_error() )
        {
            return;
        }
        auto nonce_payload = DecodeNonceSubject( proposal.subject() );
        if ( nonce_payload.has_error() )
        {
            return;
        }
        auto tx_hash = nonce_payload.value().tx_hash();
        if ( tx_hash.empty() )
        {
            return;
        }

        std::vector<ProposalCleanupHandler> handlers_copy;
        {
            std::shared_lock lock( cleanup_handlers_mutex_ );
            auto             it = proposal_cleanup_handlers_.find( proposal.subject().subject_type_hash().hash() );
            if ( it != proposal_cleanup_handlers_.end() )
            {
                handlers_copy = it->second;
            }
        }
        for ( auto &handler : handlers_copy )
        {
            handler( tx_hash );
        }
    }

    void ConsensusManager::ConfigureTimestampWindow( std::chrono::milliseconds window )
    {
        if ( window.count() <= 0 )
        {
            ConsensusManagerLogger()->warn( "{}: using default window", __func__ );
            timestamp_window_ = DEFAULT_TIMESTAMP_WINDOW;
            return;
        }
        timestamp_window_ = window;
    }

    void ConsensusManager::ConfigureRoundDuration( std::chrono::milliseconds duration )
    {
        if ( duration.count() <= 0 )
        {
            ConsensusManagerLogger()->warn( "{}: using default round duration", __func__ );
            round_duration_ = DEFAULT_ROUND_DURATION;
            return;
        }
        round_duration_ = duration;
    }

    void ConsensusManager::ConfigureRoundSkew( std::chrono::milliseconds skew )
    {
        if ( skew.count() < 0 )
        {
            ConsensusManagerLogger()->warn( "{}: using default round skew", __func__ );
            round_skew_ = DEFAULT_ROUND_SKEW;
            return;
        }
        round_skew_ = skew;
    }

    void ConsensusManager::ConfigureCertificateDelay( std::chrono::milliseconds delay )
    {
        if ( delay.count() < 0 )
        {
            ConsensusManagerLogger()->warn( "{}: using zero delay", __func__ );
            certificate_delay_ = std::chrono::milliseconds( 0 );
            return;
        }
        certificate_delay_ = delay;
    }

    bool ConsensusManager::IsTimestampSane( uint64_t timestamp_ms ) const
    {
        if ( timestamp_ms == 0 )
        {
            return false;
        }
        const auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                std::chrono::system_clock::now().time_since_epoch() )
                                .count();
        const auto window_ms = timestamp_window_.count();
        if ( now_ms < 0 || window_ms < 0 )
        {
            return false;
        }

        const auto now_u64    = static_cast<uint64_t>( now_ms );
        const auto window_u64 = static_cast<uint64_t>( window_ms );
        const auto min_ts     = ( now_u64 > window_u64 ) ? ( now_u64 - window_u64 ) : 0ULL;
        const auto max_ts     = ( std::numeric_limits<uint64_t>::max() - now_u64 < window_u64 )
                                    ? std::numeric_limits<uint64_t>::max()
                                    : now_u64 + window_u64;
        return ( timestamp_ms >= min_ts ) && ( timestamp_ms <= max_ts );
    }

    uint64_t ConsensusManager::GetCurrentRound( uint64_t proposal_ts_ms ) const
    {
        if ( proposal_ts_ms == 0 || round_duration_.count() <= 0 )
        {
            return 0;
        }
        const auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                std::chrono::system_clock::now().time_since_epoch() )
                                .count();
        const auto elapsed = static_cast<int64_t>( now_ms ) - static_cast<int64_t>( proposal_ts_ms );
        if ( elapsed <= 0 )
        {
            return 0;
        }
        const auto skew_ms = static_cast<int64_t>( round_skew_.count() );
        if ( elapsed <= skew_ms )
        {
            return 0;
        }
        const auto round_ms = static_cast<int64_t>( round_duration_.count() );
        auto       round    = static_cast<uint64_t>( ( elapsed - skew_ms ) / round_ms );
        ConsensusManagerLogger()->debug( "{}: Returning round={}", __func__, round );
        return round;
    }

    std::vector<std::string> ConsensusManager::GetOrderedActiveValidators(
        const ValidatorRegistry::Registry &registry ) const
    {
        std::vector<std::string> validators;
        validators.reserve( registry.validators_size() );
        for ( const auto &entry : registry.validators() )
        {
            if ( entry.status() == ValidatorRegistry::Status::ACTIVE )
            {
                validators.push_back( entry.validator_id() );
            }
        }
        std::sort( validators.begin(), validators.end() );
        ConsensusManagerLogger()->trace( "{}: Returning validators with size ={}", __func__, validators.size() );
        return validators;
    }

    bool ConsensusManager::IsCurrentAggregator( const Proposal                    &proposal,
                                                const ValidatorRegistry::Registry &registry ) const
    {
        ConsensusManagerLogger()->trace( "{}: Checking if is current aggregator for proposal", __func__ );
        auto ordered = GetOrderedActiveValidators( registry );
        if ( ordered.empty() )
        {
            return false;
        }

        sgns::crypto::HasherImpl hasher;
        auto                     hash = hasher.sha2_256( proposal.proposal_id().data(), proposal.proposal_id().size() );
        uint64_t                 base_index = 0;
        for ( size_t i = 0; i < sizeof( uint64_t ) && i < hash.size(); ++i )
        {
            base_index = ( base_index << 8 ) | hash[i];
        }
        base_index = base_index % ordered.size();

        const auto round = GetCurrentRound( proposal.timestamp() );
        const auto index = ( base_index + round ) % ordered.size();

        return ordered[index] == account_address_;
    }

    outcome::result<std::string> ConsensusManager::GetSubjectHash( const Subject &subject )
    {
        if ( SubjectTypeMatches( subject, NONCE_SUBJECT_TYPE ) )
        {
            auto payload = DecodeNonceSubject( subject );
            if ( payload.has_error() || payload.value().tx_hash().empty() )
            {
                return outcome::failure( std::errc::invalid_argument );
            }
            return payload.value().tx_hash();
        }
        if ( SubjectTypeMatches( subject, TASK_RESULT_SUBJECT_TYPE ) )
        {
            auto payload = DecodeTaskResultSubject( subject );
            if ( payload.has_error() || payload.value().task_result_hash().empty() )
            {
                return outcome::failure( std::errc::invalid_argument );
            }
            return payload.value().task_result_hash();
        }
        if ( SubjectTypeMatches( subject, REGISTRY_BATCH_SUBJECT_TYPE ) )
        {
            auto payload = DecodeRegistryBatchSubject( subject );
            if ( payload.has_error() || payload.value().batch_root().empty() )
            {
                return outcome::failure( std::errc::invalid_argument );
            }
            return std::string( payload.value().batch_root() );
        }
        return ComputeSubjectId( subject );
    }

    void ConsensusManager::ContinueProposalAfterSubject( const Proposal &proposal )
    {
        ConsensusManagerLogger()->debug( "{}: Continuing proposal: hash {}, id {}",
                                         __func__,
                                         GetPrintableSubjectHash( proposal.subject() ),
                                         proposal.proposal_id().substr( 0, 8 ) );
        const auto slot_key    = GetSlotKey( proposal );
        bool       should_vote = false;

        ConsensusManagerLogger()->debug( "{}: Slot key acquired: hash {}, id {}, slot key {}",
                                         __func__,
                                         GetPrintableSubjectHash( proposal.subject() ),
                                         proposal.proposal_id().substr( 0, 8 ),
                                         slot_key );
        {
            std::lock_guard lock( proposals_mutex_ );
            if ( proposals_.find( proposal.proposal_id() ) == proposals_.end() )
            {
                ConsensusManagerLogger()->debug(
                    "{}: No proposal state found. Creating... : hash {}, id {}, slot key {}",
                    __func__,
                    GetPrintableSubjectHash( proposal.subject() ),
                    proposal.proposal_id().substr( 0, 8 ),
                    slot_key );
                ProposalState state;
                state.proposal = proposal;
                state.slot_key = slot_key;
                proposals_.emplace( proposal.proposal_id(), std::move( state ) );
            }

            auto &slot_state = slot_states_[slot_key];
            if ( slot_state.best_proposal_id.empty() )
            {
                ConsensusManagerLogger()->debug( "{}: Configuring best proposal for hash {}, id={}, slot key {}",
                                                 __func__,
                                                 GetPrintableSubjectHash( proposal.subject() ),
                                                 proposal.proposal_id().substr( 0, 8 ),
                                                 slot_key );
                slot_state.best_proposal_id = proposal.proposal_id();
                auto nonce_payload          = DecodeNonceSubject( proposal.subject() );
                if ( nonce_payload.has_value() )
                {
                    slot_state.best_tx_hash = nonce_payload.value().tx_hash();
                }
            }
            else
            {
                const auto &current = proposals_.at( slot_state.best_proposal_id ).proposal;
                ConsensusManagerLogger()->debug(
                    "{}: Already have a best proposal for hash {}, id={}, slot key {}. Seeing if {} is better ",
                    __func__,
                    GetPrintableSubjectHash( current.subject() ),
                    current.proposal_id().substr( 0, 8 ),
                    slot_key,
                    proposal.proposal_id().substr( 0, 8 ) );
                if ( IsBetterProposal( proposal, current ) )
                {
                    ConsensusManagerLogger()->debug( "{}: Better proposal for hash {}, id={}, slot key {}. ",
                                                     __func__,
                                                     GetPrintableSubjectHash( proposal.subject() ),
                                                     proposal.proposal_id().substr( 0, 8 ),
                                                     slot_key );
                    slot_state.best_proposal_id = proposal.proposal_id();
                    auto nonce_payload          = DecodeNonceSubject( proposal.subject() );
                    if ( nonce_payload.has_value() )
                    {
                        slot_state.best_tx_hash = nonce_payload.value().tx_hash();
                    }
                }
            }

            if ( slot_state.best_proposal_id == proposal.proposal_id() && !slot_state.voted )
            {
                ConsensusManagerLogger()->debug(
                    "{}: My proposal for hash {}, id={}, slot key {} is better so let's vote on it. ",
                    __func__,
                    GetPrintableSubjectHash( proposal.subject() ),
                    proposal.proposal_id().substr( 0, 8 ),
                    slot_key );
                slot_state.voted = true;
                should_vote      = true;
            }
        }

        auto pending_votes = TakePendingVotes( proposal.proposal_id() );
        for ( const auto &vote : pending_votes )
        {
            HandleVote( vote );
        }

        if ( should_vote )
        {
            auto vote_result = CreateVote( proposal.proposal_id(), account_address_, true, signer_ );
            if ( vote_result.has_value() )
            {
                (void)SubmitVote( vote_result.value() );
                ConsensusManagerLogger()->debug( "{}: self-vote submitted for hash {}, id={}, slot key {}",
                                                 __func__,
                                                 GetPrintableSubjectHash( proposal.subject() ),
                                                 proposal.proposal_id().substr( 0, 8 ),
                                                 slot_key );
            }
            else
            {
                ConsensusManagerLogger()->error( "{}: self-vote failed for hash {}, id={}, slot key {} error={}",
                                                 __func__,
                                                 GetPrintableSubjectHash( proposal.subject() ),
                                                 proposal.proposal_id().substr( 0, 8 ),
                                                 slot_key,
                                                 vote_result.error().message() );
            }
        }
    }

    void ConsensusManager::AddPendingProposal( const Proposal &proposal, const std::string &subject_hash )
    {
        std::lock_guard lock( proposals_mutex_ );
        if ( pending_proposals_.find( proposal.proposal_id() ) != pending_proposals_.end() )
        {
            ConsensusManagerLogger()->error(
                "{}: Failed adding pending proposal for {}: already have a proposal with id {}",
                __func__,
                subject_hash.substr( 0, 8 ),
                proposal.proposal_id().substr( 0, 8 ) );
            return;
        }
        ConsensusManagerLogger()->debug( "{}: Adding pending proposal for {}: proposal with id {}",
                                         __func__,
                                         subject_hash.substr( 0, 8 ),
                                         proposal.proposal_id().substr( 0, 8 ) );
        pending_proposals_.emplace( proposal.proposal_id(), proposal );
        pending_by_subject_hash_[subject_hash].push_back( proposal.proposal_id() );
    }

    std::vector<ConsensusManager::Proposal> ConsensusManager::TakePendingProposals( const std::string &subject_hash )
    {
        std::vector<Proposal> result;
        std::lock_guard       lock( proposals_mutex_ );
        auto                  it = pending_by_subject_hash_.find( subject_hash );
        if ( it == pending_by_subject_hash_.end() )
        {
            ConsensusManagerLogger()->trace( "{}: No pending proposals for {}", __func__, subject_hash.substr( 0, 8 ) );
            return result;
        }
        for ( const auto &proposal_id : it->second )
        {
            auto prop_it = pending_proposals_.find( proposal_id );
            if ( prop_it != pending_proposals_.end() )
            {
                result.push_back( prop_it->second );
                pending_proposals_.erase( prop_it );
            }
        }
        ConsensusManagerLogger()->debug( "{}: Taking pending proposals for {}", __func__, subject_hash.substr( 0, 8 ) );
        pending_by_subject_hash_.erase( it );
        return result;
    }

    void ConsensusManager::AddPendingVote( const Vote &vote )
    {
        std::lock_guard lock( proposals_mutex_ );
        pending_votes_[vote.proposal_id()].push_back( vote );
    }

    std::vector<ConsensusManager::Vote> ConsensusManager::TakePendingVotes( const std::string &proposal_id )
    {
        std::vector<Vote> result;
        std::lock_guard   lock( proposals_mutex_ );
        auto              it = pending_votes_.find( proposal_id );
        if ( it == pending_votes_.end() )
        {
            return result;
        }
        result = std::move( it->second );
        pending_votes_.erase( it );
        return result;
    }

    outcome::result<ConsensusManager::Proposal> ConsensusManager::CreateProposal( const Subject     &subject,
                                                                                  const std::string &proposer_id,
                                                                                  const std::string &registry_cid,
                                                                                  uint64_t           registry_epoch )
    {
        return CreateProposal( subject, proposer_id, registry_cid, registry_epoch, signer_ );
    }

    outcome::result<ConsensusManager::Proposal> ConsensusManager::CreateProposal( const Subject     &subject,
                                                                                  const std::string &proposer_id,
                                                                                  const std::string &registry_cid,
                                                                                  uint64_t           registry_epoch,
                                                                                  Signer             sign )
    {
        ConsensusManagerLogger()->trace( "{}: called by {} with hash {}, registry CID {} and epoch {}",
                                         __func__,
                                         proposer_id.substr( 0, 8 ),
                                         GetPrintableSubjectHash( subject ),
                                         registry_cid,
                                         registry_epoch );
        if ( !sign )
        {
            ConsensusManagerLogger()->error( "{}: failed for hash {}: signer is empty",
                                             __func__,
                                             GetPrintableSubjectHash( subject ) );
            return outcome::failure( std::errc::invalid_argument );
        }

        if ( !ValidateSubject( subject ) )
        {
            ConsensusManagerLogger()->error( "{}: failed for hash {}: subject validation failed",
                                             __func__,
                                             GetPrintableSubjectHash( subject ) );
            return outcome::failure( std::errc::invalid_argument );
        }

        Proposal proposal;
        *proposal.mutable_subject() = subject;
        proposal.set_proposer_id( proposer_id );
        proposal.set_registry_cid( registry_cid );
        proposal.set_registry_epoch( registry_epoch );
        proposal.set_timestamp(
            std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch() )
                .count() );

        proposal.set_proposal_id( CreateProposalId( proposal ) );
        auto signing_bytes = ProposalSigningBytes( proposal );
        if ( signing_bytes.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: failed: signing bytes error={}",
                                             __func__,
                                             signing_bytes.error().message() );
            return outcome::failure( signing_bytes.error() );
        }
        ConsensusManagerLogger()->debug( "{}: Creating proposal ID {} for hash {}",
                                         __func__,
                                         proposal.proposal_id().substr( 0, 8 ),
                                         GetPrintableSubjectHash( subject ) );
        BOOST_OUTCOME_TRY( auto &&signature, sign( signing_bytes.value() ) );
        proposal.set_signature( signature.data(), signature.size() );

        ConsensusManagerLogger()->debug( "{}: success for hash {} proposal_id={}",
                                         __func__,
                                         GetPrintableSubjectHash( subject ),
                                         proposal.proposal_id().substr( 0, 8 ) );
        return proposal;
    }

    outcome::result<ConsensusManager::Vote> ConsensusManager::CreateVote( const std::string &proposal_id,
                                                                          const std::string &voter_id,
                                                                          bool               approve,
                                                                          Signer             sign )
    {
        ConsensusManagerLogger()->trace( "{}: called by {}: proposal_id={} approve={}",
                                         __func__,
                                         voter_id.substr( 0, 8 ),
                                         proposal_id.substr( 0, 8 ),
                                         approve );
        if ( !sign )
        {
            ConsensusManagerLogger()->error( "{}: failed: signer is empty", __func__ );
            return outcome::failure( std::errc::invalid_argument );
        }

        Vote vote;
        vote.set_proposal_id( proposal_id );
        vote.set_voter_id( voter_id );
        vote.set_approve( approve );
        vote.set_timestamp(
            std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch() )
                .count() );

        auto signing_bytes = VoteSigningBytes( vote );
        if ( signing_bytes.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: failed: signing bytes error={}",
                                             __func__,
                                             signing_bytes.error().message() );
            return outcome::failure( signing_bytes.error() );
        }

        BOOST_OUTCOME_TRY( auto &&signature, sign( signing_bytes.value() ) );
        vote.set_signature( signature.data(), signature.size() );

        ConsensusManagerLogger()->debug( "{}: {} voted for proposal_id={}",
                                         __func__,
                                         voter_id.substr( 0, 8 ),
                                         proposal_id.substr( 0, 8 ) );
        return vote;
    }

    outcome::result<ConsensusManager::VoteBundle> ConsensusManager::CreateVoteBundle( const std::string &proposal_id,
                                                                                      const std::string &aggregator_id,
                                                                                      const std::vector<Vote> &votes,
                                                                                      Signer                   sign )
    {
        ConsensusManagerLogger()->trace( "{}: called by {}: proposal_id={} votes={}",
                                         __func__,
                                         aggregator_id.substr( 0, 8 ),
                                         proposal_id.substr( 0, 8 ),
                                         votes.size() );
        if ( !sign )
        {
            ConsensusManagerLogger()->error( "{}: failed: signer is empty", __func__ );
            return outcome::failure( std::errc::invalid_argument );
        }

        VoteBundle bundle;
        bundle.set_proposal_id( proposal_id );
        bundle.set_aggregator_id( aggregator_id );
        bundle.set_timestamp(
            std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch() )
                .count() );
        for ( const auto &vote : votes )
        {
            *bundle.add_votes() = vote;
        }

        auto signing_bytes = VoteBundleSigningBytes( bundle );
        if ( signing_bytes.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: failed: signing bytes error={}",
                                             __func__,
                                             signing_bytes.error().message() );
            return outcome::failure( signing_bytes.error() );
        }

        BOOST_OUTCOME_TRY( auto &&signature, sign( signing_bytes.value() ) );
        bundle.set_signature( signature.data(), signature.size() );

        ConsensusManagerLogger()->debug(
            "{}: Vote bundle created successfully by {}: proposal_id={} number of votes={}",
            __func__,
            aggregator_id.substr( 0, 8 ),
            proposal_id.substr( 0, 8 ),
            votes.size() );
        return bundle;
    }

    outcome::result<ConsensusManager::Certificate> ConsensusManager::CreateCertificate( const Proposal &proposal,
                                                                                        const std::vector<Vote> &votes )
    {
        ConsensusManagerLogger()->trace(
            "{}: Creating certificate for hash {}: proposal_id={} number of votes={} registry CID={}, epoch={}",
            __func__,
            GetPrintableSubjectHash( proposal.subject() ),
            proposal.proposal_id().substr( 0, 8 ),
            votes.size(),
            proposal.registry_cid(),
            proposal.registry_epoch() );
        auto tally_result = TallyVotes( proposal, votes );
        if ( tally_result.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: failed: tally error={}", __func__, tally_result.error().message() );
            return outcome::failure( tally_result.error() );
        }

        const auto &tally = tally_result.value();
        Certificate cert;
        cert.set_proposal_id( proposal.proposal_id() );
        cert.set_registry_cid( proposal.registry_cid() );
        cert.set_registry_epoch( proposal.registry_epoch() );
        cert.set_total_weight( tally.total_weight );
        cert.set_approved_weight( tally.approved_weight );
        uint64_t max_vote_ts = 0;
        for ( const auto &vote : votes )
        {
            if ( vote.timestamp() > max_vote_ts )
            {
                max_vote_ts = vote.timestamp();
            }
        }
        if ( max_vote_ts == 0 )
        {
            max_vote_ts = std::chrono::duration_cast<std::chrono::milliseconds>(
                              std::chrono::system_clock::now().time_since_epoch() )
                              .count();
        }
        cert.set_timestamp( max_vote_ts );
        for ( const auto &vote : votes )
        {
            *cert.add_votes() = vote;
        }
        *cert.mutable_proposal() = proposal;

        ConsensusManagerLogger()->debug( "{}: Success creating certificate for hash {} proposal_id={}",
                                         __func__,
                                         GetPrintableSubjectHash( proposal.subject() ),
                                         proposal.proposal_id().substr( 0, 8 ) );
        return cert;
    }

    outcome::result<ConsensusManager::QuorumTally> ConsensusManager::TallyVotes(
        const Proposal                    &proposal,
        const std::vector<Vote>           &votes,
        const ValidatorRegistry::Registry &registry,
        const std::string                 &registry_cid ) const
    {
        if ( !proposal.registry_cid().empty() && !registry_cid.empty() && proposal.registry_cid() != registry_cid )
        {
            ConsensusManagerLogger()->error(
                "{}: failed: registry cid mismatch hash {}, proposal CID ={} registry CID={}",
                __func__,
                GetPrintableSubjectHash( proposal.subject() ),
                proposal.registry_cid(),
                registry_cid );
            return outcome::failure( std::errc::invalid_argument );
        }
        if ( proposal.registry_epoch() != registry.epoch() )
        {
            ConsensusManagerLogger()->error(
                "{}: failed: registry epoch mismatch hash {}, proposal Epoch={} registry Epoch={}",
                __func__,
                GetPrintableSubjectHash( proposal.subject() ),
                proposal.registry_epoch(),
                registry.epoch() );
            return outcome::failure( std::errc::invalid_argument );
        }

        uint64_t                        total_weight    = ValidatorRegistry::TotalWeight( registry );
        uint64_t                        approved_weight = 0;
        std::unordered_set<std::string> seen;

        for ( const auto &vote : votes )
        {
            ConsensusManagerLogger()->trace( "{}: processing vote for hash {}: voter_id={} approve={}",
                                             __func__,
                                             GetPrintableSubjectHash( proposal.subject() ),
                                             vote.voter_id().substr( 0, 8 ),
                                             vote.approve() );
            if ( vote.proposal_id() != proposal.proposal_id() )
            {
                continue;
            }
            if ( !seen.insert( vote.voter_id() ).second )
            {
                continue;
            }

            const auto *validator = ValidatorRegistry::FindValidator( registry, vote.voter_id() );
            if ( !validator || validator->status() != ValidatorRegistry::Status::ACTIVE )
            {
                ConsensusManagerLogger()->debug( "{}: processing vote for hash {}: voter_id={} approve={}",
                                                 __func__,
                                                 GetPrintableSubjectHash( proposal.subject() ),
                                                 vote.voter_id().substr( 0, 8 ),
                                                 vote.approve() );
                continue;
            }

            auto signing_bytes = VoteSigningBytes( vote );
            if ( signing_bytes.has_error() )
            {
                continue;
            }

            if ( !GeniusAccount::VerifySignature( vote.voter_id(), vote.signature(), signing_bytes.value() ) )
            {
                continue;
            }

            ConsensusManagerLogger()->debug( "{}: Valid voter signature for hash {}: voter_id={} approve={}",
                                             __func__,
                                             GetPrintableSubjectHash( proposal.subject() ),
                                             vote.voter_id().substr( 0, 8 ),
                                             vote.approve() );
            if ( vote.approve() )
            {
                ConsensusManagerLogger()->debug( "{}: Adding weight for hash {}: voter_id={} weight={}",
                                                 __func__,
                                                 GetPrintableSubjectHash( proposal.subject() ),
                                                 vote.voter_id().substr( 0, 8 ),
                                                 validator->weight() );
                approved_weight += validator->weight();
            }
        }

        QuorumTally tally;
        tally.total_weight    = total_weight;
        tally.approved_weight = approved_weight;
        tally.has_quorum      = registry_->IsQuorum( approved_weight, total_weight );
        ConsensusManagerLogger()->debug(
            "{}: Votes tallied for hash {} proposal_id={} approved_weight={} total_weight={} quorum={}",
            __func__,
            GetPrintableSubjectHash( proposal.subject() ),
            proposal.proposal_id().substr( 0, 8 ),
            approved_weight,
            total_weight,
            tally.has_quorum );
        return tally;
    }

    outcome::result<ConsensusManager::QuorumTally> ConsensusManager::TallyVotes( const Proposal          &proposal,
                                                                                 const std::vector<Vote> &votes ) const
    {
        ConsensusManagerLogger()->trace(
            "{}: Tallying with current registry for hash {}, proposal_id={} number of votes={}",
            __func__,
            GetPrintableSubjectHash( proposal.subject() ),
            proposal.proposal_id().substr( 0, 8 ),
            votes.size() );

        if ( proposal.registry_cid().empty() )
        {
            ConsensusManagerLogger()->error( "{}: failed: proposal registry CID is empty", __func__ );
            return outcome::failure( std::errc::invalid_argument );
        }

        auto registry_result = registry_->LoadRegistry( proposal.registry_cid() );
        if ( registry_result.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: failed: registry load error={} cid={}",
                                             __func__,
                                             registry_result.error().message(),
                                             proposal.registry_cid() );
            return outcome::failure( registry_result.error() );
        }
        return TallyVotes( proposal, votes, registry_result.value(), proposal.registry_cid() );
    }

    outcome::result<std::vector<uint8_t>> ConsensusManager::ProposalSigningBytes( const Proposal &proposal )
    {
        ConsensusManagerLogger()->trace( "{}: called for hash {} proposal_id={}",
                                         __func__,
                                         GetPrintableSubjectHash( proposal.subject() ),
                                         proposal.proposal_id().substr( 0, 8 ) );
        return sgns::ProposalSigningBytes( proposal );
    }

    outcome::result<std::vector<uint8_t>> ConsensusManager::VoteSigningBytes( const Vote &vote )
    {
        ConsensusManagerLogger()->trace( "{}: called with voter address {} proposal_id={}",
                                         __func__,
                                         vote.voter_id().substr( 0, 8 ),
                                         vote.proposal_id() );
        return sgns::VoteSigningBytes( vote );
    }

    outcome::result<std::vector<uint8_t>> ConsensusManager::VoteBundleSigningBytes( const VoteBundle &bundle )
    {
        ConsensusManagerLogger()->trace( "{}: called proposal_id={} votes={}",
                                         __func__,
                                         bundle.proposal_id().substr( 0, 8 ),
                                         bundle.votes_size() );
        return sgns::VoteBundleSigningBytes( bundle );
    }

    outcome::result<void> ConsensusManager::SubmitProposal( const Proposal &proposal, bool self_vote )
    {
        ConsensusManagerLogger()->trace( "{}: called for hash {} proposal_id={} self_vote={}",
                                         __func__,
                                         GetPrintableSubjectHash( proposal.subject() ),
                                         proposal.proposal_id().substr( 0, 8 ),
                                         self_vote );
        const auto slot_key = GetSlotKey( proposal );
        {
            std::lock_guard lock( proposals_mutex_ );
            auto            it = proposals_.find( proposal.proposal_id() );
            if ( it == proposals_.end() )
            {
                ConsensusManagerLogger()->debug( "{}: Creating proposal state for hash {} proposal_id={}",
                                                 __func__,
                                                 GetPrintableSubjectHash( proposal.subject() ),
                                                 proposal.proposal_id().substr( 0, 8 ) );
                ProposalState state;
                state.proposal = proposal;
                state.slot_key = slot_key;
                proposals_.emplace( proposal.proposal_id(), std::move( state ) );
            }
        }

        ConsensusMessage message;
        *message.mutable_proposal() = proposal;
        auto publish_result         = Publish( message );
        if ( publish_result.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: failed: publish error={}",
                                             __func__,
                                             publish_result.error().message() );
            return publish_result;
        }
        ConsensusManagerLogger()->debug( "{}: success for hash {} proposal_id={}",
                                         __func__,
                                         GetPrintableSubjectHash( proposal.subject() ),
                                         proposal.proposal_id().substr( 0, 8 ) );

        if ( self_vote )
        {
            HandleProposal( proposal );
        }

        return outcome::success();
    }

    outcome::result<void> ConsensusManager::SubmitVote( const Vote &vote, bool self_handle )
    {
        ConsensusManagerLogger()->trace( "{}: called by {} proposal_id={}",
                                         __func__,
                                         vote.voter_id().substr( 0, 8 ),
                                         vote.proposal_id().substr( 0, 8 ) );
        ConsensusMessage message;
        *message.mutable_vote() = vote;
        auto result             = Publish( message );
        if ( result.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: failed: publish error={}", __func__, result.error().message() );
            return result;
        }
        ConsensusManagerLogger()->debug( "{}: success voter_id={} proposal_id={} ",
                                         __func__,
                                         vote.voter_id().substr( 0, 8 ),
                                         vote.proposal_id().substr( 0, 8 ) );
        if ( self_handle )
        {
            HandleVote( vote );
        }
        return result;
    }

    outcome::result<void> ConsensusManager::SubmitCertificate( const Certificate &certificate )
    {
        ConsensusManagerLogger()->trace( "{}: called for hash {} and proposal_id={}",
                                         __func__,
                                         GetPrintableSubjectHash( certificate.proposal().subject() ),
                                         certificate.proposal_id().substr( 0, 8 ) );
        ConsensusMessage message;
        *message.mutable_certificate() = certificate;
        auto result                    = Publish( message );
        if ( result.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: failed: publish error={}", __func__, result.error().message() );
            return result;
        }

        auto subject_hash_result = GetSubjectHash( certificate.proposal().subject() );
        if ( subject_hash_result.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: failed: subject hash {} error proposal_id={}",
                                             __func__,
                                             GetPrintableSubjectHash( certificate.proposal().subject() ),
                                             certificate.proposal_id().substr( 0, 8 ) );
            return outcome::failure( subject_hash_result.error() );
        }

        std::string serialized;
        if ( !certificate.SerializeToString( &serialized ) )
        {
            ConsensusManagerLogger()->error( "{}: failed: certificate serialize error", __func__ );
            return outcome::failure( std::errc::invalid_argument );
        }

        const auto             key = std::string{ CERTIFICATE_BASE_PATH_KEY } + subject_hash_result.value();
        crdt::HierarchicalKey  cert_key( key );
        crdt::GlobalDB::Buffer cert_value;
        cert_value.put( serialized );

        auto cert_put = db_->Put( cert_key, cert_value, { consensus_datastore_topic_ } );
        if ( cert_put.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: failed: cert put for hash {} error={}",
                                             __func__,
                                             GetPrintableSubjectHash( certificate.proposal().subject() ),
                                             cert_put.error().message() );
            return outcome::failure( cert_put.error() );
        }

        ConsensusManagerLogger()->debug( "{}: success submitting certificate for {} and proposal_id={}",
                                         __func__,
                                         GetPrintableSubjectHash( certificate.proposal().subject() ),
                                         certificate.proposal_id().substr( 0, 8 ) );
        return result;
    }

    void ConsensusManager::HandleProposal( const Proposal &proposal )
    {
        ConsensusManagerLogger()->trace( "{}: called for hash {} proposal_id={}",
                                         __func__,
                                         GetPrintableSubjectHash( proposal.subject() ),
                                         proposal.proposal_id().substr( 0, 8 ) );

        if ( !CheckProposal( proposal ) )
        {
            ConsensusManagerLogger()->error( "{}: rejected: Invalid proposal for hash {} proposal_id={}",
                                             __func__,
                                             GetPrintableSubjectHash( proposal.subject() ),
                                             proposal.proposal_id().substr( 0, 8 ) );
            return;
        }

        if ( !IsTimestampSane( proposal.timestamp() ) )
        {
            ConsensusManagerLogger()->error( "{}: rejected: timestamp out of bounds for hash {} proposal_id={}",
                                             __func__,
                                             GetPrintableSubjectHash( proposal.subject() ),
                                             proposal.proposal_id().substr( 0, 8 ) );
            return;
        }

        if ( proposal.registry_cid().empty() )
        {
            ConsensusManagerLogger()->error( "{}: rejected: proposal registry CID missing for hash {}. proposal_id={}",
                                             __func__,
                                             GetPrintableSubjectHash( proposal.subject() ),
                                             proposal.proposal_id().substr( 0, 8 ) );
            return;
        }

        auto subject_hash = GetSubjectHash( proposal.subject() );
        if ( subject_hash.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: rejected: subject hash missing proposal_id={}",
                                             __func__,
                                             proposal.proposal_id().substr( 0, 8 ) );
            return;
        }

        auto proposal_registry_result = registry_->LoadRegistry( proposal.registry_cid() );
        if ( proposal_registry_result.has_error() )
        {
            ConsensusManagerLogger()->warn(
                "{}: deferred: registry load error={} proposal={} proposal_id={} hash={}. Keeping proposal pending",
                __func__,
                proposal_registry_result.error().message(),
                proposal.registry_cid(),
                proposal.proposal_id().substr( 0, 8 ),
                subject_hash.value().substr( 0, 8 ) );

            {
                std::lock_guard lock( proposals_mutex_ );
                if ( proposals_.find( proposal.proposal_id() ) == proposals_.end() )
                {
                    ProposalState state;
                    state.proposal = proposal;
                    state.slot_key = GetSlotKey( proposal );
                    proposals_.emplace( proposal.proposal_id(), std::move( state ) );
                }
            }

            AddPendingProposal( proposal, subject_hash.value() );
            return;
        }
        if ( proposal.registry_epoch() != proposal_registry_result.value().epoch() )
        {
            ConsensusManagerLogger()->error( "{}: rejected: registry epoch mismatch proposal={} registry={}",
                                             __func__,
                                             proposal.registry_epoch(),
                                             proposal_registry_result.value().epoch() );
            return;
        }

        if ( !CheckSubject( proposal.subject() ) )
        {
            ConsensusManagerLogger()->error( "{}: rejected: subject check failed for hash {} proposal_id={}",
                                             __func__,
                                             GetPrintableSubjectHash( proposal.subject() ),
                                             proposal.proposal_id().substr( 0, 8 ) );
            return;
        }

        if ( CheckCertificateForSubject( subject_hash.value() ) )
        {
            ConsensusManagerLogger()->debug( "{}: ignored: subject already certified hash={} proposal_id={}",
                                             __func__,
                                             subject_hash.value().substr( 0, 8 ),
                                             proposal.proposal_id().substr( 0, 8 ) );
            std::lock_guard lock( proposals_mutex_ );
            pending_votes_.erase( proposal.proposal_id() );
            pending_proposals_.erase( proposal.proposal_id() );
            return;
        }

        SubjectHandler subject_handler;
        {
            std::shared_lock lock( subject_handlers_mutex_ );
            auto             handler_it = subject_handlers_.find( proposal.subject().subject_type_hash().hash() );
            if ( handler_it == subject_handlers_.end() )
            {
                ConsensusManagerLogger()->error(
                    "{}: rejected: subject handler missing type_hash={}",
                    __func__,
                    base::hex_lower( gsl::span<const uint8_t>(
                        reinterpret_cast<const uint8_t *>( proposal.subject().subject_type_hash().hash().data() ),
                        proposal.subject().subject_type_hash().hash().size() ) ) );
                return;
            }
            subject_handler = handler_it->second;
        }

        auto subject_result = subject_handler( proposal.subject() );
        if ( subject_result.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: rejected: subject handler error for hash {} proposal_id={}",
                                             __func__,
                                             GetPrintableSubjectHash( proposal.subject() ),
                                             proposal.proposal_id().substr( 0, 8 ) );
            return;
        }

        if ( subject_result.value() == Check::Reject )
        {
            ConsensusManagerLogger()->error( "{}: rejected: subject check failed for hash {} proposal_id={}",
                                             __func__,
                                             GetPrintableSubjectHash( proposal.subject() ),
                                             proposal.proposal_id().substr( 0, 8 ) );
            return;
        }

        if ( subject_result.value() == Check::Pending )
        {
            {
                std::lock_guard lock( proposals_mutex_ );
                if ( proposals_.find( proposal.proposal_id() ) == proposals_.end() )
                {
                    ProposalState state;
                    state.proposal = proposal;
                    state.slot_key = GetSlotKey( proposal );
                    proposals_.emplace( proposal.proposal_id(), std::move( state ) );
                }
            }
            ConsensusManagerLogger()->debug( "{}: Adding pending proposal for hash {} proposal_id={}",
                                             __func__,
                                             GetPrintableSubjectHash( proposal.subject() ),
                                             proposal.proposal_id().substr( 0, 8 ) );
            AddPendingProposal( proposal, subject_hash.value() );
            return;
        }

        ContinueProposalAfterSubject( proposal );
    }

    outcome::result<void> ConsensusManager::ResumeProposalHandling( const std::string &subject_hash )
    {
        if ( subject_hash.empty() )
        {
            return outcome::failure( std::errc::invalid_argument );
        }
        ConsensusManagerLogger()->trace( "{}: Attempting to resume proposals for hash={}",
                                         __func__,
                                         subject_hash.substr( 0, 8 ) );

        auto to_process = TakePendingProposals( subject_hash );

        for ( const auto &proposal : to_process )
        {
            SubjectHandler subject_handler;
            {
                std::shared_lock lock( subject_handlers_mutex_ );
                auto             handler_it = subject_handlers_.find( proposal.subject().subject_type_hash().hash() );
                if ( handler_it == subject_handlers_.end() )
                {
                    ConsensusManagerLogger()->error(
                        "{}: rejected: subject handler missing type_hash={}",
                        __func__,
                        base::hex_lower( gsl::span<const uint8_t>(
                            reinterpret_cast<const uint8_t *>( proposal.subject().subject_type_hash().hash().data() ),
                            proposal.subject().subject_type_hash().hash().size() ) ) );
                    continue;
                }
                subject_handler = handler_it->second;
            }

            auto subject_result = subject_handler( proposal.subject() );
            if ( subject_result.has_error() )
            {
                ConsensusManagerLogger()->error( "{}: rejected: subject handler error for hash {} proposal_id={}",
                                                 __func__,
                                                 subject_hash.substr( 0, 8 ),
                                                 proposal.proposal_id().substr( 0, 8 ) );
                continue;
            }

            if ( subject_result.value() == Check::Reject )
            {
                ConsensusManagerLogger()->error( "{}: rejected: subject check failed for hash {} proposal_id={}",
                                                 __func__,
                                                 subject_hash.substr( 0, 8 ),
                                                 proposal.proposal_id().substr( 0, 8 ) );
                continue;
            }

            if ( subject_result.value() == Check::Pending )
            {
                auto subject_hash_result = GetSubjectHash( proposal.subject() );
                if ( subject_hash_result.has_error() )
                {
                    ConsensusManagerLogger()->error( "{}: rejected: subject hash missing proposal_id={}",
                                                     __func__,
                                                     proposal.proposal_id() );
                    continue;
                }
                ConsensusManagerLogger()->debug( "{}: Adding pending proposal for hash {} proposal_id={}",
                                                 __func__,
                                                 subject_hash.substr( 0, 8 ),
                                                 proposal.proposal_id().substr( 0, 8 ) );
                AddPendingProposal( proposal, subject_hash_result.value() );
                continue;
            }

            ContinueProposalAfterSubject( proposal );
        }
        return outcome::success();
    }

    void ConsensusManager::ProcessCertificates()
    {
        std::vector<ProposalState> to_process;
        {
            std::lock_guard lock( proposals_mutex_ );
            for ( auto &kv : proposals_ )
            {
                auto &state = kv.second;
                if ( !state.quorum_reached )
                {
                    ConsensusManagerLogger()->debug(
                        "{}: Found proposal without quorum reached for hash {} proposal_id={}",
                        __func__,
                        GetPrintableSubjectHash( state.proposal.subject() ),
                        state.proposal.proposal_id().substr( 0, 8 ) );

                    continue;
                }
                to_process.push_back( state );
            }
        }

        for ( auto &state : to_process )
        {
            auto subject_hash = GetSubjectHash( state.proposal.subject() );
            if ( subject_hash.has_value() && CheckCertificateForSubject( subject_hash.value() ) )
            {
                ConsensusManagerLogger()->debug( "{}: hash {} already certified, clearing proposal_id={}",
                                                 __func__,
                                                 subject_hash.value().substr( 0, 8 ),
                                                 state.proposal.proposal_id().substr( 0, 8 ) );
                FireProposalCleanupCallbacks( state.proposal );
                ClearProposalSlot( state.proposal );
                continue;
            }
            ConsensusManagerLogger()->debug( "{}: Processing proposal with quorum reached for hash {} proposal_id={}",
                                             __func__,
                                             GetPrintableSubjectHash( state.proposal.subject() ),
                                             state.proposal.proposal_id().substr( 0, 8 ) );
            const auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                    std::chrono::system_clock::now().time_since_epoch() )
                                    .count();
            if ( state.quorum_reached_ts_ms != 0 && certificate_delay_.count() > 0 )
            {
                const auto elapsed_ms = static_cast<int64_t>( now_ms ) -
                                        static_cast<int64_t>( state.quorum_reached_ts_ms );
                if ( elapsed_ms < static_cast<int64_t>( certificate_delay_.count() ) )
                {
                    continue;
                }
            }

            const auto round = GetCurrentRound( state.proposal.timestamp() );
            if ( state.last_attempt_round != NO_ROUND && round == state.last_attempt_round )
            {
                ConsensusManagerLogger()->debug(
                    "{}: proposal already attempted in round for hash {} proposal_id={} round={}",
                    __func__,
                    GetPrintableSubjectHash( state.proposal.subject() ),
                    state.proposal.proposal_id().substr( 0, 8 ),
                    round );
                continue;
            }
            auto proposal_registry_result = registry_->LoadRegistry( state.proposal.registry_cid() );
            if ( proposal_registry_result.has_error() )
            {
                ConsensusManagerLogger()->debug( "{}: skipping proposal due to registry load error={} proposal_id={}",
                                                 __func__,
                                                 proposal_registry_result.error().message(),
                                                 state.proposal.proposal_id().substr( 0, 8 ) );
                continue;
            }
            const auto &proposal_registry = proposal_registry_result.value();
            if ( state.proposal.registry_epoch() != proposal_registry.epoch() )
            {
                ConsensusManagerLogger()->debug( "{}: skipping proposal due to registry epoch mismatch proposal_id={}",
                                                 __func__,
                                                 state.proposal.proposal_id().substr( 0, 8 ) );
                continue;
            }

            if ( !IsCurrentAggregator( state.proposal, proposal_registry ) )
            {
                ConsensusManagerLogger()->debug( "{}: not aggregator for proposal for hash {} proposal_id={}",
                                                 __func__,
                                                 GetPrintableSubjectHash( state.proposal.subject() ),
                                                 state.proposal.proposal_id().substr( 0, 8 ) );
                continue;
            }

            {
                std::lock_guard lock( proposals_mutex_ );
                auto            it = proposals_.find( state.proposal.proposal_id() );
                if ( it != proposals_.end() )
                {
                    it->second.last_attempt_round = round;
                }
            }
            ConsensusManagerLogger()->debug( "{}: Attempting to create certificate for hash {} proposal_id={} round={}",
                                             __func__,
                                             GetPrintableSubjectHash( state.proposal.subject() ),
                                             state.proposal.proposal_id().substr( 0, 8 ),
                                             round );
            auto certificate_result = CreateCertificate( state.proposal, state.votes );
            if ( certificate_result.has_error() )
            {
                ConsensusManagerLogger()->error(
                    "{}: failed: certificate creation error for hash {} proposal_id {}: {}",
                    __func__,
                    GetPrintableSubjectHash( state.proposal.subject() ),
                    state.proposal.proposal_id().substr( 0, 8 ),
                    certificate_result.error().message() );
                continue;
            }

            (void)SubmitCertificate( certificate_result.value() );
            FireProposalCleanupCallbacks( state.proposal );
            ClearProposalSlot( state.proposal );
            ConsensusManagerLogger()->debug( "{}: certificate submitted for hash {} proposal_id={}",
                                             __func__,
                                             GetPrintableSubjectHash( state.proposal.subject() ),
                                             state.proposal.proposal_id().substr( 0, 8 ) );
        }
    }

    void ConsensusManager::UpdateCertificatesPending()
    {
        bool has_pending = false;
        {
            std::lock_guard lock( proposals_mutex_ );
            for ( const auto &kv : proposals_ )
            {
                if ( kv.second.quorum_reached )
                {
                    has_pending = true;
                    break;
                }
            }
        }
        certificates_pending_.store( has_pending );
        if ( !has_pending )
        {
            timer_cv_.notify_all();
        }
    }

    bool ConsensusManager::RegisterCertificateFilter()
    {
        const std::string pattern = std::string( CERT_KEY_PATTERN );

        auto       weak_self         = weak_from_this();
        const bool filter_registered = db_->RegisterElementFilter(
            pattern,
            [weak_self]( const crdt::pb::Element &element ) -> std::optional<std::vector<crdt::pb::Element>>
            {
                if ( auto strong = weak_self.lock() )
                {
                    return strong->FilterCertificate( element );
                }
                return std::nullopt;
            } );

        const bool callback_registered = db_->RegisterNewElementCallback(
            pattern,
            [weak_self]( crdt::CRDTCallbackManager::NewDataPair new_data, const std::string &cid )
            {
                if ( auto strong = weak_self.lock() )
                {
                    strong->CertificateReceived( std::move( new_data ), cid );
                }
            } );

        db_->AddListenTopic( consensus_datastore_topic_ );

        return filter_registered && callback_registered;
    }

    std::optional<std::vector<crdt::pb::Element>> ConsensusManager::FilterCertificate(
        const crdt::pb::Element &element )
    {
        ConsensusManagerLogger()->trace( "{}: entry key={}", __func__, element.key() );
        Certificate certificate;
        if ( !certificate.ParseFromString( element.value() ) )
        {
            ConsensusManagerLogger()->error( "{}: parse failed, rejecting: {}", __func__, element.key() );
            return std::vector<crdt::pb::Element>{};
        }

        if ( certificate.proposal_id().empty() )
        {
            ConsensusManagerLogger()->error( "{}: missing proposal_id, rejecting: {}", __func__, element.key() );
            return std::vector<crdt::pb::Element>{};
        }

        if ( ValidateCertificate( certificate ) == Check::Reject )
        {
            ConsensusManagerLogger()->error( "{}: validation failed, rejecting: {}", __func__, element.key() );
            return std::vector<crdt::pb::Element>{};
        }

        ConsensusManagerLogger()->debug( "{}: certificate accepted key={}", __func__, element.key() );
        return std::nullopt;
    }

    void ConsensusManager::CertificateReceived( crdt::CRDTCallbackManager::NewDataPair new_data,
                                                const std::string                     &cid )
    {
        auto [key, value] = new_data;
        (void)cid;
        Certificate certificate;
        if ( !certificate.ParseFromArray( value.data(), value.size() ) )
        {
            ConsensusManagerLogger()->error( "{}: invalid certificate payload key={}", __func__, key );
            return;
        }

        auto subject_hash = GetSubjectHash( certificate.proposal().subject() );
        if ( subject_hash.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: failed getting subject hash proposal_id={} error={}",
                                             __func__,
                                             certificate.proposal_id().substr( 0, 8 ),
                                             subject_hash.error().message() );
            return;
        }

        auto certificate_check = ValidateCertificate( certificate );

        if ( certificate_check == Check::Stalled )
        {
            ConsensusManagerLogger()->error(
                "{}: Validation of the certificate pending for key {}, certificate handler not called ",
                __func__,
                key );
            certificate_work_journal_->MarkStalled( key );
            return;
        }

        registry_->OnFinalizedCertificate( certificate );

        CertificateSubjectHandler handler;
        {
            std::shared_lock lock( certificate_handlers_mutex_ );
            auto it = certificate_subject_handlers_.find( certificate.proposal().subject().subject_type_hash().hash() );
            if ( it == certificate_subject_handlers_.end() )
            {
                (void)certificate_work_journal_->MarkDone( key );
                ConsensusManagerLogger()->warn( "{}: No subject handler for certificate with key {} ", __func__, key );
                return;
            }
            handler = it->second;
        }

        auto certificate_handler_result = handler( subject_hash.value(), certificate );

        if ( certificate_handler_result.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: certificate handler error proposal_id={} error={}",
                                             __func__,
                                             certificate.proposal_id().substr( 0, 8 ),
                                             certificate_handler_result.error().message() );
            return;
        }
        auto certificate_result = certificate_handler_result.value();

        if ( certificate_result == Check::Stalled )
        {
            ConsensusManagerLogger()->error( "{}: certificate rejected by handler proposal_id={}",
                                             __func__,
                                             certificate.proposal_id().substr( 0, 8 ) );
            ConsensusManagerLogger()->debug( "{}: Key {} is not Done yet", __func__, key );
            certificate_work_journal_->MarkStalled( key );
            return;
        }
        (void)certificate_work_journal_->MarkDone( key );
    }

    ConsensusManager::Check ConsensusManager::ValidateCertificate( const Certificate &certificate ) const
    {
        if ( certificate.proposal_id().empty() )
        {
            ConsensusManagerLogger()->error( "{}: Certificate proposal ID missing ", __func__ );
            return Check::Reject;
        }
        if ( !certificate.has_proposal() )
        {
            ConsensusManagerLogger()->error( "{}: Certificate missing proposal ", __func__ );
            return Check::Reject;
        }

        const auto &proposal = certificate.proposal();
        if ( proposal.proposal_id() != certificate.proposal_id() )
        {
            ConsensusManagerLogger()->error( "{}: rejected: proposal_id mismatch cert={} proposal={}",
                                             __func__,
                                             certificate.proposal_id(),
                                             proposal.proposal_id() );
            return Check::Reject;
        }
        if ( proposal.registry_cid() != certificate.registry_cid() ||
             proposal.registry_epoch() != certificate.registry_epoch() )
        {
            ConsensusManagerLogger()->error( "{}: rejected: registry mismatch proposal_id={}",
                                             __func__,
                                             certificate.proposal_id() );
            return Check::Reject;
        }
        auto registry_ret = registry_->LoadRegistry( certificate.registry_cid() );
        if ( registry_ret.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: rejected: registry load error={} for registry cid {} proposal_id={}",
                                             __func__,
                                             registry_ret.error().message(),
                                             certificate.registry_cid(),
                                             certificate.proposal_id() );
            return Check::Stalled;
        }
        auto &registry = registry_ret.value();
        if ( !ValidateSubject( proposal.subject() ) )
        {
            ConsensusManagerLogger()->error( "{}: rejected: invalid subject proposal_id={}",
                                             __func__,
                                             proposal.proposal_id() );
            return Check::Reject;
        }
        if ( !CheckProposal( proposal ) )
        {
            ConsensusManagerLogger()->error( "{}: rejected: invalid proposal proposal_id={}",
                                             __func__,
                                             proposal.proposal_id() );
            return Check::Reject;
        }

        const auto computed_id = CreateProposalId( proposal );
        if ( computed_id.empty() )
        {
            ConsensusManagerLogger()->error( "{}: rejected: computed_id empty", __func__ );
            return Check::Reject;
        }
        if ( computed_id != certificate.proposal_id() )
        {
            ConsensusManagerLogger()->error( "{}: rejected: computed_id mismatch cert={} computed={}",
                                             __func__,
                                             certificate.proposal_id(),
                                             computed_id );
            return Check::Reject;
        }

        std::vector<Vote> votes;
        votes.reserve( static_cast<size_t>( certificate.votes_size() ) );
        for ( const auto &vote : certificate.votes() )
        {
            votes.push_back( vote );
        }
        auto tally = TallyVotes( proposal, votes, registry, certificate.registry_cid() );
        if ( tally.has_error() || !tally.value().has_quorum )
        {
            return Check::Reject;
        }

        return Check::Approve;
    }

    void ConsensusManager::HandleVote( const Vote &vote )
    {
        ConsensusManagerLogger()->trace( "{}: called. Vote by {} on proposal_id={} ",
                                         __func__,
                                         vote.voter_id().substr( 0, 8 ),
                                         vote.proposal_id().substr( 0, 8 ) );
        if ( !CheckVote( vote ) )
        {
            ConsensusManagerLogger()->error( "{}: rejected: Invalid vote proposal_id={} voter_id={}",
                                             __func__,
                                             vote.proposal_id(),
                                             vote.voter_id() );
            return;
        }
        if ( !vote.approve() )
        {
            ConsensusManagerLogger()->debug( "{}: ignored: vote not approved voter_id={}",
                                             __func__,
                                             vote.voter_id().substr( 0, 8 ) );
            //TODO - maybe see reputation?
            return;
        }

        auto signing_bytes = VoteSigningBytes( vote );
        if ( signing_bytes.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: rejected: signing bytes error={}",
                                             __func__,
                                             signing_bytes.error().message() );
            return;
        }
        if ( !GeniusAccount::VerifySignature( vote.voter_id(), vote.signature(), signing_bytes.value() ) )
        {
            ConsensusManagerLogger()->error( "{}: rejected: signature verification failed voter_id={}",
                                             __func__,
                                             vote.voter_id().substr( 0, 8 ) );
            return;
        }

        bool has_quorum = false;
        {
            std::lock_guard lock( proposals_mutex_ );
            auto            it = proposals_.find( vote.proposal_id() );
            if ( it == proposals_.end() )
            {
                pending_votes_[vote.proposal_id()].push_back( vote );
                ConsensusManagerLogger()->debug( "{}: queued pending vote proposal_id={}",
                                                 __func__,
                                                 vote.proposal_id().substr( 0, 8 ) );
                return;
            }
            auto &proposal_state = it->second;
            auto  subject_hash   = GetSubjectHash( proposal_state.proposal.subject() );
            if ( subject_hash.has_value() && CheckCertificateForSubject( subject_hash.value() ) )
            {
                ConsensusManagerLogger()->debug( "{}: ignored: vote for already certified hash {} proposal_id={}",
                                                 __func__,
                                                 subject_hash.value().substr( 0, 8 ),
                                                 vote.proposal_id().substr( 0, 8 ) );
                pending_votes_.erase( vote.proposal_id() );
                return;
            }
            auto slot_it = slot_states_.find( proposal_state.slot_key );
            if ( slot_it != slot_states_.end() && slot_it->second.best_proposal_id != vote.proposal_id() )
            {
                ConsensusManagerLogger()->error( "{}: ignored: not best proposal proposal_id={}",
                                                 __func__,
                                                 vote.proposal_id().substr( 0, 8 ) );
                return;
            }

            if ( proposal_state.seen_voters.find( vote.voter_id() ) != proposal_state.seen_voters.end() )
            {
                ConsensusManagerLogger()->trace( "{}: ignored: duplicate vote voter_id={}",
                                                 __func__,
                                                 vote.voter_id().substr( 0, 8 ) );
                return;
            }

            auto proposal_registry_result = registry_->LoadRegistry( proposal_state.proposal.registry_cid() );
            if ( proposal_registry_result.has_error() )
            {
                ConsensusManagerLogger()->warn( "{}: deferred vote: registry load error={} proposal_id={}",
                                                __func__,
                                                proposal_registry_result.error().message(),
                                                vote.proposal_id().substr( 0, 8 ) );
                pending_votes_[vote.proposal_id()].push_back( vote );
                return;
            }
            const auto &proposal_registry = proposal_registry_result.value();
            if ( proposal_state.proposal.registry_epoch() != proposal_registry.epoch() )
            {
                ConsensusManagerLogger()->error( "{}: rejected: registry mismatch proposal_id={}",
                                                 __func__,
                                                 vote.proposal_id().substr( 0, 8 ) );
                return;
            }

            const auto *validator           = registry_->FindValidator( proposal_registry, vote.voter_id() );
            const bool  is_active_validator = validator && validator->status() == ValidatorRegistry::Status::ACTIVE;

            if ( it->second.total_weight == 0 )
            {
                it->second.total_weight = registry_->TotalWeight( proposal_registry );
            }

            it->second.votes.push_back( vote );
            it->second.seen_voters.insert( vote.voter_id() );
            if ( is_active_validator )
            {
                it->second.approved_weight += validator->weight();
                has_quorum = registry_->IsQuorum( it->second.approved_weight, it->second.total_weight );
                if ( has_quorum )
                {
                    if ( !it->second.quorum_reached )
                    {
                        it->second.quorum_reached       = true;
                        it->second.quorum_reached_ts_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                                              std::chrono::system_clock::now().time_since_epoch() )
                                                              .count();
                    }
                    ConsensusManagerLogger()->debug(
                        "{}: quorum reached; certificate will be created by timer proposal_id={}",
                        __func__,
                        vote.proposal_id() );
                }
            }
            else
            {
                ConsensusManagerLogger()->debug( "{}: accepted vote from non-validator voter_id={}",
                                                 __func__,
                                                 vote.voter_id().substr( 0, 8 ) );
            }
        }
        if ( has_quorum )
        {
            certificates_pending_.store( true );
            timer_cv_.notify_all();
        }
    }

    void ConsensusManager::HandleVoteBundle( const VoteBundle &bundle )
    {
        ConsensusManagerLogger()->trace( "{}: called proposal_id={} votes={}",
                                         __func__,
                                         bundle.proposal_id().substr( 0, 8 ),
                                         bundle.votes_size() );

        for ( const auto &vote : bundle.votes() )
        {
            ConsensusManagerLogger()->trace( "{}: processing voter_id={}", __func__, vote.voter_id().substr( 0, 8 ) );
            HandleVote( vote );
        }
    }

    void ConsensusManager::HandleCertificate( const Certificate &certificate )
    {
        ConsensusManagerLogger()->trace( "{}: called proposal_id={}", __func__, certificate.proposal_id() );

        if ( ValidateCertificate( certificate ) == Check::Reject )
        {
            ConsensusManagerLogger()->error( "{}: rejected: invalid certificate proposal_id={}",
                                             __func__,
                                             certificate.proposal_id() );
            return;
        }

        ProposalState proposal_state;
        auto          fetch_proposal_state_ret = FetchProposalState( certificate );
        if ( fetch_proposal_state_ret.has_value() )
        {
            proposal_state = fetch_proposal_state_ret.value();
            ConsensusManagerLogger()->debug( "{}: fetched proposal state, proposal_id={}",
                                             __func__,
                                             certificate.proposal_id() );
        }
        else
        {
            ConsensusManagerLogger()->debug( "{}: proposal state not found, creating new one proposal_id={}",
                                             __func__,
                                             certificate.proposal_id() );
            proposal_state = CreateProposalState( certificate );
        }

        if ( !ValidateCertificateBestProposal( proposal_state, certificate ) )
        {
            return;
        }

        ClearProposalSlot( certificate.proposal() );
        ConsensusManagerLogger()->debug( "{}: success proposal_id={}", __func__, certificate.proposal_id() );
    }

    outcome::result<ConsensusManager::ProposalState> ConsensusManager::FetchProposalState(
        const Certificate &certificate )
    {
        std::lock_guard lock( proposals_mutex_ );
        auto            it = proposals_.find( certificate.proposal_id() );
        if ( it == proposals_.end() )
        {
            return outcome::failure( std::errc::no_such_device );
        }
        return it->second;
    }

    ConsensusManager::ProposalState ConsensusManager::CreateProposalState( const Certificate &certificate )
    {
        ProposalState new_state;
        new_state.proposal = certificate.proposal();
        new_state.slot_key = GetSlotKey( new_state.proposal );
        proposals_.emplace( new_state.proposal.proposal_id(), new_state );

        auto &slot_state = slot_states_[new_state.slot_key];
        if ( slot_state.best_proposal_id.empty() )
        {
            slot_state.best_proposal_id = new_state.proposal.proposal_id();
            auto nonce_payload          = DecodeNonceSubject( new_state.proposal.subject() );
            if ( nonce_payload.has_value() )
            {
                slot_state.best_tx_hash = nonce_payload.value().tx_hash();
            }
        }

        return new_state;
    }

    bool ConsensusManager::ValidateCertificateBestProposal( const ProposalState &state,
                                                            const Certificate   &certificate ) const
    {
        if ( certificate.has_proposal() && certificate.proposal().has_subject() &&
             DecodeRegistryBatchSubject( certificate.proposal().subject() ).has_value() )
        {
            // Registry-batch subjects can have multiple competing proposals for the same deterministic batch root.
            // Once a valid certificate exists, accept it even if local best_proposal_id changed due proposal races.
            return true;
        }
        std::lock_guard lock( proposals_mutex_ );
        auto            slot_it = slot_states_.find( state.slot_key );
        if ( slot_it != slot_states_.end() && slot_it->second.best_proposal_id != certificate.proposal_id() )
        {
            ConsensusManagerLogger()->error( "{}: rejected: not best proposal proposal_id={}",
                                             __func__,
                                             certificate.proposal_id() );
            return false;
        }
        return true;
    }

    std::vector<ConsensusManager::Vote> ConsensusManager::CollectCertificateVotes(
        const Certificate &certificate ) const
    {
        std::vector<Vote> votes;
        votes.reserve( static_cast<size_t>( certificate.votes_size() ) );
        for ( const auto &vote : certificate.votes() )
        {
            ConsensusManagerLogger()->trace( "{}: processing vote voter_id={}", __func__, vote.voter_id() );
            votes.push_back( vote );
        }
        return votes;
    }

    void ConsensusManager::ClearProposalSlot( const Proposal &proposal )
    {
        std::lock_guard lock( proposals_mutex_ );

        std::string slot_key;
        auto        it = proposals_.find( proposal.proposal_id() );
        if ( it != proposals_.end() )
        {
            slot_key = it->second.slot_key;
        }
        else
        {
            slot_key = GetSlotKey( proposal );
        }

        std::unordered_set<std::string> ids_to_remove;
        ids_to_remove.insert( proposal.proposal_id() );
        for ( const auto &kv : proposals_ )
        {
            if ( kv.second.slot_key == slot_key )
            {
                ids_to_remove.insert( kv.first );
            }
        }

        for ( const auto &proposal_id : ids_to_remove )
        {
            proposals_.erase( proposal_id );
            pending_proposals_.erase( proposal_id );
            pending_votes_.erase( proposal_id );
        }

        for ( auto it_hash = pending_by_subject_hash_.begin(); it_hash != pending_by_subject_hash_.end(); )
        {
            auto &vec = it_hash->second;
            vec.erase( std::remove_if( vec.begin(),
                                       vec.end(),
                                       [&]( const std::string &proposal_id )
                                       { return ids_to_remove.find( proposal_id ) != ids_to_remove.end(); } ),
                       vec.end() );
            if ( vec.empty() )
            {
                it_hash = pending_by_subject_hash_.erase( it_hash );
            }
            else
            {
                ++it_hash;
            }
        }

        slot_states_.erase( slot_key );

        bool has_pending = false;
        for ( const auto &kv : proposals_ )
        {
            if ( kv.second.quorum_reached )
            {
                has_pending = true;
                break;
            }
        }
        certificates_pending_.store( has_pending );
        if ( !has_pending )
        {
            timer_cv_.notify_all();
        }
    }

    std::string ConsensusManager::GetSlotKey( const Proposal &proposal )
    {
        ConsensusManagerLogger()->trace( "{}: called proposal_id={}", __func__, proposal.proposal_id() );

        if ( !proposal.subject().has_subject_type_hash() )
        {
            return proposal.proposal_id();
        }
        const auto &subject = proposal.subject();
        const auto &hash    = subject.subject_type_hash().hash();

        {
            std::shared_lock lock( slot_key_handlers_mutex_ );
            auto             it = slot_key_handlers_.find( hash );
            if ( it != slot_key_handlers_.end() )
            {
                return it->second( subject );
            }
        }

        auto subject_id = ComputeSubjectId( subject );
        return subject_id.has_value() ? subject_id.value() : proposal.proposal_id();
    }

    bool ConsensusManager::IsBetterProposal( const Proposal &candidate, const Proposal &current ) const
    {
        ConsensusManagerLogger()->trace( "{}: called candidate={} current={}",
                                         __func__,
                                         candidate.proposal_id(),
                                         current.proposal_id() );
        auto candidate_nonce = DecodeNonceSubject( candidate.subject() );
        auto current_nonce   = DecodeNonceSubject( current.subject() );
        if ( candidate_nonce.has_value() && current_nonce.has_value() )
        {
            const auto &cand_hash = candidate_nonce.value().tx_hash();
            const auto &curr_hash = current_nonce.value().tx_hash();
            if ( cand_hash == curr_hash )
            {
                return candidate.proposal_id() < current.proposal_id();
            }
            return BestHash( curr_hash, cand_hash ) == cand_hash;
        }

        return candidate.proposal_id() < current.proposal_id();
    }

    const std::string &ConsensusManager::BestHash( const std::string &a, const std::string &b )
    {
        return ( a <= b ) ? a : b;
    }

    outcome::result<std::string> ConsensusManager::ComputeSubjectId( const Subject &subject )
    {
        ConsensusManagerLogger()->trace( "{}: called", __func__ );
        std::string serialized;
        if ( !subject.SerializeToString( &serialized ) )
        {
            ConsensusManagerLogger()->error( "{}: failed: serialization error", __func__ );
            return outcome::failure( std::errc::invalid_argument );
        }

        sgns::crypto::HasherImpl hasher;
        auto                     hash = hasher.sha2_256( serialized.data(), serialized.size() );
        ConsensusManagerLogger()->debug( "{}: success", __func__ );
        return base::hex_lower( gsl::span<const uint8_t>( hash.data(), hash.size() ) );
    }

    namespace
    {
        constexpr size_t kSubjectTypeHashSize = base::Hash256::size();

        outcome::result<std::string> ComputePayloadHash( const std::string &payload )
        {
            if ( payload.empty() )
            {
                return outcome::failure( std::errc::invalid_argument );
            }
            sgns::crypto::HasherImpl hasher;
            auto                     hash = hasher.sha2_256( payload.data(), payload.size() );
            return std::string( reinterpret_cast<const char *>( hash.data() ), hash.size() );
        }

        bool SetSubjectPayload( ConsensusSubject                    *subject,
                                const std::string                   &subject_type_hash,
                                const google::protobuf::MessageLite &payload )
        {
            if ( subject == nullptr || subject_type_hash.size() != kSubjectTypeHashSize )
            {
                return false;
            }
            std::string serialized;
            if ( !payload.SerializeToString( &serialized ) )
            {
                return false;
            }
            std::string canonical_payload = subject_type_hash + serialized;
            auto        payload_hash      = ComputePayloadHash( canonical_payload );
            if ( payload_hash.has_error() )
            {
                return false;
            }
            subject->set_payload( canonical_payload.data(), canonical_payload.size() );
            subject->set_payload_hash( payload_hash.value().data(), payload_hash.value().size() );
            return true;
        }

        outcome::result<std::string> ExtractBuiltinPayload( const ConsensusSubject &subject,
                                                            std::string_view        subject_type )
        {
            auto expected = ConsensusManager::ComputeSubjectTypeHash( subject_type );
            if ( expected.has_error() || !subject.has_subject_type_hash() ||
                 subject.subject_type_hash().hash() != expected.value() ||
                 subject.payload().size() <= kSubjectTypeHashSize || expected.value().size() != kSubjectTypeHashSize ||
                 subject.payload().compare( 0, kSubjectTypeHashSize, expected.value() ) != 0 )
            {
                return outcome::failure( std::errc::invalid_argument );
            }
            return subject.payload().substr( kSubjectTypeHashSize );
        }
    }

    outcome::result<std::string> ConsensusManager::ComputeSubjectTypeHash( std::string_view subject_type )
    {
        if ( subject_type.empty() )
        {
            return outcome::failure( std::errc::invalid_argument );
        }

        sgns::crypto::HasherImpl hasher;
        auto                     hash = hasher.sha2_256( subject_type.data(), subject_type.size() );
        return std::string( reinterpret_cast<const char *>( hash.data() ), hash.size() );
    }

    bool ConsensusManager::SubjectTypeMatches( const Subject &subject, std::string_view subject_type )
    {
        auto expected = ComputeSubjectTypeHash( subject_type );
        return expected.has_value() && subject.has_subject_type_hash() &&
               subject.subject_type_hash().hash() == expected.value();
    }

    outcome::result<NonceSubject> ConsensusManager::DecodeNonceSubject( const Subject &subject )
    {
        auto raw_payload = ExtractBuiltinPayload( subject, NONCE_SUBJECT_TYPE );
        if ( raw_payload.has_error() )
        {
            return outcome::failure( raw_payload.error() );
        }
        NonceSubject payload;
        if ( !payload.ParseFromString( raw_payload.value() ) )
        {
            return outcome::failure( std::errc::invalid_argument );
        }
        return payload;
    }

    outcome::result<TaskResultSubject> ConsensusManager::DecodeTaskResultSubject( const Subject &subject )
    {
        auto raw_payload = ExtractBuiltinPayload( subject, TASK_RESULT_SUBJECT_TYPE );
        if ( raw_payload.has_error() )
        {
            return outcome::failure( raw_payload.error() );
        }
        TaskResultSubject payload;
        if ( !payload.ParseFromString( raw_payload.value() ) )
        {
            return outcome::failure( std::errc::invalid_argument );
        }
        return payload;
    }

    outcome::result<RegistryBatchSubject> ConsensusManager::DecodeRegistryBatchSubject( const Subject &subject )
    {
        auto raw_payload = ExtractBuiltinPayload( subject, REGISTRY_BATCH_SUBJECT_TYPE );
        if ( raw_payload.has_error() )
        {
            return outcome::failure( raw_payload.error() );
        }
        RegistryBatchSubject payload;
        if ( !payload.ParseFromString( raw_payload.value() ) )
        {
            return outcome::failure( std::errc::invalid_argument );
        }
        return payload;
    }

    bool ConsensusManager::SubjectHasValidTypeHash( Subject *subject )
    {
        return subject != nullptr && subject->has_subject_type_hash() && !subject->subject_type_hash().hash().empty();
    }

    outcome::result<ConsensusManager::Subject> ConsensusManager::CreateNonceSubject(
        const std::string                             &account_id,
        uint64_t                                       nonce,
        const std::string                             &tx_hash,
        const EmbeddedTransaction                     &transaction,
        const std::optional<UTXOTransitionCommitment> &utxo_commitment,
        const std::optional<UTXOWitness>              &utxo_witness )
    {
        ConsensusManagerLogger()->trace( "{}: called account_id={} nonce={}", __func__, account_id, nonce );
        Subject subject;
        subject.set_account_id( account_id );
        NonceSubject payload;
        payload.set_nonce( nonce );
        payload.set_tx_hash( tx_hash.data(), tx_hash.size() );
        *payload.mutable_transaction() = std::move( transaction );
        if ( utxo_commitment.has_value() )
        {
            *payload.mutable_utxo_commitment() = utxo_commitment.value();
        }
        if ( utxo_witness.has_value() )
        {
            *payload.mutable_utxo_witness() = utxo_witness.value();
        }
        auto type_hash = ComputeSubjectTypeHash( NONCE_SUBJECT_TYPE );
        if ( type_hash.has_error() || !SetSubjectPayload( &subject, type_hash.value(), payload ) )
        {
            return outcome::failure( std::errc::invalid_argument );
        }
        subject.mutable_subject_type_hash()->set_hash( type_hash.value().data(), type_hash.value().size() );

        ConsensusManagerLogger()->debug( "{}: success", __func__ );
        return subject;
    }

    outcome::result<ConsensusManager::Subject> ConsensusManager::CreateTaskResultSubject(
        const std::string &account_id,
        const std::string &escrow_path,
        const std::string &task_result_hash,
        uint64_t           result_epoch )
    {
        ConsensusManagerLogger()->trace( "{}: called account_id={} result_epoch={}",
                                         __func__,
                                         account_id,
                                         result_epoch );
        Subject subject;
        subject.set_account_id( account_id );
        TaskResultSubject payload;
        payload.set_escrow_path( escrow_path );
        payload.set_task_result_hash( task_result_hash.data(), task_result_hash.size() );
        payload.set_result_epoch( result_epoch );
        auto type_hash = ComputeSubjectTypeHash( TASK_RESULT_SUBJECT_TYPE );
        if ( type_hash.has_error() || !SetSubjectPayload( &subject, type_hash.value(), payload ) )
        {
            return outcome::failure( std::errc::invalid_argument );
        }
        subject.mutable_subject_type_hash()->set_hash( type_hash.value().data(), type_hash.value().size() );

        ConsensusManagerLogger()->debug( "{}: success", __func__ );
        return subject;
    }

    outcome::result<ConsensusManager::Subject> ConsensusManager::CreateRegistryBatchSubject(
        const std::string &account_id,
        const std::string &base_registry_cid,
        uint64_t           base_registry_epoch,
        uint64_t           target_registry_epoch,
        uint32_t           certificate_count,
        const std::string &batch_root )
    {
        ConsensusManagerLogger()->trace( "{}: called account_id={} base_epoch={} target_epoch={} certificates={}",
                                         __func__,
                                         account_id.substr( 0, 8 ),
                                         base_registry_epoch,
                                         target_registry_epoch,
                                         certificate_count );
        Subject subject;
        subject.set_account_id( account_id );
        RegistryBatchSubject payload;
        payload.set_base_registry_cid( base_registry_cid );
        payload.set_base_registry_epoch( base_registry_epoch );
        payload.set_target_registry_epoch( target_registry_epoch );
        payload.set_certificate_count( certificate_count );
        payload.set_batch_root( batch_root.data(), batch_root.size() );
        auto type_hash = ComputeSubjectTypeHash( REGISTRY_BATCH_SUBJECT_TYPE );
        if ( type_hash.has_error() || !SetSubjectPayload( &subject, type_hash.value(), payload ) )
        {
            return outcome::failure( std::errc::invalid_argument );
        }
        subject.mutable_subject_type_hash()->set_hash( type_hash.value().data(), type_hash.value().size() );

        ConsensusManagerLogger()->debug( "{}: success", __func__ );
        return subject;
    }

    outcome::result<ConsensusManager::Subject> ConsensusManager::CreateGenericSubject(
        const std::string          &account_id,
        std::string_view            subject_type,
        const std::vector<uint8_t> &payload )
    {
        ConsensusManagerLogger()->trace( "{}: called account_id={} subject_type={}",
                                         __func__,
                                         account_id.substr( 0, 8 ),
                                         subject_type );
        if ( account_id.empty() || subject_type.empty() || payload.empty() )
        {
            return outcome::failure( std::errc::invalid_argument );
        }

        Subject subject;
        subject.set_account_id( account_id );
        subject.set_payload( payload.data(), payload.size() );
        auto payload_hash = ComputePayloadHash( subject.payload() );
        auto type_hash    = ComputeSubjectTypeHash( subject_type );
        if ( payload_hash.has_error() || type_hash.has_error() )
        {
            return outcome::failure( std::errc::invalid_argument );
        }
        subject.set_payload_hash( payload_hash.value().data(), payload_hash.value().size() );
        subject.mutable_subject_type_hash()->set_hash( type_hash.value().data(), type_hash.value().size() );
        ConsensusManagerLogger()->debug( "{}: success", __func__ );
        return subject;
    }

    std::string ConsensusManager::CreateProposalId( const Proposal &proposal )
    {
        ConsensusManagerLogger()->trace( "{}: Creating proposal ID", __func__ );
        // Proposal ID must be derived from the proposal contents excluding the proposal_id itself.
        Proposal copy = proposal;
        copy.clear_proposal_id();
        auto signing_bytes = ProposalSigningBytes( copy );
        if ( signing_bytes.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: failed, no proposal ID created: signing bytes error={}",
                                             __func__,
                                             signing_bytes.error().message() );
            return {};
        }

        sgns::crypto::HasherImpl hasher;
        auto                     hash = hasher.sha2_256( signing_bytes.value().data(), signing_bytes.value().size() );
        auto                     proposal_id = base::hex_lower( gsl::span<const uint8_t>( hash.data(), hash.size() ) );
        ConsensusManagerLogger()->debug( "{}: Proposal ID {} created", __func__, proposal_id.substr( 0, 8 ) );
        return proposal_id;
    }

    bool ConsensusManager::ValidateSubject( const Subject &subject )
    {
        ConsensusManagerLogger()->trace( "{}: called", __func__ );
        if ( subject.account_id().empty() )
        {
            return false;
        }
        if ( !subject.has_subject_type_hash() || subject.subject_type_hash().hash().empty() )
        {
            return false;
        }
        if ( subject.payload().empty() || subject.payload_hash().empty() )
        {
            return false;
        }
        auto payload_hash = ComputePayloadHash( subject.payload() );
        if ( payload_hash.has_error() || payload_hash.value() != subject.payload_hash() )
        {
            return false;
        }

        if ( SubjectTypeMatches( subject, NONCE_SUBJECT_TYPE ) )
        {
            auto payload = DecodeNonceSubject( subject );
            if ( payload.has_error() || payload.value().tx_hash().empty() )
            {
                return false;
            }
            if ( payload.value().has_utxo_witness() && !payload.value().has_utxo_commitment() )
            {
                return false;
            }
            return true;
        }
        if ( SubjectTypeMatches( subject, TASK_RESULT_SUBJECT_TYPE ) )
        {
            auto payload = DecodeTaskResultSubject( subject );
            return payload.has_value() && !payload.value().task_result_hash().empty();
        }
        if ( SubjectTypeMatches( subject, REGISTRY_BATCH_SUBJECT_TYPE ) )
        {
            auto payload = DecodeRegistryBatchSubject( subject );
            if ( payload.has_error() )
            {
                return false;
            }
            return !payload.value().base_registry_cid().empty() &&
                   payload.value().target_registry_epoch() == payload.value().base_registry_epoch() + 1 &&
                   payload.value().certificate_count() > 0 && !payload.value().batch_root().empty();
        }
        return true;
    }

    void ConsensusManager::OnConsensusMessage( boost::optional<const ipfs_pubsub::GossipPubSub::Message &> message )
    {
        ConsensusManagerLogger()->trace( "{}: called", __func__ );
        if ( !message )
        {
            ConsensusManagerLogger()->error( "{}: ignored: message is empty", __func__ );
            return;
        }

        ConsensusMessage decoded;
        if ( !decoded.ParseFromArray( message->data.data(), static_cast<int>( message->data.size() ) ) )
        {
            ConsensusManagerLogger()->error( "{}: Failed to decode consensus message", __func__ );
            return;
        }

        if ( decoded.has_proposal() )
        {
            ConsensusManagerLogger()->debug( "{}: decoded proposal", __func__ );
            HandleProposal( decoded.proposal() );
            return;
        }
        if ( decoded.has_vote() )
        {
            ConsensusManagerLogger()->debug( "{}: decoded vote", __func__ );
            HandleVote( decoded.vote() );
            return;
        }
        if ( decoded.has_vote_bundle() )
        {
            ConsensusManagerLogger()->debug( "{}: decoded vote bundle", __func__ );
            HandleVoteBundle( decoded.vote_bundle() );
            return;
        }
        if ( decoded.has_certificate() )
        {
            ConsensusManagerLogger()->debug( "{}: decoded certificate", __func__ );
            HandleCertificate( decoded.certificate() );
        }
    }

    bool ConsensusManager::CheckSubject( const Subject &subject )
    {
        ConsensusManagerLogger()->trace( "{}: called", __func__ );

        if ( subject.account_id().empty() )
        {
            ConsensusManagerLogger()->error( "{}: subject account_id is empty", __func__ );
            return false;
        }

        if ( !subject.has_subject_type_hash() || subject.subject_type_hash().hash().empty() )
        {
            ConsensusManagerLogger()->error( "{}: subject subject_type_hash is empty", __func__ );
            return false;
        }
        if ( subject.payload().empty() )
        {
            ConsensusManagerLogger()->error( "{}: subject payload is empty", __func__ );
            return false;
        }
        if ( subject.payload_hash().empty() )
        {
            ConsensusManagerLogger()->error( "{}: subject payload_hash is empty", __func__ );
            return false;
        }
        auto payload_hash = ComputePayloadHash( subject.payload() );
        if ( payload_hash.has_error() || payload_hash.value() != subject.payload_hash() )
        {
            ConsensusManagerLogger()->error( "{}: subject payload_hash mismatch", __func__ );
            return false;
        }

        if ( SubjectTypeMatches( subject, NONCE_SUBJECT_TYPE ) )
        {
            auto payload = DecodeNonceSubject( subject );
            if ( payload.has_error() || payload.value().tx_hash().empty() )
            {
                ConsensusManagerLogger()->error( "{}: subject nonce tx_hash is empty", __func__ );
                return false;
            }
            return true;
        }

        if ( SubjectTypeMatches( subject, TASK_RESULT_SUBJECT_TYPE ) )
        {
            auto payload = DecodeTaskResultSubject( subject );
            if ( payload.has_error() || payload.value().escrow_path().empty() )
            {
                ConsensusManagerLogger()->error( "{}: subject task_result escrow_path is empty", __func__ );
                return false;
            }
            if ( payload.value().task_result_hash().empty() )
            {
                ConsensusManagerLogger()->error( "{}: subject task_result task_result_hash is empty", __func__ );
                return false;
            }
            return true;
        }

        if ( SubjectTypeMatches( subject, REGISTRY_BATCH_SUBJECT_TYPE ) )
        {
            auto payload = DecodeRegistryBatchSubject( subject );
            if ( payload.has_error() )
            {
                return false;
            }
            if ( payload.value().base_registry_cid().empty() )
            {
                ConsensusManagerLogger()->error( "{}: subject registry_batch base_registry_cid is empty", __func__ );
                return false;
            }
            if ( payload.value().target_registry_epoch() != payload.value().base_registry_epoch() + 1 )
            {
                ConsensusManagerLogger()->error( "{}: subject registry_batch target epoch mismatch", __func__ );
                return false;
            }
            if ( payload.value().certificate_count() == 0 )
            {
                ConsensusManagerLogger()->error( "{}: subject registry_batch certificate_count is zero", __func__ );
                return false;
            }
            if ( payload.value().batch_root().empty() )
            {
                ConsensusManagerLogger()->error( "{}: subject registry_batch batch_root is empty", __func__ );
                return false;
            }
        }

        return true;
    }

    bool ConsensusManager::CheckProposal( const Proposal &proposal )
    {
        if ( proposal.proposal_id().empty() )
        {
            ConsensusManagerLogger()->error( "{}: Proposal ID missing ", __func__ );
            return false;
        }
        if ( proposal.proposer_id().empty() )
        {
            ConsensusManagerLogger()->error( "{}: Proposer ID missing ", __func__ );
            return false;
        }
        if ( proposal.registry_cid().empty() )
        {
            ConsensusManagerLogger()->error( "{}: Registry CID missing ", __func__ );
            return false;
        }
        if ( !proposal.has_subject() )
        {
            ConsensusManagerLogger()->error( "{}: Proposal without subject ", __func__ );
            return false;
        }
        auto signing_bytes = ProposalSigningBytes( proposal );
        if ( signing_bytes.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: rejected: signing bytes error={}",
                                             __func__,
                                             signing_bytes.error().message() );
            return false;
        }
        if ( !GeniusAccount::VerifySignature( proposal.proposer_id(), proposal.signature(), signing_bytes.value() ) )
        {
            ConsensusManagerLogger()->error( "{}: rejected: signature verification failed proposer_id={}",
                                             __func__,
                                             proposal.proposer_id() );
            return false;
        }
        return true;
    }

    bool ConsensusManager::CheckVote( const Vote &vote )
    {
        if ( vote.proposal_id().empty() )
        {
            ConsensusManagerLogger()->error( "{}: Vote proposal ID missing ", __func__ );
            return false;
        }
        if ( vote.voter_id().empty() )
        {
            ConsensusManagerLogger()->error( "{}: Vote voter ID missing ", __func__ );
            return false;
        }
        return true;
    }

    void ConsensusManager::RecoverPendingCertificateWork()
    {
        auto recovered = certificate_work_journal_->RecoverStaleProcessing( CERT_KEY_PATTERN,
                                                                            std::chrono::seconds( 15 ) );
        if ( recovered > 0 )
        {
            ConsensusManagerLogger()->info( "{}: recovered {} stale certificate work items", __func__, recovered );
        }

        auto       unfinished = certificate_work_journal_->ListUnfinished( CERT_KEY_PATTERN );
        const auto now_ms     = static_cast<uint64_t>(
            std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch() )
                .count() );

        for ( const auto &entry : unfinished )
        {
            if ( entry.key.empty() )
            {
                continue;
            }
            if ( entry.state != crdt::CRDTWorkJournal::State::Stalled )
            {
                continue;
            }
            if ( entry.lease_until_ms != 0 && entry.lease_until_ms > now_ms )
            {
                continue;
            }
            auto value = db_->Get( { entry.key } );
            if ( value.has_error() )
            {
                continue;
            }
            CertificateReceived( { entry.key, value.value() }, std::string{} );
        }
    }

    outcome::result<ConsensusManager::Certificate> ConsensusManager::GetCertificateBySubjectHash(
        const std::string &subject_hash ) const
    {
        const auto key = std::string{ CERTIFICATE_BASE_PATH_KEY } + subject_hash;

        BOOST_OUTCOME_TRY( auto certificate_data, db_->Get( { key } ) );

        Certificate certificate;
        if ( !certificate.ParseFromArray( certificate_data.data(), certificate_data.size() ) )
        {
            ConsensusManagerLogger()->error( "{}: invalid certificate payload key={}", __func__, key );
            return outcome::failure( std::errc::invalid_argument );
        }

        auto current_hash = GetSubjectHash( certificate.proposal().subject() );
        if ( current_hash.has_error() )
        {
            return outcome::failure( current_hash.error() );
        }
        if ( current_hash.value() != subject_hash )
        {
            ConsensusManagerLogger()->error( "{}: certificate subject hash mismatch expected={} actual={}",
                                             __func__,
                                             subject_hash,
                                             current_hash.value() );
            return outcome::failure( std::errc::invalid_argument );
        }
        return certificate;
    }

    bool ConsensusManager::CheckCertificateForSubject( const std::string &subject_hash ) const
    {
        auto certificate_result = GetCertificateBySubjectHash( subject_hash );
        if ( certificate_result.has_error() )
        {
            return false;
        }
        auto certificate_check = ValidateCertificate( certificate_result.value() );
        return certificate_check == Check::Approve;
    }

    bool ConsensusManager::CheckCertificateForSubject( const ConsensusManager::Subject &subject ) const
    {
        auto current_hash = GetSubjectHash( subject );
        if ( current_hash.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: Failed to get the hash for the subject, error: {}",
                                             __func__,
                                             current_hash.error().message() );
            return false;
        }
        auto certificate_result = GetCertificateBySubjectHash( current_hash.value() );
        if ( certificate_result.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: Failed to get the certificate for the hash {}, error: {}",
                                             __func__,
                                             GetPrintableSubjectHash( subject ),
                                             certificate_result.error().message() );
            return false;
        }
        auto &certificate                   = certificate_result.value();
        auto  certificate_subject_id_result = ComputeSubjectId( certificate.proposal().subject() );
        if ( certificate_subject_id_result.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: failed for hash {}: certificate subject id computation error={}",
                                             __func__,
                                             GetPrintableSubjectHash( subject ),
                                             certificate_subject_id_result.error().message() );
            return false;
        }
        auto &certificate_subject_id = certificate_subject_id_result.value();
        auto  subject_id_result      = ComputeSubjectId( subject );
        if ( subject_id_result.has_error() )
        {
            ConsensusManagerLogger()->error( "{}: failed for hash {}: subject id computation error={}",
                                             __func__,
                                             GetPrintableSubjectHash( subject ),
                                             subject_id_result.error().message() );
            return false;
        }
        auto proposed_subject_id = subject_id_result.value();
        bool equal               = proposed_subject_id == certificate_subject_id;
        if ( !equal )
        {
            ConsensusManagerLogger()->debug( "{}: Match for subject and certificate (hash {}): MISMATCH",
                                             __func__,
                                             GetPrintableSubjectHash( subject ) );
            return false;
        }
        auto certificate_check = ValidateCertificate( certificate );
        if ( certificate_check != Check::Approve )
        {
            ConsensusManagerLogger()->error( "{}: certificate failed validation for hash {}",
                                             __func__,
                                             GetPrintableSubjectHash( subject ) );
            return false;
        }
        ConsensusManagerLogger()->debug( "{}: Match for subject and certificate (hash {}): {}",
                                         __func__,
                                         GetPrintableSubjectHash( subject ),
                                         equal ? "Match" : "MISMATCH" );
        return true;
    }

    std::string ConsensusManager::GetPrintableSubjectHash( const Subject &subject )
    {
        auto              subject_hash = GetSubjectHash( subject );
        const std::string short_hash   = subject_hash.has_value() ? subject_hash.value().substr( 0, 8 ) : "Invalid";
        return short_hash;
    }

}

Updated on 2026-06-05 at 17:22:19 -0700