Skip to content

src/account/AccountMessenger.cpp

More...

Namespaces

Name
sgns

Functions

Name
OUTCOME_CPP_DEFINE_CATEGORY_3(sgns , AccountMessenger::Error , e )

Detailed Description

Date: 2025-07-22 Henrique A. Klein ([email protected])

Functions Documentation

function OUTCOME_CPP_DEFINE_CATEGORY_3

OUTCOME_CPP_DEFINE_CATEGORY_3(
    sgns ,
    AccountMessenger::Error ,
    e 
)

Source code

#include <thread>
#include <random>
#include <boost/format.hpp>
#include <future>
#include "AccountMessenger.hpp"
#include "base/sgns_version.hpp"
#include "crypto/hasher/hasher_impl.hpp"
#include "primitives/cid/cid.hpp"

OUTCOME_CPP_DEFINE_CATEGORY_3( sgns, AccountMessenger::Error, e )
{
    using AccountCommError = sgns::AccountMessenger::Error;
    switch ( e )
    {
        case AccountCommError::PROTO_DESERIALIZATION:
            return "Error in protobuf data deserialization";
        case AccountCommError::PROTO_SERIALIZATION:
            return "Error in protobuf data serialization";
        case AccountCommError::NONCE_REQUEST_IN_PROGRESS:
            return "Nonce request already in progress";
        case AccountCommError::NONCE_GET_ERROR:
            return "Nonce couldn't be fetched";
        case AccountCommError::NO_RESPONSE_RECEIVED:
            return "No response received from network";
        case AccountCommError::RESPONSE_WITHOUT_NONCE:
            return "Response received but without nonce data";
        case AccountCommError::GENESIS_REQUEST_ERROR:
            return "Genesis request failed";
    }
    return "Unknown error";
}

namespace sgns
{
    std::shared_ptr<AccountMessenger> AccountMessenger::New( std::string                                address,
                                                             std::shared_ptr<ipfs_pubsub::GossipPubSub> pubsub,
                                                             InterfaceMethods                           methods )
    {
        if ( address.empty() )
        {
            return nullptr;
        }
        if ( !pubsub )
        {
            return nullptr;
        }
        if ( !methods.get_local_nonce_ || !methods.sign_ || !methods.verify_signature_ )
        {
            return nullptr;
        }
        auto instance = std::shared_ptr<AccountMessenger>(
            new AccountMessenger( std::move( address ), std::move( pubsub ), std::move( methods ) ) );

        instance->subs_acc_future_ = std::move( instance->pubsub_->Subscribe(
            instance->account_comm_topic_,
            [weakptr( std::weak_ptr<AccountMessenger>( instance ) )](
                boost::optional<const ipfs_pubsub::GossipPubSub::Message &> message )
            {
                if ( auto self = weakptr.lock() )
                {
                    self->logger_->trace( "[{}] Received Account response", self->address_.substr( 0, 8 ) );
                    self->OnResponse( message );
                }
            } ) );
        instance->logger_->debug( "[{}] Subscribed to Account topic {}",
                                  instance->address_.substr( 0, 8 ),
                                  instance->account_comm_topic_ );

        instance->subs_requests_future_ = std::move( instance->pubsub_->Subscribe(
            instance->requests_topic_,
            [weakptr( std::weak_ptr<AccountMessenger>( instance ) )](
                boost::optional<const ipfs_pubsub::GossipPubSub::Message &> message )
            {
                if ( auto self = weakptr.lock() )
                {
                    self->logger_->debug( "[{}] Received Account request", self->address_.substr( 0, 8 ) );
                    self->OnRequest( message );
                }
            } ) );
        instance->logger_->debug( "[{}] Subscribed to Requests topic {}",
                                  instance->address_.substr( 0, 8 ),
                                  instance->requests_topic_ );

        return instance;
    }

    AccountMessenger::AccountMessenger( std::string                                address,
                                        std::shared_ptr<ipfs_pubsub::GossipPubSub> pubsub,
                                        InterfaceMethods                           methods ) :
        address_( std::move( address ) ),
        account_comm_topic_( address_ + std::string( ACCOUNT_COMM ) + sgns::version::GetNetAndVersionAppendix() ),
        requests_topic_( std::string( REQUESTS_COMM ) + sgns::version::GetNetAndVersionAppendix() ),
        pubsub_( std::move( pubsub ) ),
        methods_( std::move( methods ) )
    {
        worker_thread_ = std::thread( &AccountMessenger::WorkerLoop, this );
    }

    AccountMessenger::~AccountMessenger()
    {
        stop_worker_.store( true );
        queue_cv_.notify_one();
        if ( worker_thread_.joinable() )
        {
            worker_thread_.join();
        }
    }

    void AccountMessenger::RegisterBlockResponseHandler( BlockResponseHandler handler )
    {
        std::lock_guard lock( global_handler_mutex_ );
        global_block_handler_ = std::move( handler );
    }

    void AccountMessenger::ClearBlockResponseHandler()
    {
        std::lock_guard lock( global_handler_mutex_ );
        global_block_handler_ = nullptr;
    }

    void AccountMessenger::RegisterHeadRequestHandler( HeadRequestHandler handler )
    {
        std::lock_guard lock( head_handler_mutex_ );
        head_request_handler_ = std::move( handler );
    }

    void AccountMessenger::ClearHeadRequestHandler()
    {
        std::lock_guard lock( head_handler_mutex_ );
        head_request_handler_ = nullptr;
    }

