src/account/AccountMessenger.cpp¶
Namespaces¶
| Name |
|---|
| sgns |
Functions¶
| Name | |
|---|---|
| OUTCOME_CPP_DEFINE_CATEGORY_3(sgns , AccountMessenger::Error , e ) |
Detailed Description¶
Date: 2025-07-22 Henrique A. Klein ([email protected])
Functions Documentation¶
function OUTCOME_CPP_DEFINE_CATEGORY_3¶
Source code¶
#include <thread>
#include <random>
#include <boost/format.hpp>
#include <future>
#include "AccountMessenger.hpp"
#include "base/sgns_version.hpp"
#include "crypto/hasher/hasher_impl.hpp"
#include "primitives/cid/cid.hpp"
// Error-category text for AccountMessenger::Error: each enumerator handled
// below is mapped to the human-readable message returned for it.
OUTCOME_CPP_DEFINE_CATEGORY_3( sgns, AccountMessenger::Error, e )
{
    using MessengerError = sgns::AccountMessenger::Error;

    switch ( e )
    {
        case MessengerError::PROTO_DESERIALIZATION:
            return "Error in protobuf data deserialization";

        case MessengerError::PROTO_SERIALIZATION:
            return "Error in protobuf data serialization";

        case MessengerError::NONCE_REQUEST_IN_PROGRESS:
            return "Nonce request already in progress";

        case MessengerError::NONCE_GET_ERROR:
            return "Nonce couldn't be fetched";

        case MessengerError::NO_RESPONSE_RECEIVED:
            return "No response received from network";

        case MessengerError::RESPONSE_WITHOUT_NONCE:
            return "Response received but without nonce data";

        case MessengerError::GENESIS_REQUEST_ERROR:
            return "Genesis request failed";
    }

    // Only reached for a value that matches none of the cases above.
    return "Unknown error";
}
namespace sgns
{
std::shared_ptr<AccountMessenger> AccountMessenger::New( std::string address,
std::shared_ptr<ipfs_pubsub::GossipPubSub> pubsub,
InterfaceMethods methods )
{
if ( address.empty() )
{
return nullptr;
}
if ( !pubsub )
{
return nullptr;
}
if ( !methods.get_local_nonce_ || !methods.sign_ || !methods.verify_signature_ )
{
return nullptr;
}
auto instance = std::shared_ptr<AccountMessenger>(
new AccountMessenger( std::move( address ), std::move( pubsub ), std::move( methods ) ) );
instance->subs_acc_future_ = std::move( instance->pubsub_->Subscribe(
instance->account_comm_topic_,
[weakptr( std::weak_ptr<AccountMessenger>( instance ) )](
boost::optional<const ipfs_pubsub::GossipPubSub::Message &> message )
{
if ( auto self = weakptr.lock() )
{
self->logger_->trace( "[{}] Received Account response", self->address_.substr( 0, 8 ) );
self->OnResponse( message );
}
} ) );
instance->logger_->debug( "[{}] Subscribed to Account topic {}",
instance->address_.substr( 0, 8 ),
instance->account_comm_topic_ );
instance->subs_requests_future_ = std::move( instance->pubsub_->Subscribe(
instance->requests_topic_,
[weakptr( std::weak_ptr<AccountMessenger>( instance ) )](
boost::optional<const ipfs_pubsub::GossipPubSub::Message &> message )
{
if ( auto self = weakptr.lock() )
{
self->logger_->debug( "[{}] Received Account request", self->address_.substr( 0, 8 ) );
self->OnRequest( message );
}
} ) );
instance->logger_->debug( "[{}] Subscribed to Requests topic {}",
instance->address_.substr( 0, 8 ),
instance->requests_topic_ );
return instance;
}
// Stores the address, pubsub handle and interface methods, derives both topic
// names, and starts the background worker thread.
// NOTE(review): account_comm_topic_ is built from address_, so address_ must be
// declared before account_comm_topic_ in the class — confirm against the header.
AccountMessenger::AccountMessenger( std::string address,
std::shared_ptr<ipfs_pubsub::GossipPubSub> pubsub,
InterfaceMethods methods ) :
address_( std::move( address ) ),
account_comm_topic_( address_ + std::string( ACCOUNT_COMM ) + sgns::version::GetNetAndVersionAppendix() ),
requests_topic_( std::string( REQUESTS_COMM ) + sgns::version::GetNetAndVersionAppendix() ),
pubsub_( std::move( pubsub ) ),
methods_( std::move( methods ) )
{
// The worker runs WorkerLoop on this instance; the destructor stops and joins it.
worker_thread_ = std::thread( &AccountMessenger::WorkerLoop, this );
}
/**
 * @brief Signals the worker thread to stop and waits for it to finish.
 *
 * The stop flag is raised while holding queue_mutex_. WorkerLoop evaluates its
 * wait predicate under that same mutex, so setting the flag without the lock
 * could slip in between the predicate check and the actual block on the
 * condition variable; the notification would then be lost and join() would
 * never return.
 */
AccountMessenger::~AccountMessenger()
{
    {
        std::lock_guard lock( queue_mutex_ );
        stop_worker_.store( true );
    }
    // Notify after releasing the lock so the woken worker can re-acquire it immediately.
    queue_cv_.notify_one();

    if ( worker_thread_.joinable() )
    {
        worker_thread_.join();
    }
}
/**
 * @brief Installs the handler stored in global_block_handler_.
 * @param handler Callback to store; replaces any previously stored one.
 */
void AccountMessenger::RegisterBlockResponseHandler( BlockResponseHandler handler )
{
    std::scoped_lock guard( global_handler_mutex_ );
    global_block_handler_ = std::move( handler );
}
/**
 * @brief Resets global_block_handler_ so no block-response handler is installed.
 */
void AccountMessenger::ClearBlockResponseHandler()
{
    std::scoped_lock guard( global_handler_mutex_ );
    global_block_handler_ = nullptr;
}
/**
 * @brief Installs the handler stored in head_request_handler_.
 * @param handler Callback to store; replaces any previously stored one.
 */
void AccountMessenger::RegisterHeadRequestHandler( HeadRequestHandler handler )
{
    std::scoped_lock guard( head_handler_mutex_ );
    head_request_handler_ = std::move( handler );
}
/**
 * @brief Resets head_request_handler_ so no head-request handler is installed.
 */
void AccountMessenger::ClearHeadRequestHandler()
{
    std::scoped_lock guard( head_handler_mutex_ );
    head_request_handler_ = nullptr;
}
/**
 * @brief Publishes a signed HeadRequest for the given topics on the requests topic.
 *
 * An empty topic set is not an error: nothing is sent and success is returned.
 *
 * @param topics Topics to list in the request.
 * @return success, PROTO_SERIALIZATION if the request cannot be serialized,
 *         the signing error if sign_ fails, or the result of SendAccountMessage.
 */
outcome::result<void> AccountMessenger::RequestHeads( const std::unordered_set<std::string> &topics )
{
    if ( topics.empty() )
    {
        logger_->debug( "[{}] RequestHeads called with empty topics list", address_.substr( 0, 8 ) );
        return outcome::success();
    }

    accountComm::HeadRequest head_request;
    head_request.set_requester_address( address_ );
    head_request.set_request_id( rd_() );
    const auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                            std::chrono::system_clock::now().time_since_epoch() )
                            .count();
    head_request.set_timestamp( now_ms );
    for ( const auto &topic_name : topics )
    {
        head_request.add_topics( topic_name );
    }