    outcome::result<void> AccountMessenger::RequestHeads( const std::unordered_set<std::string> &topics )
    {
        if ( topics.empty() )
        {
            logger_->debug( "[{}] RequestHeads called with empty topics list", address_.substr( 0, 8 ) );
            return outcome::success();
        }

        accountComm::HeadRequest req;
        req.set_requester_address( address_ );
        req.set_request_id( rd_() );
        req.set_timestamp(
            std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch() )
                .count() );

        for ( const auto &topic : topics )
        {
            req.add_topics( topic );
        }

        accountComm::SignedHeadRequest signed_req;
        signed_req.mutable_data()->CopyFrom( req );

        std::string req_string;
        if ( !req.SerializeToString( &req_string ) )
        {
            logger_->error( "[{}] Failed to serialize HeadRequest", address_.substr( 0, 8 ) );
            return outcome::failure( Error::PROTO_SERIALIZATION );
        }

        auto sign_result = methods_.sign_( std::vector<uint8_t>( req_string.begin(), req_string.end() ) );
        if ( sign_result.has_error() )
        {
            logger_->error( "[{}] Failed to sign HeadRequest", address_.substr( 0, 8 ) );
            return outcome::failure( sign_result.error() );
        }

        signed_req.set_signature( std::string( sign_result.value().begin(), sign_result.value().end() ) );

        accountComm::AccountMessage envelope;
        envelope.mutable_head_request()->CopyFrom( signed_req );

        logger_->debug( "[{}] Sending HeadRequest for {} topics", address_.substr( 0, 8 ), topics.size() );