    // The signature covers the serialized, unsigned request.
    std::string payload;
    if ( !head_request.SerializeToString( &payload ) )
    {
        logger_->error( "[{}] Failed to serialize HeadRequest", address_.substr( 0, 8 ) );
        return outcome::failure( Error::PROTO_SERIALIZATION );
    }

    auto signing = methods_.sign_( std::vector<uint8_t>( payload.begin(), payload.end() ) );
    if ( signing.has_error() )
    {
        logger_->error( "[{}] Failed to sign HeadRequest", address_.substr( 0, 8 ) );
        return outcome::failure( signing.error() );
    }
    const auto &signature_bytes = signing.value();

    accountComm::SignedHeadRequest signed_request;
    *signed_request.mutable_data() = head_request;
    signed_request.set_signature( std::string( signature_bytes.begin(), signature_bytes.end() ) );

    accountComm::AccountMessage envelope;
    *envelope.mutable_head_request() = signed_request;

    logger_->debug( "[{}] Sending HeadRequest for {} topics", address_.substr( 0, 8 ), topics.size() );
    return SendAccountMessage( envelope, { requests_topic_ } );
}
/**
 * @brief Parses a message received on the requests topic and dispatches it.
 *
 * Request payloads are forwarded to their Handle*Request method. Response
 * payloads are not expected on this topic and are only logged. An absent
 * message is ignored.
 *
 * @param message Optional pubsub message; its data is parsed as an AccountMessage.
 */
void AccountMessenger::OnRequest( boost::optional<const ipfs_pubsub::GossipPubSub::Message &> message )
{
    if ( !message )
    {
        return;
    }

    logger_->trace( "{}: Valid message received", __func__ );
    accountComm::AccountMessage acc_msg;
    if ( !acc_msg.ParseFromArray( message->data.data(), static_cast<int>( message->data.size() ) ) )
    {
        logger_->error( "{}: Failed to parse AccountMessage ", __func__ );
        return;
    }

    switch ( acc_msg.payload_case() )
    {
        case accountComm::AccountMessage::kNonceRequest:
            HandleNonceRequest( acc_msg.nonce_request() );
            break;
        case accountComm::AccountMessage::kBlockRequest:
            HandleBlockRequest( acc_msg.block_request() );
            break;
        case accountComm::AccountMessage::kHeadRequest:
            HandleHeadRequest( acc_msg.head_request() );
            break;
        case accountComm::AccountMessage::kBlockCidRequest:
            HandleBlockCidRequest( acc_msg.block_cid_request() );
            break;
        case accountComm::AccountMessage::kNonceResponse:
        case accountComm::AccountMessage::kBlockResponse:
            logger_->error( "{}: Unexpected response received ", __func__ );
            break;
        default:
            // The format string has two placeholders; the topic supplies the second one.
            logger_->error( "{}: Unknown AccountMessage type received on {}", __func__, requests_topic_ );
            break;
    }
}
/**
 * @brief Parses a message received on this account's topic and dispatches it.
 *
 * Nonce and block responses are forwarded to their Handle*Response method.
 * Request payloads are not expected on this topic and are only logged. An
 * absent message is ignored.
 *
 * @param message Optional pubsub message; its data is parsed as an AccountMessage.
 */
void AccountMessenger::OnResponse( boost::optional<const ipfs_pubsub::GossipPubSub::Message &> message )
{
    if ( !message )
    {
        return;
    }

    logger_->trace( "{}: Valid message received", __func__ );
    accountComm::AccountMessage acc_msg;
    if ( !acc_msg.ParseFromArray( message->data.data(), static_cast<int>( message->data.size() ) ) )
    {
        logger_->error( "{}: Failed to parse AccountMessage ", __func__ );
        return;
    }

    switch ( acc_msg.payload_case() )
    {
        case accountComm::AccountMessage::kNonceResponse:
            HandleNonceResponse( acc_msg.nonce_response() );
            break;
        case accountComm::AccountMessage::kBlockResponse:
            HandleBlockResponse( acc_msg.block_response() );
            break;
        case accountComm::AccountMessage::kNonceRequest:
        case accountComm::AccountMessage::kBlockRequest:
        case accountComm::AccountMessage::kHeadRequest:
        case accountComm::AccountMessage::kBlockCidRequest:
            // These payloads are requests, so the log says so instead of "response".
            logger_->error( "{}: Unexpected request received ", __func__ );
            break;
        default:
            // The format string has two placeholders; the topic supplies the second one.
            logger_->error( "{}: Unknown AccountMessage type received on {}", __func__, account_comm_topic_ );
            break;
    }
}
/**
 * @brief Builds, signs and publishes a NonceRequest on the requests topic.
 *
 * @param req_id Identifier placed in the request.
 * @return success, PROTO_SERIALIZATION if the request cannot be serialized,
 *         the signing error if sign_ fails, or the result of SendAccountMessage.
 */
outcome::result<void> AccountMessenger::RequestNonce( uint64_t req_id )
{
    accountComm::NonceRequest req;
    req.set_requester_address( address_ );
    req.set_request_id( req_id );
    req.set_timestamp(
        std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch() )
            .count() );

    std::string encoded;
    if ( !req.SerializeToString( &encoded ) )
    {
        // This is a serialization failure; the sibling Request* methods report it the same way.
        logger_->error( "[{}] Failed to serialize NonceRequest", address_.substr( 0, 8 ) );
        return outcome::failure( Error::PROTO_SERIALIZATION );
    }

    // The signature covers the serialized, unsigned request.
    std::vector<uint8_t> serialized_vec( encoded.begin(), encoded.end() );
    OUTCOME_TRY( auto &&signature, methods_.sign_( serialized_vec ) );

    accountComm::SignedNonceRequest signed_req;
    *signed_req.mutable_data() = req;
    signed_req.set_signature( signature.data(), signature.size() );

    accountComm::AccountMessage envelope;
    *envelope.mutable_nonce_request() = signed_req;

    return SendAccountMessage( envelope, { requests_topic_ } );
}
/**
 * @brief Queues a nonce request for the worker and blocks until it completes.
 *
 * @param timeout_ms      Forwarded to the worker as the task's timeout.
 * @param silent_time_ms  Forwarded to the worker as the task's silent time.
 * @return The value the worker stores in the task's promise.
 */
outcome::result<uint64_t> AccountMessenger::GetLatestNonce( uint64_t timeout_ms, uint64_t silent_time_ms )
{
    auto nonce_promise = std::make_shared<std::promise<outcome::result<uint64_t>>>();
    auto pending_nonce = nonce_promise->get_future();

    RequestTask nonce_task{ RequestType::Nonce,
                            timeout_ms,
                            silent_time_ms,
                            0,
                            std::string{},
                            nullptr,
                            std::move( nonce_promise ) };
    EnqueueTask( std::move( nonce_task ) );

    // Blocks the caller until the worker thread sets the promise.
    return pending_nonce.get();
}
/**
 * @brief Queues a Genesis task (block index 0) for the worker thread.
 *
 * @param timeout_ms Forwarded to the worker as the task's timeout.
 * @param callback   Stored in the task; the worker invokes it with the outcome.
 * @return Always success; the request itself runs asynchronously.
 */
outcome::result<void> AccountMessenger::RequestGenesis(
    uint64_t timeout_ms, std::function<void( outcome::result<std::string> )> callback )
{
    RequestTask genesis_task{ RequestType::Genesis,
                              timeout_ms,
                              150,
                              0,
                              std::string{},
                              std::move( callback ),
                              nullptr };
    EnqueueTask( std::move( genesis_task ) );
    return outcome::success();
}
/**
 * @brief Queues a BlockByCid task for the worker thread.
 *
 * @param timeout_ms Forwarded to the worker as the task's timeout.
 * @param cid        CID stored in the task.
 * @param callback   Stored in the task; the worker invokes it with the outcome.
 * @return Always success; the request itself runs asynchronously.
 */
outcome::result<void> AccountMessenger::RequestRegularBlock( uint64_t                                            timeout_ms,
                                                             std::string                                         cid,
                                                             std::function<void( outcome::result<std::string> )> callback )
{
    RequestTask by_cid_task{ RequestType::BlockByCid,
                             timeout_ms,
                             150,
                             0,
                             std::move( cid ),
                             std::move( callback ),
                             nullptr };
    EnqueueTask( std::move( by_cid_task ) );
    return outcome::success();
}
/**
 * @brief Builds, signs and publishes a BlockRequest on the requests topic.
 *
 * @param req_id       Identifier placed in the request.
 * @param block_index  Index of the requested block.
 * @return success, PROTO_SERIALIZATION if the request cannot be serialized,
 *         the signing error if sign_ fails, or the result of SendAccountMessage.
 */
outcome::result<void> AccountMessenger::RequestBlock( uint64_t req_id, uint8_t block_index )
{
    const auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                            std::chrono::system_clock::now().time_since_epoch() )
                            .count();

    accountComm::BlockRequest block_request;
    block_request.set_requester_address( address_ );
    block_request.set_request_id( req_id );
    block_request.set_timestamp( now_ms );
    block_request.set_block_index( static_cast<uint32_t>( block_index ) );

    std::string wire_bytes;
    if ( !block_request.SerializeToString( &wire_bytes ) )
    {
        return outcome::failure( Error::PROTO_SERIALIZATION );
    }

    // The signature covers the serialized, unsigned request.
    std::vector<uint8_t> bytes_to_sign( wire_bytes.begin(), wire_bytes.end() );
    OUTCOME_TRY( auto &&signature, methods_.sign_( bytes_to_sign ) );

    accountComm::SignedBlockRequest signed_request;
    signed_request.mutable_data()->CopyFrom( block_request );
    signed_request.set_signature( signature.data(), signature.size() );

    accountComm::AccountMessage envelope;
    envelope.mutable_block_request()->CopyFrom( signed_request );

    return SendAccountMessage( envelope, { requests_topic_ } );
}
/**
 * @brief Builds, signs and publishes a BlockCidRequest on the requests topic.
 *
 * @param req_id Identifier placed in the request.
 * @param cid    CID placed in the request.
 * @return success, PROTO_SERIALIZATION if the request cannot be serialized,
 *         the signing error if sign_ fails, or the result of SendAccountMessage.
 */
outcome::result<void> AccountMessenger::RequestBlockByCid( uint64_t req_id, const std::string &cid )
{
    const auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                            std::chrono::system_clock::now().time_since_epoch() )
                            .count();

    accountComm::BlockCidRequest cid_request;
    cid_request.set_requester_address( address_ );
    cid_request.set_request_id( req_id );
    cid_request.set_timestamp( now_ms );
    cid_request.set_cid( cid );

    logger_->debug( "[{}] Requesting block by CID {} with req_id {}", address_.substr( 0, 8 ), cid, req_id );

    std::string wire_bytes;
    if ( !cid_request.SerializeToString( &wire_bytes ) )
    {
        return outcome::failure( Error::PROTO_SERIALIZATION );
    }

    // The signature covers the serialized, unsigned request.
    std::vector<uint8_t> bytes_to_sign( wire_bytes.begin(), wire_bytes.end() );
    OUTCOME_TRY( auto &&signature, methods_.sign_( bytes_to_sign ) );

    accountComm::SignedBlockCidRequest signed_request;
    signed_request.mutable_data()->CopyFrom( cid_request );
    signed_request.set_signature( signature.data(), signature.size() );

    accountComm::AccountMessage envelope;
    envelope.mutable_block_cid_request()->CopyFrom( signed_request );

    return SendAccountMessage( envelope, { requests_topic_ } );
}
/**
 * @brief Answers a signed BlockRequest with a signed BlockResponse.
 *
 * The request signature is verified against the serialized request data; an
 * invalid or unverifiable signature drops the request. The response is always
 * sent to the requester's account topic. It carries one block entry (CID, this
 * host's peer id and addresses) when the CID for the requested index is known,
 * and no entries otherwise.
 *
 * @param signed_req Request plus signature as received from the network.
 */