        return SendAccountMessage( envelope, { requests_topic_ } );
    }

    void AccountMessenger::OnRequest( boost::optional<const ipfs_pubsub::GossipPubSub::Message &> message )
    {
        if ( message )
        {
            logger_->trace( "{}: Valid message received", __func__ );
            accountComm::AccountMessage acc_msg;
            if ( !acc_msg.ParseFromArray( message->data.data(), static_cast<int>( message->data.size() ) ) )
            {
                logger_->error( "{}: Failed to parse AccountMessage ", __func__ );
                return;
            }

            switch ( acc_msg.payload_case() )
            {
                case accountComm::AccountMessage::kNonceRequest:
                    HandleNonceRequest( acc_msg.nonce_request() );
                    break;
                case accountComm::AccountMessage::kBlockRequest:
                    HandleBlockRequest( acc_msg.block_request() );
                    break;
                case accountComm::AccountMessage::kHeadRequest:
                    HandleHeadRequest( acc_msg.head_request() );
                    break;
                case accountComm::AccountMessage::kBlockCidRequest:
                    HandleBlockCidRequest( acc_msg.block_cid_request() );
                    break;
                case accountComm::AccountMessage::kNonceResponse:
                case accountComm::AccountMessage::kBlockResponse:
                    logger_->error( "{}: Unexpected response received ", __func__ );
                    break;
                default:
                    logger_->error( "{}: Unknown AccountMessage type received on {}", __func__ );
                    break;
            }
        }
    }

    void AccountMessenger::OnResponse( boost::optional<const ipfs_pubsub::GossipPubSub::Message &> message )
    {
        if ( message )
        {
            logger_->trace( "{}: Valid message received", __func__ );
            accountComm::AccountMessage acc_msg;
            if ( !acc_msg.ParseFromArray( message->data.data(), static_cast<int>( message->data.size() ) ) )
            {
                logger_->error( "{}: Failed to parse AccountMessage ", __func__ );
                return;
            }

            switch ( acc_msg.payload_case() )
            {
                case accountComm::AccountMessage::kNonceRequest:
                    logger_->error( "{}: Unexpected response received ", __func__ );
                    break;
                case accountComm::AccountMessage::kNonceResponse:
                    HandleNonceResponse( acc_msg.nonce_response() );
                    break;
                case accountComm::AccountMessage::kBlockResponse:
                    HandleBlockResponse( acc_msg.block_response() );
                    break;
                case accountComm::AccountMessage::kBlockCidRequest:
                    logger_->error( "{}: Unexpected response received ", __func__ );
                    break;
                default:
                    logger_->error( "{}: Unknown AccountMessage type received on {}", __func__ );
                    break;
            }
        }
    }

    outcome::result<void> AccountMessenger::RequestNonce( uint64_t req_id )
    {
        accountComm::NonceRequest req;
        req.set_requester_address( address_ );
        req.set_request_id( req_id );
        req.set_timestamp(
            std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch() )
                .count() );

        std::string encoded;
        if ( !req.SerializeToString( &encoded ) )
        {
            return outcome::failure( Error::PROTO_DESERIALIZATION );
        }

        std::vector<uint8_t> serialized_vec( encoded.begin(), encoded.end() );
        OUTCOME_TRY( auto &&signature, methods_.sign_( serialized_vec ) );
        accountComm::SignedNonceRequest signed_req;
        *signed_req.mutable_data() = req;
        signed_req.set_signature( signature.data(), signature.size() );

        accountComm::AccountMessage envelope;
        *envelope.mutable_nonce_request() = signed_req;

        auto send_ret = SendAccountMessage( envelope, { requests_topic_ } );

        return send_ret;
    }

    outcome::result<uint64_t> AccountMessenger::GetLatestNonce( uint64_t timeout_ms, uint64_t silent_time_ms )
    {
        auto promise = std::make_shared<std::promise<outcome::result<uint64_t>>>();
        auto future  = promise->get_future();

        EnqueueTask(
            { RequestType::Nonce, timeout_ms, silent_time_ms, 0, std::string{}, nullptr, std::move( promise ) } );

        return future.get();
    }

    outcome::result<void> AccountMessenger::RequestGenesis(
        uint64_t timeout_ms, std::function<void( outcome::result<std::string> )> callback )
    {
        EnqueueTask( { RequestType::Genesis, timeout_ms, 150, 0, std::string{}, std::move( callback ), nullptr } );
        return outcome::success();
    }

    outcome::result<void> AccountMessenger::RequestRegularBlock( uint64_t                                            timeout_ms,
                                                                 std::string                                         cid,
                                                                 std::function<void( outcome::result<std::string> )> callback )
    {
        EnqueueTask(
            { RequestType::BlockByCid, timeout_ms, 150, 0, std::move( cid ), std::move( callback ), nullptr } );
        return outcome::success();
    }

    outcome::result<void> AccountMessenger::RequestBlock( uint64_t req_id, uint8_t block_index )
    {
        accountComm::BlockRequest req;
        req.set_requester_address( address_ );
        req.set_request_id( req_id );
        req.set_timestamp(
            std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch() )
                .count() );
        req.set_block_index( static_cast<uint32_t>( block_index ) );

        std::string encoded;
        if ( !req.SerializeToString( &encoded ) )
        {
            return outcome::failure( Error::PROTO_SERIALIZATION );
        }

        std::vector<uint8_t> serialized_vec( encoded.begin(), encoded.end() );
        OUTCOME_TRY( auto &&signature, methods_.sign_( serialized_vec ) );

        accountComm::SignedBlockRequest signed_req;
        *signed_req.mutable_data() = req;
        signed_req.set_signature( signature.data(), signature.size() );

        accountComm::AccountMessage envelope;
        *envelope.mutable_block_request() = signed_req;

        return SendAccountMessage( envelope, { requests_topic_ } );
    }

    outcome::result<void> AccountMessenger::RequestBlockByCid( uint64_t req_id, const std::string &cid )
    {
        accountComm::BlockCidRequest req;
        req.set_requester_address( address_ );
        req.set_request_id( req_id );
        req.set_timestamp(
            std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch() )
                .count() );
        req.set_cid( cid );
        logger_->debug( "[{}] Requesting block by CID {} with req_id {}",
                        address_.substr( 0, 8 ),
                        cid, req_id );
        std::string encoded;
        if ( !req.SerializeToString( &encoded ) )
        {
            return outcome::failure( Error::PROTO_SERIALIZATION );
        }

        std::vector<uint8_t> serialized_vec( encoded.begin(), encoded.end() );
        OUTCOME_TRY( auto &&signature, methods_.sign_( serialized_vec ) );

        accountComm::SignedBlockCidRequest signed_req;
        *signed_req.mutable_data() = req;
        signed_req.set_signature( signature.data(), signature.size() );

        accountComm::AccountMessage envelope;
        *envelope.mutable_block_cid_request() = signed_req;

        return SendAccountMessage( envelope, { requests_topic_ } );
    }

    void AccountMessenger::HandleBlockRequest( const accountComm::SignedBlockRequest &signed_req )
    {
        const auto &req = signed_req.data();

        logger_->debug( "[{}] Received a Block request req_id {} index {}",
                        address_.substr( 0, 8 ),
                        req.request_id(),
                        req.block_index() );

        std::string serialized;
        if ( !req.SerializeToString( &serialized ) )
        {
            logger_->error( "Failed to serialize BlockRequest for signature check" );
            return;
        }
        std::vector<uint8_t> serialized_vec( serialized.begin(), serialized.end() );
        auto                 verify_signature_result = methods_.verify_signature_( req.requester_address(),
                                                                   signed_req.signature(),
                                                                   serialized_vec );
        if ( verify_signature_result.has_error() || !verify_signature_result.value() )
        {
            logger_->error( "Invalid signature on BlockRequest from {}", req.requester_address() );
            return;
        }

        auto cid_result = methods_.get_block_cid_( static_cast<uint8_t>( req.block_index() ), req.requester_address() );
        bool have_cid   = !cid_result.has_error();

        if ( !have_cid )
        {
            logger_->debug( "[{}] I don't have the block index {}, will send empty BlockResponse",
                            address_.substr( 0, 8 ),
                            req.block_index() );
        }

        accountComm::BlockResponse resp;
        resp.set_responder_address( address_ );
        resp.set_requester_address( req.requester_address() );
        resp.set_request_id( req.request_id() );
        resp.set_timestamp(
            std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch() )
                .count() );
        logger_->debug( "[{}] Preparing BlockResponse for req_id {} with requester address {}",
                        address_.substr( 0, 8 ),
                        req.request_id(),
                        req.requester_address() );

        if ( have_cid )
        {
            auto *info = resp.add_blocks();
            info->set_cid( cid_result.value() );

            auto peer_info = pubsub_->GetHost()->getPeerInfo();
            info->set_peer_id( std::string( peer_info.id.toVector().begin(), peer_info.id.toVector().end() ) );

            auto pubsubObserved = pubsub_->GetHost()->getObservedAddressesReal();
            for ( auto &addr : pubsubObserved )
            {
                info->add_addresses( addr.getStringAddress() );
                logger_->debug( "Address Broadcast: {}", addr.getStringAddress() );
            }
            for ( auto &addr : peer_info.addresses )
            {
                info->add_addresses( addr.getStringAddress() );
                logger_->debug( "Address Broadcast: {}", addr.getStringAddress() );
            }

            if ( info->addresses_size() == 0 )
            {
                logger_->error( "No addresses found for BlockResponse" );
            }
        }

        std::string resp_serialized;
        if ( !resp.SerializeToString( &resp_serialized ) )
        {
            logger_->error( "Failed to serialize BlockResponse" );
            return;
        }

        std::vector<uint8_t> resp_bytes( resp_serialized.begin(), resp_serialized.end() );

        auto signature_res = methods_.sign_( resp_bytes );
        if ( signature_res.has_error() )
        {
            logger_->error( "Failed to sign BlockResponse" );
            return;
        }

        accountComm::SignedBlockResponse signed_resp;
        *signed_resp.mutable_data() = resp;
        auto signature              = signature_res.value();
        signed_resp.set_signature( signature.data(), signature.size() );

        accountComm::AccountMessage msg;
        *msg.mutable_block_response() = signed_resp;

        auto account_topic = req.requester_address() + std::string( ACCOUNT_COMM ) +
                             sgns::version::GetNetAndVersionAppendix();

        auto send_ret = SendAccountMessage( msg, { account_topic } );
        if ( send_ret.has_error() )
        {
            logger_->error( "[{}] Failed to send BlockResponse for req_id {}",
                            address_.substr( 0, 8 ),
                            req.request_id() );
        }
        else
        {
            logger_->debug( "[{}] Sent BlockResponse ({} block entries) to {} (req_id {})",
                            address_.substr( 0, 8 ),
                            resp.blocks_size(),
                            req.requester_address().substr( 0, 8 ),
                            req.request_id() );
        }
    }

    void AccountMessenger::HandleBlockCidRequest( const accountComm::SignedBlockCidRequest &signed_req )
    {
        const auto &req = signed_req.data();

        logger_->debug( "[{}] Received a Block-by-CID request req_id {} cid {}",
                        address_.substr( 0, 8 ),
                        req.request_id(),
                        req.cid() );

        std::string serialized;
        if ( !req.SerializeToString( &serialized ) )
        {
            logger_->error( "Failed to serialize BlockCidRequest for signature check" );
            return;
        }
        std::vector<uint8_t> serialized_vec( serialized.begin(), serialized.end() );
        auto                 verify_signature_result = methods_.verify_signature_( req.requester_address(),
                                                                   signed_req.signature(),
                                                                   serialized_vec );
        if ( verify_signature_result.has_error() || !verify_signature_result.value() )
        {
            logger_->error( "Invalid signature on BlockCidRequest from {}", req.requester_address() );
            return;
        }

        bool have_cid = false;
        if ( methods_.has_block_cid_ )
        {
            auto has_cid_result = methods_.has_block_cid_( req.cid() );
            have_cid = has_cid_result.has_value() && has_cid_result.value();
        }
        else
        {
            logger_->error( "No has_block_cid_ method configured" );
        }

        accountComm::BlockResponse resp;
        resp.set_responder_address( address_ );
        resp.set_requester_address( req.requester_address() );
        resp.set_request_id( req.request_id() );
        resp.set_timestamp(
            std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch() )
                .count() );
        logger_->debug( "[{}] Preparing BlockResponse for req_id {} with requester address {}",
                        address_.substr( 0, 8 ),
                        req.request_id(),
                        req.requester_address() );

        if ( have_cid )
        {
            auto *info = resp.add_blocks();
            info->set_cid( req.cid() );

            auto peer_info = pubsub_->GetHost()->getPeerInfo();
            info->set_peer_id( std::string( peer_info.id.toVector().begin(), peer_info.id.toVector().end() ) );

            auto pubsubObserved = pubsub_->GetHost()->getObservedAddressesReal();
            for ( auto &addr : pubsubObserved )
            {
                info->add_addresses( addr.getStringAddress() );
                logger_->debug( "Address Broadcast: {}", addr.getStringAddress() );
            }
            for ( auto &addr : peer_info.addresses )
            {
                info->add_addresses( addr.getStringAddress() );
                logger_->debug( "Address Broadcast: {}", addr.getStringAddress() );
            }
        }

        std::string resp_serialized;
        if ( !resp.SerializeToString( &resp_serialized ) )
        {
            logger_->error( "Failed to serialize BlockResponse" );
            return;
        }

        std::vector<uint8_t> resp_bytes( resp_serialized.begin(), resp_serialized.end() );
        auto                 signature_res = methods_.sign_( resp_bytes );
        if ( signature_res.has_error() )
        {
            logger_->error( "Failed to sign BlockResponse" );
            return;
        }

        accountComm::SignedBlockResponse signed_resp;
        *signed_resp.mutable_data() = resp;
        auto signature              = signature_res.value();
        signed_resp.set_signature( signature.data(), signature.size() );

        accountComm::AccountMessage msg;
        *msg.mutable_block_response() = signed_resp;

        auto account_topic = req.requester_address() + std::string( ACCOUNT_COMM ) +
                             sgns::version::GetNetAndVersionAppendix();

        auto send_ret = SendAccountMessage( msg, { account_topic } );
        if ( send_ret.has_error() )
        {
            logger_->error( "[{}] Failed to send BlockResponse for req_id {}",
                            address_.substr( 0, 8 ),
                            req.request_id() );
        }
        else
        {
            logger_->debug( "[{}] Sent BlockResponse ({} block entries) to {} (req_id {})",
                            address_.substr( 0, 8 ),
                            resp.blocks_size(),
                            req.requester_address().substr( 0, 8 ),
                            req.request_id() );
        }
    }

    void AccountMessenger::HandleBlockResponse( const accountComm::SignedBlockResponse &signed_resp )
    {
        const auto &resp = signed_resp.data();

        logger_->debug( "[{}] Received a Block response from {} with {} blocks and req_id {}",
                        address_.substr( 0, 8 ),
                        resp.responder_address().substr( 0, 8 ),
                        resp.blocks_size(),
                        resp.request_id() );

        // Verify signature
        std::string serialized;
        if ( !resp.SerializeToString( &serialized ) )
        {
            logger_->error( "Failed to serialize BlockResponse for signature check" );
            return;
        }

        std::vector<uint8_t> data_vec( serialized.begin(), serialized.end() );
        auto                 verify_signature_result = methods_.verify_signature_( resp.responder_address(),
                                                                   signed_resp.signature(),
                                                                   data_vec );

        if ( verify_signature_result.has_error() )
        {
            logger_->error( "No verify method for BlockResponse" );
            return;
        }
        if ( !verify_signature_result.value() )
        {
            logger_->error( "Invalid signature on BlockResponse from {}", resp.responder_address() );
            return;
        }

        // ------------------------------------------------------------
        // Store response information — even if blocks_size() == 0
        // ------------------------------------------------------------
        bool has_valid_cid = false;
        {
            std::lock_guard lock( block_responses_mutex_ );
            auto           &set_ref = block_responses_[resp.request_id()];

            if ( set_ref.empty() )
            {
                // mark first time we saw any response for this req_id
                block_first_response_time_[resp.request_id()] = std::chrono::steady_clock::now();
            }

            for ( const auto &b : resp.blocks() )
            {
                if ( !b.cid().empty() )
                {
                    set_ref.insert( b.cid() );
                    has_valid_cid = true;
                }
            }

            // even if no cids, we keep the key entry in block_responses_
            // to signal "a response was received but empty"
        }

        if ( !has_valid_cid && resp.blocks_size() == 0 )
        {
            logger_->trace( "[{}] Received empty BlockResponse from {}, marking as 'empty response received'",
                            address_.substr( 0, 8 ),
                            resp.responder_address().substr( 0, 8 ) );
            return; // do not trigger global handler
        }

        // ------------------------------------------------------------
        // Notify global handler only for valid CIDs
        // ------------------------------------------------------------
        std::lock_guard lock( global_handler_mutex_ );
        if ( global_block_handler_ )
        {
            for ( const auto &block_info : resp.blocks() )
            {
                if ( block_info.cid().empty() )
                {
                    continue;
                }

                std::string first_address;
                if ( block_info.addresses_size() > 0 )
                {
                    first_address = block_info.addresses( 0 );
                }

                global_block_handler_( block_info.cid(), block_info.peer_id(), first_address );
            }
        }
    }

    outcome::result<void> AccountMessenger::RequestAccountCreation(
        uint64_t timeout_ms, std::function<void( outcome::result<std::string> )> callback )
    {
        EnqueueTask(
            { RequestType::AccountCreation, timeout_ms, 150, 1, std::string{}, std::move( callback ), nullptr } );
        return outcome::success();
    }

    outcome::result<void> AccountMessenger::SendAccountMessage( const accountComm::AccountMessage &msg,
                                                                const std::set<std::string>       &topics )
    {
        size_t               size = msg.ByteSizeLong();
        std::vector<uint8_t> serialized_proto( size );
        if ( !msg.SerializeToArray( serialized_proto.data(), serialized_proto.size() ) )
        {
            logger_->error( "Failed to serialize AccountMessage for NonceResponse" );
            return outcome::failure( Error::PROTO_SERIALIZATION );
        }
        for ( auto &topic : topics )
        {
            logger_->debug( "Sending account packet to {}", topic );
            pubsub_->Publish( topic, serialized_proto );
        }
        return outcome::success();
    }

    void AccountMessenger::WorkerLoop()
    {
        while ( true )
        {
            RequestTask task;
            {
                std::unique_lock lock( queue_mutex_ );
                queue_cv_.wait( lock, [this]() { return stop_worker_.load() || !request_queue_.empty(); } );
                if ( stop_worker_.load() && request_queue_.empty() )
                {
                    break;
                }
                task = std::move( request_queue_.front() );
                request_queue_.pop();
            }

            switch ( task.type )
            {
                case RequestType::Nonce:
                {
                    auto res = PerformNonceRequest( task.timeout_ms, task.silent_time_ms );
                    if ( task.nonce_promise )
                    {
                        task.nonce_promise->set_value( res );
                    }
                    break;
                }
                case RequestType::Genesis:
                {
                    auto res = PerformBlockRequest( task.timeout_ms, task.block_index );
                    if ( task.callback )
                    {
                        if ( res.has_error() )
                        {
                            task.callback( outcome::failure( res.error() ) );
                        }
                        else
                        {
                            const auto &cids = res.value();
                            if ( cids.empty() )
                            {
                                task.callback( outcome::failure( boost::system::error_code{} ) );
                            }
                            else
                            {
                                for ( const auto &cid : cids )
                                {
                                    task.callback( outcome::success( cid ) );
                                }
                            }
                        }
                    }
                    break;
                }
                case RequestType::AccountCreation:
                {
                    auto res = PerformBlockRequest( task.timeout_ms, task.block_index );
                    if ( task.callback )
                    {
                        if ( res.has_error() )
                        {
                            task.callback( outcome::failure( res.error() ) );
                        }
                        else
                        {
                            const auto &cids = res.value();
                            if ( cids.empty() )
                            {
                                task.callback( outcome::failure( boost::system::error_code{} ) );
                            }
                            else
                            {
                                for ( const auto &cid : cids )
                                {
                                    task.callback( outcome::success( cid ) );
                                }
                            }
                        }
                    }
                    break;
                }
                case RequestType::BlockByCid:
                {
                    auto res = PerformBlockCidRequest( task.timeout_ms, task.cid );
                    if ( task.callback )
                    {
                        if ( res.has_error() )
                        {
                            task.callback( outcome::failure( res.error() ) );
                        }
                        else
                        {
                            const auto &cids = res.value();
                            if ( cids.empty() )
                            {
                                task.callback( outcome::failure( boost::system::error_code{} ) );
                            }
                            else
                            {
                                for ( const auto &cid : cids )
                                {
                                    task.callback( outcome::success( cid ) );
                                }
                            }
                        }
                    }
                    break;
                }
                default:
                    break;
            }
        }
    }

    void AccountMessenger::EnqueueTask( RequestTask task )
    {
        {
            std::lock_guard lock( queue_mutex_ );
            request_queue_.push( std::move( task ) );
        }
        queue_cv_.notify_one();
    }

    outcome::result<uint64_t> AccountMessenger::PerformNonceRequest( uint64_t timeout_ms, uint64_t silent_time_ms )
    {
        std::mt19937_64 gen( rd_() );
        uint64_t        random_value = gen();

        std::string              to_hash = address_ + std::to_string( random_value );
        sgns::crypto::HasherImpl hasher;
        auto                     hash = hasher.sha2_256(
            gsl::span<const uint8_t>( reinterpret_cast<const uint8_t *>( to_hash.data() ), to_hash.size() ) );

        uint64_t req_id = 0;
        std::memcpy( &req_id, hash.data(), sizeof( req_id ) );

        logger_->debug( "[{}] Requesting nonce with timeout {} and req_id {} ",
                        address_.substr( 0, 8 ),
                        timeout_ms,
                        req_id );

        {
            std::lock_guard lock( nonce_responses_mutex_ );
            nonce_responses_.erase( req_id );
            no_nonce_responses_.erase( req_id );
            first_response_time_.erase( req_id );
        }

        OUTCOME_TRY( RequestNonce( req_id ) );

        const auto start_time        = std::chrono::steady_clock::now();
        const auto full_timeout      = std::chrono::milliseconds( timeout_ms );
        const auto silent_time       = std::chrono::milliseconds( silent_time_ms );

        bool first_seen = false;

        while ( true )
        {
            {
                std::lock_guard lock( nonce_responses_mutex_ );
                auto            nonce_it    = nonce_responses_.find( req_id );
                auto            no_nonce_it = no_nonce_responses_.find( req_id );

                bool has_nonce_responses    = ( nonce_it != nonce_responses_.end() && !nonce_it->second.empty() );
                bool has_no_nonce_responses = ( no_nonce_it != no_nonce_responses_.end() &&
                                                !no_nonce_it->second.empty() );

                if ( has_nonce_responses || has_no_nonce_responses )
                {
                    if ( !first_seen )
                    {
                        first_seen                   = true;
                        first_response_time_[req_id] = std::chrono::steady_clock::now();
                    }
                    else
                    {
                        auto elapsed = std::chrono::steady_clock::now() - first_response_time_[req_id];
                        if ( elapsed >= silent_time )
                        {
                            break; // silent window passed
                        }
                    }
                }
            }

            if ( std::chrono::steady_clock::now() - start_time >= full_timeout )
            {
                break; // total timeout reached
            }
            std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
        }

        uint64_t max_nonce        = 0;
        bool     has_any_nonce    = false;
        bool     has_any_response = false;

        {
            std::lock_guard lock( nonce_responses_mutex_ );
            auto            nonce_it    = nonce_responses_.find( req_id );
            auto            no_nonce_it = no_nonce_responses_.find( req_id );

            if ( nonce_it != nonce_responses_.end() && !nonce_it->second.empty() )
            {
                has_any_nonce    = true;
                has_any_response = true;
                max_nonce        = *nonce_it->second.rbegin();
            }

            if ( no_nonce_it != no_nonce_responses_.end() && !no_nonce_it->second.empty() )
            {
                has_any_response = true;
            }

            nonce_responses_.erase( req_id );
            no_nonce_responses_.erase( req_id );
            first_response_time_.erase( req_id );
        }

        if ( !has_any_response )
        {
            logger_->debug( "[{}] No response received within timeout for req_id {}", address_.substr( 0, 8 ), req_id );
            return outcome::failure( Error::NO_RESPONSE_RECEIVED );
        }

        if ( !has_any_nonce )
        {
            logger_->debug( "[{}] Response received but without nonce data for req_id {}",
                            address_.substr( 0, 8 ),
                            req_id );
            return outcome::failure( Error::RESPONSE_WITHOUT_NONCE );
        }

        logger_->debug( "[{}] Returning highest collected nonce for req_id {}: {}",
                        address_.substr( 0, 8 ),
                        req_id,
                        max_nonce );
        return max_nonce;
    }

    outcome::result<std::set<std::string>> AccountMessenger::PerformBlockRequest( uint64_t timeout_ms,
                                                                                  uint8_t  block_index )
    {
        std::mt19937_64 gen( rd_() );
        uint64_t        random_value = gen();

        std::string              to_hash = address_ + std::to_string( random_value );
        sgns::crypto::HasherImpl hasher;
        auto                     hash = hasher.sha2_256(
            gsl::span<const uint8_t>( reinterpret_cast<const uint8_t *>( to_hash.data() ), to_hash.size() ) );

        uint64_t req_id = 0;
        std::memcpy( &req_id, hash.data(), sizeof( req_id ) );

        logger_->debug( "[{}] Requesting block {} with req_id {} and timeout {}",
                        address_.substr( 0, 8 ),
                        block_index,
                        req_id,
                        timeout_ms );

        {
            std::lock_guard lock( block_responses_mutex_ );
            block_responses_.erase( req_id );
            block_first_response_time_.erase( req_id );
        }

        auto request_result = RequestBlock( req_id, block_index );
        if ( request_result.has_error() )
        {
            logger_->error( "[{}] Failed to request block {}", address_.substr( 0, 8 ), block_index );
            return request_result.error();
        }

        const auto start_time        = std::chrono::steady_clock::now();
        const auto full_timeout      = std::chrono::milliseconds( timeout_ms );
        const auto silent_time       = std::chrono::milliseconds( 150 );

        bool first_seen = false;

        while ( true )
        {
            const auto now = std::chrono::steady_clock::now();

            {
                std::lock_guard lock( block_responses_mutex_ );
                auto            it = block_responses_.find( req_id );
                if ( it != block_responses_.end() )
                {
                    if ( !first_seen )
                    {
                        first_seen                         = true;
                        block_first_response_time_[req_id] = std::chrono::steady_clock::now();
                    }
                    else
                    {
                        auto elapsed = std::chrono::steady_clock::now() - block_first_response_time_[req_id];
                        if ( elapsed >= silent_time )
                        {
                            break;
                        }
                    }
                }
            }


            if ( std::chrono::steady_clock::now() - start_time >= full_timeout )
            {
                logger_->debug( "[{}] Timeout: no BlockResponse received for req_id {}",
                                address_.substr( 0, 8 ),
                                req_id );
                return outcome::failure( Error::GENESIS_REQUEST_ERROR );
            }

            std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
        }

        std::set<std::string> cids;
        bool                  any_response = false;
        {
            std::lock_guard lock( block_responses_mutex_ );
            auto            it = block_responses_.find( req_id );
            if ( it != block_responses_.end() )
            {
                any_response = true;
                cids         = it->second;
            }
            block_responses_.erase( req_id );
            block_first_response_time_.erase( req_id );
        }

        if ( !any_response )
        {
            logger_->warn( "[{}] No responses recorded for req_id {}", address_.substr( 0, 8 ), req_id );
            return outcome::failure( Error::GENESIS_REQUEST_ERROR );
        }

        return outcome::success( cids );
    }

    outcome::result<std::set<std::string>> AccountMessenger::PerformBlockCidRequest( uint64_t timeout_ms,
                                                                                      const std::string &cid )
    {
        std::mt19937_64 gen( rd_() );
        uint64_t        random_value = gen();

        std::string              to_hash = address_ + std::to_string( random_value );
        sgns::crypto::HasherImpl hasher;
        auto                     hash = hasher.sha2_256(
            gsl::span<const uint8_t>( reinterpret_cast<const uint8_t *>( to_hash.data() ), to_hash.size() ) );

        uint64_t req_id = 0;
        std::memcpy( &req_id, hash.data(), sizeof( req_id ) );

        logger_->debug( "[{}] Requesting block CID {} with req_id {} and timeout {}",
                        address_.substr( 0, 8 ),
                        cid,
                        req_id,
                        timeout_ms );

        {
            std::lock_guard lock( block_responses_mutex_ );
            block_responses_.erase( req_id );
            block_first_response_time_.erase( req_id );
        }

        auto request_result = RequestBlockByCid( req_id, cid );
        if ( request_result.has_error() )
        {
            logger_->error( "[{}] Failed to request block CID {}",
                            address_.substr( 0, 8 ),
                            cid );
            return request_result.error();
        }

        const auto start_time        = std::chrono::steady_clock::now();
        const auto full_timeout      = std::chrono::milliseconds( timeout_ms );
        const auto silent_time       = std::chrono::milliseconds( 150 );

        bool first_seen = false;

        while ( true )
        {
            const auto now = std::chrono::steady_clock::now();

            {
                std::lock_guard lock( block_responses_mutex_ );
                auto            it = block_responses_.find( req_id );
                if ( it != block_responses_.end() )
                {
                    if ( !first_seen )
                    {
                        first_seen                         = true;
                        block_first_response_time_[req_id] = std::chrono::steady_clock::now();
                    }
                    else
                    {
                        auto elapsed = std::chrono::steady_clock::now() - block_first_response_time_[req_id];
                        if ( elapsed >= silent_time )
                        {
                            break;
                        }
                    }
                }
            }

            if ( std::chrono::steady_clock::now() - start_time >= full_timeout )
            {
                logger_->debug( "[{}] Timeout: no BlockResponse received for req_id {}",
                                address_.substr( 0, 8 ),
                                req_id );
                return outcome::failure( Error::GENESIS_REQUEST_ERROR );
            }

            std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
        }

        std::set<std::string> cids;
        bool                  any_response = false;
        {
            std::lock_guard lock( block_responses_mutex_ );
            auto            it = block_responses_.find( req_id );
            if ( it != block_responses_.end() )
            {
                any_response = true;
                cids         = it->second;
            }
            block_responses_.erase( req_id );
            block_first_response_time_.erase( req_id );
        }

        if ( !any_response )
        {
            logger_->warn( "[{}] No responses recorded for req_id {}", address_.substr( 0, 8 ), req_id );
            return outcome::failure( Error::GENESIS_REQUEST_ERROR );
        }

        return outcome::success( cids );
    }

    void AccountMessenger::HandleNonceRequest( const accountComm::SignedNonceRequest &signed_req )
    {
        const auto &req = signed_req.data();

        logger_->debug( "[{}] Received a Nonce request req_id {}", address_.substr( 0, 8 ), req.request_id() );

        std::string serialized;
        if ( !req.SerializeToString( &serialized ) )
        {
            logger_->error( "Failed to serialize NonceRequest for signature check" );
            return;
        }

        std::vector<uint8_t> serialized_vec( serialized.begin(), serialized.end() );

        auto verify_signature_result = methods_.verify_signature_( req.requester_address(),
                                                                   signed_req.signature(),
                                                                   serialized_vec );
        if ( verify_signature_result.has_error() )
        {
            logger_->error( "No verify method" );
            return;
        }
        if ( !verify_signature_result.value() )
        {
            logger_->error( "Invalid signature on NonceRequest from {}", req.requester_address() );
            return;
        }

        auto local_nonce_result = methods_.get_local_nonce_( req.requester_address() );

        accountComm::NonceResponse resp;
        resp.set_responder_address( address_ );
        resp.set_requester_address( req.requester_address() );
        resp.set_request_id( req.request_id() );
        resp.set_timestamp(
            std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch() )
                .count() );

        if ( local_nonce_result.has_error() )
        {
            logger_->debug( "[{}] I don't have the nonce for the address {}, responding with has_nonce=false",
                            address_.substr( 0, 8 ),
                            req.requester_address() );

            resp.set_has_nonce( false );
            resp.set_known_nonce( 0 ); // Set to 0 when no nonce available
        }
        else
        {
            uint64_t local_nonce = local_nonce_result.value();
            resp.set_has_nonce( true );
            resp.set_known_nonce( local_nonce );

            logger_->debug( "[{}] Sending back the nonce {} to {} with req_id {}",
                            address_.substr( 0, 8 ),
                            local_nonce,
                            req.requester_address().substr( 0, 8 ),
                            resp.request_id() );
        }

        std::string resp_serialized;
        if ( !resp.SerializeToString( &resp_serialized ) )
        {
            logger_->error( "Failed to serialize NonceResponse" );
            return;
        }

        std::vector<uint8_t> resp_bytes( resp_serialized.begin(), resp_serialized.end() );
        auto                 signature_result = methods_.sign_( resp_bytes );
        if ( signature_result.has_error() )
        {
            logger_->error( "Failed to sign NonceResponse" );
            return;
        }
        auto signature = signature_result.value();

        accountComm::SignedNonceResponse signed_resp;
        *signed_resp.mutable_data() = resp;
        signed_resp.set_signature( signature.data(), signature.size() );

        accountComm::AccountMessage msg;
        *msg.mutable_nonce_response() = signed_resp;
        auto account_topic            = req.requester_address() + std::string( ACCOUNT_COMM ) +
                             sgns::version::GetNetAndVersionAppendix();

        auto send_ret = SendAccountMessage( msg, { account_topic } );

        if ( send_ret.has_error() )
        {
            logger_->error( "Failed to send NonceResponse" );
        }
    }

    void AccountMessenger::HandleNonceResponse( const accountComm::SignedNonceResponse &signed_resp )
    {
        const auto &resp = signed_resp.data();

        logger_->debug( "[{}] Received a Nonce response from {} (has_nonce={}, nonce={}) and req_id {}",
                        address_.substr( 0, 8 ),
                        resp.responder_address(),
                        resp.has_nonce(),
                        resp.known_nonce(),
                        resp.request_id() );

        std::string serialized;
        if ( !resp.SerializeToString( &serialized ) )
        {
            logger_->error( "Failed to serialize NonceResponse for signature check" );
            return;
        }

        std::vector<uint8_t> data_vec( serialized.begin(), serialized.end() );

        auto verify_signature_result = methods_.verify_signature_( resp.responder_address(),
                                                                   signed_resp.signature(),
                                                                   data_vec );
        if ( verify_signature_result.has_error() )
        {
            logger_->error( "No verify method for NonceResponse" );
            return;
        }
        if ( !verify_signature_result.value() )
        {
            logger_->error( "Invalid signature on NonceResponse from {}", resp.responder_address() );
            return;
        }

        std::lock_guard lock( nonce_responses_mutex_ );

        if ( resp.has_nonce() )
        {
            nonce_responses_[resp.request_id()].insert( resp.known_nonce() );
        }
        else
        {
            logger_->debug( "[{}] Responder {} has no nonce for our address",
                            address_.substr( 0, 8 ),
                            resp.responder_address().substr( 0, 8 ) );
            // Track addresses that responded with no nonce
            no_nonce_responses_[resp.request_id()].insert( resp.responder_address() );
        }
    }

    void AccountMessenger::HandleHeadRequest( const accountComm::SignedHeadRequest &signed_req )
    {
        const auto &req = signed_req.data();

        logger_->debug( "[{}] Received a Head request req_id {} for {} topics",
                        address_.substr( 0, 8 ),
                        req.request_id(),
                        req.topics_size() );

        std::string serialized;
        if ( !req.SerializeToString( &serialized ) )
        {
            logger_->error( "Failed to serialize HeadRequest for signature check" );
            return;
        }

        std::vector<uint8_t> serialized_vec( serialized.begin(), serialized.end() );
        auto                 verify_signature_result = methods_.verify_signature_( req.requester_address(),
                                                                   signed_req.signature(),
                                                                   serialized_vec );
        if ( verify_signature_result.has_error() || !verify_signature_result.value() )
        {
            logger_->error( "Invalid signature on HeadRequest from {}", req.requester_address() );
            return;
        }

        // Get topics from request
        std::set<std::string> requested_topics;
        for ( int i = 0; i < req.topics_size(); ++i )
        {
            requested_topics.emplace( req.topics( i ) );
        }

        // Call the registered handler (typically will be handled by CrdtDatastore)
        std::lock_guard lock( head_handler_mutex_ );
        if ( head_request_handler_ )
        {
            logger_->debug( "[{}] Forwarding head request for {} topics to handler",
                            address_.substr( 0, 8 ),
                            requested_topics.size() );
            head_request_handler_( requested_topics );
        }
        else
        {
            logger_->warn( "[{}] No head request handler registered, ignoring HeadRequest", address_.substr( 0, 8 ) );
        }
    }

}

Updated on 2026-03-04 at 13:10:44 -0800