void AccountMessenger::HandleBlockRequest( const accountComm::SignedBlockRequest &signed_req )
{
    const auto &req = signed_req.data();
    logger_->debug( "[{}] Received a Block request req_id {} index {}",
                    address_.substr( 0, 8 ),
                    req.request_id(),
                    req.block_index() );

    std::string serialized;
    if ( !req.SerializeToString( &serialized ) )
    {
        logger_->error( "Failed to serialize BlockRequest for signature check" );
        return;
    }
    std::vector<uint8_t> serialized_vec( serialized.begin(), serialized.end() );
    auto                 verify_signature_result = methods_.verify_signature_( req.requester_address(),
                                                               signed_req.signature(),
                                                               serialized_vec );
    if ( verify_signature_result.has_error() || !verify_signature_result.value() )
    {
        logger_->error( "Invalid signature on BlockRequest from {}", req.requester_address() );
        return;
    }

    bool        have_cid = false;
    std::string block_cid;
    if ( !methods_.get_block_cid_ )
    {
        // New() does not require this method, so calling it unchecked could
        // invoke an empty callable. Answer with an empty response instead.
        logger_->error( "No get_block_cid_ method configured" );
    }
    else if ( req.block_index() > 0xFFu )
    {
        // The lookup takes a uint8_t; a larger index from the network must not
        // be truncated into a different, valid index.
        logger_->error( "Block index {} out of range in BlockRequest from {}",
                        req.block_index(),
                        req.requester_address() );
    }
    else
    {
        auto cid_result = methods_.get_block_cid_( static_cast<uint8_t>( req.block_index() ),
                                                   req.requester_address() );
        if ( !cid_result.has_error() )
        {
            have_cid  = true;
            block_cid = cid_result.value();
        }
    }
    if ( !have_cid )
    {
        logger_->debug( "[{}] I don't have the block index {}, will send empty BlockResponse",
                        address_.substr( 0, 8 ),
                        req.block_index() );
    }

    accountComm::BlockResponse resp;
    resp.set_responder_address( address_ );
    resp.set_requester_address( req.requester_address() );
    resp.set_request_id( req.request_id() );
    resp.set_timestamp(
        std::chrono::duration_cast<std::chrono::milliseconds>( std::chrono::system_clock::now().time_since_epoch() )
            .count() );
    logger_->debug( "[{}] Preparing BlockResponse for req_id {} with requester address {}",
                    address_.substr( 0, 8 ),
                    req.request_id(),
                    req.requester_address() );

    if ( have_cid )
    {
        auto *info = resp.add_blocks();
        info->set_cid( block_cid );

        auto peer_info = pubsub_->GetHost()->getPeerInfo();
        // Bind the byte vector once so both iterators refer to the same object.
        const auto &peer_id_bytes = peer_info.id.toVector();
        info->set_peer_id( std::string( peer_id_bytes.begin(), peer_id_bytes.end() ) );

        auto pubsubObserved = pubsub_->GetHost()->getObservedAddressesReal();
        for ( auto &addr : pubsubObserved )
        {
            info->add_addresses( addr.getStringAddress() );
            logger_->debug( "Address Broadcast: {}", addr.getStringAddress() );
        }
        for ( auto &addr : peer_info.addresses )
        {
            info->add_addresses( addr.getStringAddress() );
            logger_->debug( "Address Broadcast: {}", addr.getStringAddress() );
        }
        if ( info->addresses_size() == 0 )
        {
            logger_->error( "No addresses found for BlockResponse" );
        }
    }

    std::string resp_serialized;
    if ( !resp.SerializeToString( &resp_serialized ) )
    {
        logger_->error( "Failed to serialize BlockResponse" );
        return;
    }
    std::vector<uint8_t> resp_bytes( resp_serialized.begin(), resp_serialized.end() );
    auto                 signature_res = methods_.sign_( resp_bytes );
    if ( signature_res.has_error() )
    {
        logger_->error( "Failed to sign BlockResponse" );
        return;
    }
    const auto &signature = signature_res.value();

    accountComm::SignedBlockResponse signed_resp;
    *signed_resp.mutable_data() = resp;
    signed_resp.set_signature( signature.data(), signature.size() );

    accountComm::AccountMessage msg;
    *msg.mutable_block_response() = signed_resp;

    // Responses go to the requester's own account topic.
    auto account_topic = req.requester_address() + std::string( ACCOUNT_COMM ) +
                         sgns::version::GetNetAndVersionAppendix();
    auto send_ret = SendAccountMessage( msg, { account_topic } );
    if ( send_ret.has_error() )
    {
        logger_->error( "[{}] Failed to send BlockResponse for req_id {}",
                        address_.substr( 0, 8 ),
                        req.request_id() );
    }
    else
    {
        logger_->debug( "[{}] Sent BlockResponse ({} block entries) to {} (req_id {})",
                        address_.substr( 0, 8 ),
                        resp.blocks_size(),
                        req.requester_address().substr( 0, 8 ),
                        req.request_id() );
    }
}
/**
 * @brief Answers a signed BlockCidRequest with a signed BlockResponse.
 *
 * The request signature is verified against the serialized request data; an
 * invalid or unverifiable signature drops the request. The response is always
 * sent to the requester's account topic. It carries one block entry (the
 * requested CID, this host's peer id and addresses) when has_block_cid_
 * reports the CID as present, and no entries otherwise.
 *
 * @param signed_req Request plus signature as received from the network.
 */
void AccountMessenger::HandleBlockCidRequest( const accountComm::SignedBlockCidRequest &signed_req )
{
    const auto &request = signed_req.data();
    logger_->debug( "[{}] Received a Block-by-CID request req_id {} cid {}",
                    address_.substr( 0, 8 ),
                    request.request_id(),
                    request.cid() );

    // --- Signature check over the serialized request data ---
    std::string request_wire;
    if ( !request.SerializeToString( &request_wire ) )
    {
        logger_->error( "Failed to serialize BlockCidRequest for signature check" );
        return;
    }
    std::vector<uint8_t> request_bytes( request_wire.begin(), request_wire.end() );
    auto                 verification = methods_.verify_signature_( request.requester_address(),
                                                    signed_req.signature(),
                                                    request_bytes );
    const bool signature_ok = !verification.has_error() && verification.value();
    if ( !signature_ok )
    {
        logger_->error( "Invalid signature on BlockCidRequest from {}", request.requester_address() );
        return;
    }

    // --- Local lookup ---
    bool cid_is_known = false;
    if ( !methods_.has_block_cid_ )
    {
        logger_->error( "No has_block_cid_ method configured" );
    }
    else
    {
        auto lookup  = methods_.has_block_cid_( request.cid() );
        cid_is_known = lookup.has_value() && lookup.value();
    }

    // --- Response body ---
    const auto now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                            std::chrono::system_clock::now().time_since_epoch() )
                            .count();
    accountComm::BlockResponse response;
    response.set_responder_address( address_ );
    response.set_requester_address( request.requester_address() );
    response.set_request_id( request.request_id() );
    response.set_timestamp( now_ms );
    logger_->debug( "[{}] Preparing BlockResponse for req_id {} with requester address {}",
                    address_.substr( 0, 8 ),
                    request.request_id(),
                    request.requester_address() );

    if ( cid_is_known )
    {
        auto *entry = response.add_blocks();
        entry->set_cid( request.cid() );

        auto host_info = pubsub_->GetHost()->getPeerInfo();
        entry->set_peer_id( std::string( host_info.id.toVector().begin(), host_info.id.toVector().end() ) );

        // Observed addresses first, then the host's own address list.
        auto observed_addresses = pubsub_->GetHost()->getObservedAddressesReal();
        for ( auto &address : observed_addresses )
        {
            entry->add_addresses( address.getStringAddress() );
            logger_->debug( "Address Broadcast: {}", address.getStringAddress() );
        }
        for ( auto &address : host_info.addresses )
        {
            entry->add_addresses( address.getStringAddress() );
            logger_->debug( "Address Broadcast: {}", address.getStringAddress() );
        }
    }

    // --- Sign and wrap ---
    std::string response_wire;
    if ( !response.SerializeToString( &response_wire ) )
    {
        logger_->error( "Failed to serialize BlockResponse" );
        return;
    }
    std::vector<uint8_t> response_bytes( response_wire.begin(), response_wire.end() );
    auto                 signing = methods_.sign_( response_bytes );
    if ( signing.has_error() )
    {
        logger_->error( "Failed to sign BlockResponse" );
        return;
    }
    const auto &signature_bytes = signing.value();

    accountComm::SignedBlockResponse signed_response;
    signed_response.mutable_data()->CopyFrom( response );
    signed_response.set_signature( signature_bytes.data(), signature_bytes.size() );

    accountComm::AccountMessage envelope;
    envelope.mutable_block_response()->CopyFrom( signed_response );

    // --- Publish on the requester's own account topic ---
    const auto reply_topic = request.requester_address() + std::string( ACCOUNT_COMM ) +
                             sgns::version::GetNetAndVersionAppendix();
    auto sent = SendAccountMessage( envelope, { reply_topic } );
    if ( !sent.has_error() )
    {
        logger_->debug( "[{}] Sent BlockResponse ({} block entries) to {} (req_id {})",
                        address_.substr( 0, 8 ),
                        response.blocks_size(),
                        request.requester_address().substr( 0, 8 ),
                        request.request_id() );
    }
    else
    {
        logger_->error( "[{}] Failed to send BlockResponse for req_id {}",
                        address_.substr( 0, 8 ),
                        request.request_id() );
    }
}
// Processes a signed BlockResponse: verifies its signature, records the CIDs it
// carries under the response's request id, and forwards each non-empty CID to
// the registered global block handler (if any).
// A response that fails verification is logged and dropped.
void AccountMessenger::HandleBlockResponse( const accountComm::SignedBlockResponse &signed_resp )
{
const auto &resp = signed_resp.data();
logger_->debug( "[{}] Received a Block response from {} with {} blocks and req_id {}",
address_.substr( 0, 8 ),
resp.responder_address().substr( 0, 8 ),
resp.blocks_size(),
resp.request_id() );
// Verify the signature against the re-serialized response data.
std::string serialized;
if ( !resp.SerializeToString( &serialized ) )
{
logger_->error( "Failed to serialize BlockResponse for signature check" );
return;
}
std::vector<uint8_t> data_vec( serialized.begin(), serialized.end() );
auto verify_signature_result = methods_.verify_signature_( resp.responder_address(),
signed_resp.signature(),
data_vec );
if ( verify_signature_result.has_error() )
{
logger_->error( "No verify method for BlockResponse" );
return;
}
if ( !verify_signature_result.value() )
{
logger_->error( "Invalid signature on BlockResponse from {}", resp.responder_address() );
return;
}
// ------------------------------------------------------------
// Store response information — even if blocks_size() == 0
// ------------------------------------------------------------
bool has_valid_cid = false;
{
std::lock_guard lock( block_responses_mutex_ );
// operator[] creates the entry for this request id if it does not exist yet.
// NOTE(review): entries are created for any request id, including ones no
// request is currently waiting on — confirm they are cleaned up elsewhere.
auto &set_ref = block_responses_[resp.request_id()];
if ( set_ref.empty() )
{
// No CID stored yet for this req_id: (re)stamp the response time.
block_first_response_time_[resp.request_id()] = std::chrono::steady_clock::now();
}
for ( const auto &b : resp.blocks() )
{
if ( !b.cid().empty() )
{
set_ref.insert( b.cid() );
has_valid_cid = true;
}
}
// even if no cids, we keep the key entry in block_responses_
// to signal "a response was received but empty"
}
if ( !has_valid_cid && resp.blocks_size() == 0 )
{
logger_->trace( "[{}] Received empty BlockResponse from {}, marking as 'empty response received'",
address_.substr( 0, 8 ),
resp.responder_address().substr( 0, 8 ) );
return; // do not trigger global handler
}
// ------------------------------------------------------------
// Notify global handler only for valid CIDs
// ------------------------------------------------------------
// The handler is invoked while global_handler_mutex_ is held.
std::lock_guard lock( global_handler_mutex_ );
if ( global_block_handler_ )
{
for ( const auto &block_info : resp.blocks() )
{
if ( block_info.cid().empty() )
{
continue;
}
// Only the first advertised address is passed on; empty when none was sent.
std::string first_address;
if ( block_info.addresses_size() > 0 )
{
first_address = block_info.addresses( 0 );
}
global_block_handler_( block_info.cid(), block_info.peer_id(), first_address );
}
}
}
/**
 * @brief Queues an AccountCreation task (block index 1) for the worker thread.
 *
 * @param timeout_ms Forwarded to the worker as the task's timeout.
 * @param callback   Stored in the task; the worker invokes it with the outcome.
 * @return Always success; the request itself runs asynchronously.
 */
outcome::result<void> AccountMessenger::RequestAccountCreation(
    uint64_t timeout_ms, std::function<void( outcome::result<std::string> )> callback )
{
    RequestTask creation_task{ RequestType::AccountCreation,
                               timeout_ms,
                               150,
                               1,
                               std::string{},
                               std::move( callback ),
                               nullptr };
    EnqueueTask( std::move( creation_task ) );
    return outcome::success();
}
/**
 * @brief Serializes an AccountMessage and publishes it on every given topic.
 *
 * @param msg    Envelope to send; it may carry any payload type.
 * @param topics Topics to publish the serialized message on.
 * @return success, or PROTO_SERIALIZATION if the message cannot be serialized.
 */
outcome::result<void> AccountMessenger::SendAccountMessage( const accountComm::AccountMessage &msg,
                                                            const std::set<std::string>       &topics )
{
    const size_t         size = msg.ByteSizeLong();
    std::vector<uint8_t> serialized_proto( size );
    // SerializeToArray takes an int size; make the conversion explicit.
    if ( !msg.SerializeToArray( serialized_proto.data(), static_cast<int>( serialized_proto.size() ) ) )
    {
        // This sender is shared by all message kinds, so the log names none of them.
        logger_->error( "Failed to serialize AccountMessage" );
        return outcome::failure( Error::PROTO_SERIALIZATION );
    }

    for ( const auto &topic : topics )
    {
        logger_->debug( "Sending account packet to {}", topic );
        pubsub_->Publish( topic, serialized_proto );
    }
    return outcome::success();
}
void AccountMessenger::WorkerLoop()
{
while ( true )
{
RequestTask task;
{
std::unique_lock lock( queue_mutex_ );
queue_cv_.wait( lock, [this]() { return stop_worker_.load() || !request_queue_.empty(); } );
if ( stop_worker_.load() && request_queue_.empty() )
{
break;
}
task = std::move( request_queue_.front() );
request_queue_.pop();
}
switch ( task.type )
{
case RequestType::Nonce:
{
auto res = PerformNonceRequest( task.timeout_ms, task.silent_time_ms );
if ( task.nonce_promise )
{
task.nonce_promise->set_value( res );
}
break;
}
case RequestType::Genesis:
{
auto res = PerformBlockRequest( task.timeout_ms, task.block_index );
if ( task.callback )
{
if ( res.has_error() )
{
task.callback( outcome::failure( res.error() ) );
}
else
{
const auto &cids = res.value();
if ( cids.empty() )
{
task.callback( outcome::failure( boost::system::error_code{} ) );
}
else
{
for ( const auto &cid : cids )
{
task.callback( outcome::success( cid ) );
}
}
}
}
break;
}
case RequestType::AccountCreation:
{
auto res = PerformBlockRequest( task.timeout_ms, task.block_index );
if ( task.callback )
{
if ( res.has_error() )
{
task.callback( outcome::failure( res.error() ) );
}
else
{
const auto &cids = res.value();
if ( cids.empty() )
{
task.callback( outcome::failure( boost::system::error_code{} ) );
}
else
{
for ( const auto &cid : cids )
{
task.callback( outcome::success( cid ) );
}
}
}
}
break;
}
case RequestType::BlockByCid:
{
auto res = PerformBlockCidRequest( task.timeout_ms, task.cid );
if ( task.callback )
{
if ( res.has_error() )
{
task.callback( outcome::failure( res.error() ) );
}
else
{
const auto &cids = res.value();
if ( cids.empty() )
{
task.callback( outcome::failure( boost::system::error_code{} ) );
}
else
{
for ( const auto &cid : cids )
{
task.callback( outcome::success( cid ) );
}
}
}
}
break;
}
default:
break;
}
}
}
/**
 * @brief Appends a task to the request queue and wakes the worker.
 * @param task Task to queue; it is moved into the queue.
 */
void AccountMessenger::EnqueueTask( RequestTask task )
{
    std::unique_lock queue_guard( queue_mutex_ );
    request_queue_.push( std::move( task ) );
    // Release the lock before notifying, so the worker can take it right away.
    queue_guard.unlock();

    queue_cv_.notify_one();
}
// Sends a nonce request and polls for the responses recorded under its request id.
// timeout_ms bounds the whole wait; silent_time_ms is how long to keep
// collecting after the first response has been seen.
// Returns the highest collected nonce, NO_RESPONSE_RECEIVED when nothing
// arrived, RESPONSE_WITHOUT_NONCE when only nonce-less responses arrived, or
// the error from RequestNonce when the request could not be sent.
outcome::result<uint64_t> AccountMessenger::PerformNonceRequest( uint64_t timeout_ms, uint64_t silent_time_ms )
{
// The request id is the first sizeof(uint64_t) bytes of the SHA-256 of the
// address concatenated with a random number.
std::mt19937_64 gen( rd_() );
uint64_t random_value = gen();
std::string to_hash = address_ + std::to_string( random_value );
sgns::crypto::HasherImpl hasher;
auto hash = hasher.sha2_256(
gsl::span<const uint8_t>( reinterpret_cast<const uint8_t *>( to_hash.data() ), to_hash.size() ) );
uint64_t req_id = 0;
std::memcpy( &req_id, hash.data(), sizeof( req_id ) );
logger_->debug( "[{}] Requesting nonce with timeout {} and req_id {} ",
address_.substr( 0, 8 ),
timeout_ms,
req_id );
// Drop anything already stored under this id before sending.
{
std::lock_guard lock( nonce_responses_mutex_ );
nonce_responses_.erase( req_id );
no_nonce_responses_.erase( req_id );
first_response_time_.erase( req_id );
}
OUTCOME_TRY( RequestNonce( req_id ) );
const auto start_time = std::chrono::steady_clock::now();
const auto full_timeout = std::chrono::milliseconds( timeout_ms );
const auto silent_time = std::chrono::milliseconds( silent_time_ms );
bool first_seen = false;
// Poll every 10 ms until the silent window or the total timeout elapses.
while ( true )
{
{
std::lock_guard lock( nonce_responses_mutex_ );
auto nonce_it = nonce_responses_.find( req_id );
auto no_nonce_it = no_nonce_responses_.find( req_id );
bool has_nonce_responses = ( nonce_it != nonce_responses_.end() && !nonce_it->second.empty() );
bool has_no_nonce_responses = ( no_nonce_it != no_nonce_responses_.end() &&
!no_nonce_it->second.empty() );
if ( has_nonce_responses || has_no_nonce_responses )
{
if ( !first_seen )
{
// The silent window starts when this loop first notices a response.
first_seen = true;
first_response_time_[req_id] = std::chrono::steady_clock::now();
}
else
{
// NOTE(review): first_response_time_ may also be written by the
// response handler, which is not visible here — confirm.
auto elapsed = std::chrono::steady_clock::now() - first_response_time_[req_id];
if ( elapsed >= silent_time )
{
break; // silent window passed
}
}
}
}
if ( std::chrono::steady_clock::now() - start_time >= full_timeout )
{
break; // total timeout reached
}
std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
}
uint64_t max_nonce = 0;
bool has_any_nonce = false;
bool has_any_response = false;
// Read the collected responses and remove all bookkeeping for this id.
{
std::lock_guard lock( nonce_responses_mutex_ );
auto nonce_it = nonce_responses_.find( req_id );
auto no_nonce_it = no_nonce_responses_.find( req_id );
if ( nonce_it != nonce_responses_.end() && !nonce_it->second.empty() )
{
has_any_nonce = true;
has_any_response = true;
// Presumably an ordered container, so rbegin() is the largest nonce — confirm in the header.
max_nonce = *nonce_it->second.rbegin();
}
if ( no_nonce_it != no_nonce_responses_.end() && !no_nonce_it->second.empty() )
{
has_any_response = true;
}
nonce_responses_.erase( req_id );
no_nonce_responses_.erase( req_id );
first_response_time_.erase( req_id );
}
if ( !has_any_response )
{
logger_->debug( "[{}] No response received within timeout for req_id {}", address_.substr( 0, 8 ), req_id );
return outcome::failure( Error::NO_RESPONSE_RECEIVED );
}
if ( !has_any_nonce )
{
logger_->debug( "[{}] Response received but without nonce data for req_id {}",
address_.substr( 0, 8 ),
req_id );
return outcome::failure( Error::RESPONSE_WITHOUT_NONCE );
}
logger_->debug( "[{}] Returning highest collected nonce for req_id {}: {}",
address_.substr( 0, 8 ),
req_id,
max_nonce );
return max_nonce;
}
/**
 * @brief Sends a block-by-index request and collects the CIDs returned for it.
 *
 * Polls every 10 ms. Collection ends 150 ms after the first response is
 * noticed, or when timeout_ms has elapsed. Reaching the timeout no longer
 * discards responses that already arrived, and the bookkeeping for the
 * request id is always removed before returning.
 *
 * @param timeout_ms   Upper bound for the whole wait, in milliseconds.
 * @param block_index  Index of the requested block.
 * @return The collected CIDs (possibly empty when only empty responses
 *         arrived), the error from RequestBlock when sending fails, or
 *         GENESIS_REQUEST_ERROR when no response arrived in time.
 */
outcome::result<std::set<std::string>> AccountMessenger::PerformBlockRequest( uint64_t timeout_ms,
                                                                              uint8_t  block_index )
{
    // The request id is the first sizeof(uint64_t) bytes of the SHA-256 of the
    // address concatenated with a random number.
    std::mt19937_64          gen( rd_() );
    uint64_t                 random_value = gen();
    std::string              to_hash      = address_ + std::to_string( random_value );
    sgns::crypto::HasherImpl hasher;
    auto                     hash = hasher.sha2_256(
        gsl::span<const uint8_t>( reinterpret_cast<const uint8_t *>( to_hash.data() ), to_hash.size() ) );
    uint64_t req_id = 0;
    std::memcpy( &req_id, hash.data(), sizeof( req_id ) );

    logger_->debug( "[{}] Requesting block {} with req_id {} and timeout {}",
                    address_.substr( 0, 8 ),
                    block_index,
                    req_id,
                    timeout_ms );

    // Drop anything already stored under this id before sending.
    {
        std::lock_guard lock( block_responses_mutex_ );
        block_responses_.erase( req_id );
        block_first_response_time_.erase( req_id );
    }

    auto request_result = RequestBlock( req_id, block_index );
    if ( request_result.has_error() )
    {
        logger_->error( "[{}] Failed to request block {}", address_.substr( 0, 8 ), block_index );
        return request_result.error();
    }

    const auto start_time   = std::chrono::steady_clock::now();
    const auto full_timeout = std::chrono::milliseconds( timeout_ms );
    const auto silent_time  = std::chrono::milliseconds( 150 );
    bool       first_seen   = false;

    while ( true )
    {
        const auto now = std::chrono::steady_clock::now();
        {
            std::lock_guard lock( block_responses_mutex_ );
            if ( block_responses_.find( req_id ) != block_responses_.end() )
            {
                if ( !first_seen )
                {
                    // The silent window starts when this loop first notices a response.
                    first_seen                         = true;
                    block_first_response_time_[req_id] = now;
                }
                else if ( now - block_first_response_time_[req_id] >= silent_time )
                {
                    break; // silent window passed
                }
            }
        }
        if ( now - start_time >= full_timeout )
        {
            // Leave through the common exit so collected CIDs are returned and
            // the maps are cleaned up.
            break;
        }
        std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
    }

    std::set<std::string> cids;
    bool                  any_response = false;
    {
        std::lock_guard lock( block_responses_mutex_ );
        auto            it = block_responses_.find( req_id );
        if ( it != block_responses_.end() )
        {
            any_response = true;
            cids         = it->second;
        }
        block_responses_.erase( req_id );
        block_first_response_time_.erase( req_id );
    }

    if ( !any_response )
    {
        logger_->debug( "[{}] Timeout: no BlockResponse received for req_id {}", address_.substr( 0, 8 ), req_id );
        return outcome::failure( Error::GENESIS_REQUEST_ERROR );
    }
    return outcome::success( cids );
}
/**
 * @brief Sends a block-by-CID request and collects the CIDs returned for it.
 *
 * Polls every 10 ms. Collection ends 150 ms after the first response is
 * noticed, or when timeout_ms has elapsed. Reaching the timeout no longer
 * discards responses that already arrived, and the bookkeeping for the
 * request id is always removed before returning.
 *
 * @param timeout_ms Upper bound for the whole wait, in milliseconds.
 * @param cid        CID of the requested block.
 * @return The collected CIDs (possibly empty when only empty responses
 *         arrived), the error from RequestBlockByCid when sending fails, or
 *         GENESIS_REQUEST_ERROR when no response arrived in time.
 */
outcome::result<std::set<std::string>> AccountMessenger::PerformBlockCidRequest( uint64_t           timeout_ms,
                                                                                 const std::string &cid )
{
    // The request id is the first sizeof(uint64_t) bytes of the SHA-256 of the
    // address concatenated with a random number.
    std::mt19937_64          gen( rd_() );
    uint64_t                 random_value = gen();
    std::string              to_hash      = address_ + std::to_string( random_value );
    sgns::crypto::HasherImpl hasher;
    auto                     hash = hasher.sha2_256(
        gsl::span<const uint8_t>( reinterpret_cast<const uint8_t *>( to_hash.data() ), to_hash.size() ) );
    uint64_t req_id = 0;
    std::memcpy( &req_id, hash.data(), sizeof( req_id ) );

    logger_->debug( "[{}] Requesting block CID {} with req_id {} and timeout {}",
                    address_.substr( 0, 8 ),
                    cid,
                    req_id,
                    timeout_ms );

    // Drop anything already stored under this id before sending.
    {
        std::lock_guard lock( block_responses_mutex_ );
        block_responses_.erase( req_id );
        block_first_response_time_.erase( req_id );
    }

    auto request_result = RequestBlockByCid( req_id, cid );
    if ( request_result.has_error() )
    {
        logger_->error( "[{}] Failed to request block CID {}", address_.substr( 0, 8 ), cid );
        return request_result.error();
    }

    const auto start_time   = std::chrono::steady_clock::now();
    const auto full_timeout = std::chrono::milliseconds( timeout_ms );
    const auto silent_time  = std::chrono::milliseconds( 150 );
    bool       first_seen   = false;

    while ( true )
    {
        const auto now = std::chrono::steady_clock::now();
        {
            std::lock_guard lock( block_responses_mutex_ );
            if ( block_responses_.find( req_id ) != block_responses_.end() )
            {
                if ( !first_seen )
                {
                    // The silent window starts when this loop first notices a response.
                    first_seen                         = true;
                    block_first_response_time_[req_id] = now;
                }
                else if ( now - block_first_response_time_[req_id] >= silent_time )
                {
                    break; // silent window passed
                }
            }
        }
        if ( now - start_time >= full_timeout )
        {
            // Leave through the common exit so collected CIDs are returned and
            // the maps are cleaned up.
            break;
        }
        std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
    }

    std::set<std::string> cids;
    bool                  any_response = false;
    {
        std::lock_guard lock( block_responses_mutex_ );
        auto            it = block_responses_.find( req_id );
        if ( it != block_responses_.end() )
        {
            any_response = true;
            cids         = it->second;
        }
        block_responses_.erase( req_id );
        block_first_response_time_.erase( req_id );
    }

    if ( !any_response )
    {
        logger_->debug( "[{}] Timeout: no BlockResponse received for req_id {}", address_.substr( 0, 8 ), req_id );
        return outcome::failure( Error::GENESIS_REQUEST_ERROR );
    }
    return outcome::success( cids );
}
/**
 * @brief Processes a signed nonce request and publishes a signed nonce response.
 *
 * The request payload is re-serialized and its signature is checked through
 * methods_.verify_signature_. When the check passes, the local nonce for the
 * requester is looked up via methods_.get_local_nonce_, a NonceResponse is
 * built (has_nonce=false and known_nonce=0 when the lookup fails), signed with
 * methods_.sign_ and sent on the topic derived from the requester address.
 * Every failure along the way is logged and aborts the handling.
 *
 * @param[in] signed_req Signed request carrying the payload and its signature.
 */
void AccountMessenger::HandleNonceRequest( const accountComm::SignedNonceRequest &signed_req )
{
    const auto &request   = signed_req.data();
    const auto &requester = request.requester_address();
    logger_->debug( "[{}] Received a Nonce request req_id {}", address_.substr( 0, 8 ), request.request_id() );

    // The signature is checked against the serialized request payload.
    std::string request_blob;
    if ( !request.SerializeToString( &request_blob ) )
    {
        logger_->error( "Failed to serialize NonceRequest for signature check" );
        return;
    }
    std::vector<uint8_t> request_bytes( request_blob.begin(), request_blob.end() );

    auto signature_check = methods_.verify_signature_( requester, signed_req.signature(), request_bytes );
    if ( signature_check.has_error() )
    {
        logger_->error( "No verify method" );
        return;
    }
    if ( !signature_check.value() )
    {
        logger_->error( "Invalid signature on NonceRequest from {}", requester );
        return;
    }

    auto nonce_lookup = methods_.get_local_nonce_( requester );

    // Fill in the response header fields that do not depend on the lookup result.
    accountComm::NonceResponse response;
    response.set_responder_address( address_ );
    response.set_requester_address( requester );
    response.set_request_id( request.request_id() );
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    response.set_timestamp( std::chrono::duration_cast<std::chrono::milliseconds>( since_epoch ).count() );

    if ( !nonce_lookup.has_error() )
    {
        uint64_t known_nonce = nonce_lookup.value();
        response.set_has_nonce( true );
        response.set_known_nonce( known_nonce );
        logger_->debug( "[{}] Sending back the nonce {} to {} with req_id {}",
                        address_.substr( 0, 8 ),
                        known_nonce,
                        requester.substr( 0, 8 ),
                        response.request_id() );
    }
    else
    {
        logger_->debug( "[{}] I don't have the nonce for the address {}, responding with has_nonce=false",
                        address_.substr( 0, 8 ),
                        requester );
        response.set_has_nonce( false );
        response.set_known_nonce( 0 ); // Set to 0 when no nonce available
    }

    // Sign the serialized response payload.
    std::string response_blob;
    if ( !response.SerializeToString( &response_blob ) )
    {
        logger_->error( "Failed to serialize NonceResponse" );
        return;
    }
    std::vector<uint8_t> response_bytes( response_blob.begin(), response_blob.end() );
    auto sign_outcome = methods_.sign_( response_bytes );
    if ( sign_outcome.has_error() )
    {
        logger_->error( "Failed to sign NonceResponse" );
        return;
    }
    const auto &response_signature = sign_outcome.value();

    // Wrap payload + signature into the account message envelope.
    accountComm::SignedNonceResponse signed_response;
    *signed_response.mutable_data() = response;
    signed_response.set_signature( response_signature.data(), response_signature.size() );

    accountComm::AccountMessage envelope;
    *envelope.mutable_nonce_response() = signed_response;

    // The reply goes to the topic built from the requester address.
    auto reply_topic = requester + std::string( ACCOUNT_COMM ) + sgns::version::GetNetAndVersionAppendix();
    if ( SendAccountMessage( envelope, { reply_topic } ).has_error() )
    {
        logger_->error( "Failed to send NonceResponse" );
    }
}
/**
 * @brief Validates a signed nonce response and records its content.
 *
 * The response payload is re-serialized and its signature is checked through
 * methods_.verify_signature_ against the responder address. A valid response
 * is then recorded under nonce_responses_mutex_: the reported nonce goes into
 * nonce_responses_ when has_nonce is set, otherwise the responder address is
 * stored in no_nonce_responses_. Both maps are keyed by the request id.
 *
 * @param[in] signed_resp Signed response carrying the payload and its signature.
 */
void AccountMessenger::HandleNonceResponse( const accountComm::SignedNonceResponse &signed_resp )
{
    const auto &response  = signed_resp.data();
    const auto &responder = response.responder_address();
    logger_->debug( "[{}] Received a Nonce response from {} (has_nonce={}, nonce={}) and req_id {}",
                    address_.substr( 0, 8 ),
                    responder,
                    response.has_nonce(),
                    response.known_nonce(),
                    response.request_id() );

    // The signature is checked against the serialized response payload.
    std::string payload_blob;
    if ( !response.SerializeToString( &payload_blob ) )
    {
        logger_->error( "Failed to serialize NonceResponse for signature check" );
        return;
    }
    std::vector<uint8_t> payload_bytes( payload_blob.begin(), payload_blob.end() );

    auto signature_check = methods_.verify_signature_( responder, signed_resp.signature(), payload_bytes );
    if ( signature_check.has_error() )
    {
        logger_->error( "No verify method for NonceResponse" );
        return;
    }
    if ( !signature_check.value() )
    {
        logger_->error( "Invalid signature on NonceResponse from {}", responder );
        return;
    }

    // Everything below touches the response maps and runs under the mutex.
    std::lock_guard guard( nonce_responses_mutex_ );
    if ( !response.has_nonce() )
    {
        logger_->debug( "[{}] Responder {} has no nonce for our address",
                        address_.substr( 0, 8 ),
                        responder.substr( 0, 8 ) );
        // Track addresses that responded with no nonce
        no_nonce_responses_[response.request_id()].insert( responder );
        return;
    }
    nonce_responses_[response.request_id()].insert( response.known_nonce() );
}
/**
 * @brief Validates a signed head request and forwards its topics to the registered handler.
 *
 * The request payload is re-serialized and its signature is checked through
 * methods_.verify_signature_; a verification error and a negative result are
 * both reported as an invalid signature. The requested topics are gathered
 * into a set and, while holding head_handler_mutex_, passed to
 * head_request_handler_ when one is set. Without a handler the request is
 * ignored with a warning.
 *
 * @param[in] signed_req Signed request carrying the payload and its signature.
 */
void AccountMessenger::HandleHeadRequest( const accountComm::SignedHeadRequest &signed_req )
{
    const auto &request = signed_req.data();
    logger_->debug( "[{}] Received a Head request req_id {} for {} topics",
                    address_.substr( 0, 8 ),
                    request.request_id(),
                    request.topics_size() );

    // The signature is checked against the serialized request payload.
    std::string payload_blob;
    if ( !request.SerializeToString( &payload_blob ) )
    {
        logger_->error( "Failed to serialize HeadRequest for signature check" );
        return;
    }
    std::vector<uint8_t> payload_bytes( payload_blob.begin(), payload_blob.end() );

    auto signature_check = methods_.verify_signature_( request.requester_address(),
                                                       signed_req.signature(),
                                                       payload_bytes );
    // value() is only consulted when no error is present (short-circuit).
    const bool signature_ok = !signature_check.has_error() && signature_check.value();
    if ( !signature_ok )
    {
        logger_->error( "Invalid signature on HeadRequest from {}", request.requester_address() );
        return;
    }

    // Collect the requested topics; the set drops duplicates.
    std::set<std::string> wanted_topics;
    for ( const auto &topic : request.topics() )
    {
        wanted_topics.insert( topic );
    }

    // Hand over to the registered handler (typically will be handled by CrdtDatastore)
    std::lock_guard guard( head_handler_mutex_ );
    if ( !head_request_handler_ )
    {
        logger_->warn( "[{}] No head request handler registered, ignoring HeadRequest", address_.substr( 0, 8 ) );
        return;
    }
    logger_->debug( "[{}] Forwarding head request for {} topics to handler",
                    address_.substr( 0, 8 ),
                    wanted_topics.size() );
    head_request_handler_( wanted_topics );
}
}
Updated on 2026-03-04 at 13:10:44 -0800