impl/bridge_rpc_watcher.cpp¶
Implementation of the bridge RPC watcher. More...
Namespaces¶
| Name |
|---|
| sgns |
| sgns::evmwatcher |
Detailed Description¶
Implementation of the bridge RPC watcher.
Date: 2026-02-03 SuperGenius ([email protected]) Copyright 2026 Genius Ventures, Inc. SPDX-License-Identifier: MIT
Source code¶
#include <watcher/impl/bridge_rpc_watcher.hpp>
#include <eth/json_rpc.hpp>
#include <eth/eth_receipt_source.hpp>
#include <eth/rpc_receipt_source.hpp>
#include <base/parse_utility.hpp>
#include <base/rlp-logger.hpp>
#include <boost/json.hpp>
#include <boost/json/serialize.hpp>
#include <boost/chrono.hpp>
#include <boost/thread.hpp>
#include <algorithm>
namespace sgns::evmwatcher
{
BridgeRpcWatcher::BridgeRpcWatcher( const Config &config,
MessageCallback message_callback,
BridgeClaimCallback claim_callback ) :
watcher::MessagingWatcher( std::move( message_callback ) ),
config_( config ),
claim_callback_( std::move( claim_callback ) ),
transport_( config.rpc_url )
{
}
void BridgeRpcWatcher::startWatching()
{
auto logger = rlp::base::createLogger( "bridge_rpc_watcher" );
logger->info( "BridgeRpcWatcher starting: rpc={}, chain_id={}, contract={}, event={}",
config_.rpc_url,
config_.chain_id,
config_.contract_address,
config_.event_signature );
watcher::MessagingWatcher::startWatching();
}
void BridgeRpcWatcher::stopWatching()
{
auto logger = rlp::base::createLogger( "bridge_rpc_watcher" );
logger->info( "BridgeRpcWatcher stopping" );
watcher::MessagingWatcher::stopWatching();
}
void BridgeRpcWatcher::watch()
{
while ( running )
{
poll_once();
boost::this_thread::sleep_for( boost::chrono::seconds( config_.poll_interval.count() ) );
}
}
void BridgeRpcWatcher::poll_once()
{
auto logger = rlp::base::createLogger( "bridge_rpc_watcher" );
const auto parsed_contract = rlp::base::parse::hex_bytes( config_.contract_address );
if ( !parsed_contract.has_value() || parsed_contract->size() != 20 )
{
logger->error( "Invalid contract address: {}", config_.contract_address );
return;
}
eth::Address contract{};
std::copy_n( parsed_contract->begin(), 20, contract.begin() );
const auto topic0 = eth::abi::event_signature_hash( config_.event_signature );
// 1. Get latest block
const auto latest_block_req = eth::rpc::make_get_block_by_number_request( eth::rpc::RpcBlockTag::kLatest, 1 );
const auto latest_block_resp = transport_.call( latest_block_req );
if ( !latest_block_resp.has_value() )
{
logger->warn( "Failed to get latest block number" );
return;
}
const auto latest_block = eth::rpc::parse_block_number_response( latest_block_resp.value() );
if ( !latest_block.has_value() )
{
logger->warn( "Failed to parse block number response" );
return;
}
// 2. Ensure safe block range
const uint64_t safe_latest = ( latest_block.value() > config_.confirmation_depth )
? ( latest_block.value() - config_.confirmation_depth )
: 0ULL;
if ( last_block_ == 0 )
{
last_block_ = ( safe_latest > config_.max_log_range ) ? ( safe_latest - config_.max_log_range + 1 ) : 0ULL;
}
if ( last_block_ > safe_latest )
{
return;
}
const auto to_block = std::min( safe_latest, last_block_ + config_.max_log_range - 1 );
// 3. Fetch logs
eth::EventFilter filter;
filter.addresses.push_back( contract );
filter.topics.push_back( topic0 );
const auto get_logs_req = eth::rpc::make_get_logs_request( filter, last_block_, to_block, 2 );
const auto get_logs_resp = transport_.call( get_logs_req );
if ( !get_logs_resp.has_value() )
{
logger->warn( "Failed to fetch logs for blocks {}-{}", last_block_, to_block );
return;
}
const auto logs = eth::rpc::parse_get_logs_response( get_logs_resp.value() );
if ( !logs.has_value() )
{
logger->warn( "Failed to parse logs response for blocks {}-{}", last_block_, to_block );
return;
}
// 4. For each log, fetch receipt and build BridgeEventClaim
for ( const auto &rpc_log : logs.value() )
{
const auto receipt_req = eth::rpc::make_get_transaction_receipt_request( rpc_log.tx_hash, 3 );
const auto receipt_resp = transport_.call( receipt_req );
if ( !receipt_resp.has_value() )
{
logger->warn( "Failed to fetch receipt for tx {}",
rlp::base::parse::hex_array_string( rpc_log.tx_hash ) );
continue;
}
const auto receipt = eth::rpc::parse_transaction_receipt_response( receipt_resp.value() );
if ( !receipt.has_value() )
{
logger->warn( "Failed to parse receipt for tx {}",
rlp::base::parse::hex_array_string( rpc_log.tx_hash ) );
continue;
}
eth::BridgeEventClaim claim;
claim.src_chain_id = config_.chain_id;
claim.dest_chain_id = config_.dest_chain_id;
claim.block_number = receipt->block_number;
claim.block_hash = receipt->block_hash;
claim.tx_hash = receipt->tx_hash;
claim.log_index = rpc_log.log_index;
claim.bridge_contract = contract;
claim.event_topic0 = topic0;
claim.observed_at = static_cast<uint64_t>( std::chrono::system_clock::now().time_since_epoch().count() );
claim.finality_depth = static_cast<uint32_t>( config_.confirmation_depth );
if ( rpc_log.log.topics.size() >= 2 )
{
claim.topics.reserve( rpc_log.log.topics.size() );
for ( const auto &t : rpc_log.log.topics )
{
claim.topics.push_back( t );
}
}
claim.data = rpc_log.log.data;
// Verify the receipt log
const auto verify_result = eth::verify_receipt_log( receipt.value(), claim );
if ( !verify_result )
{
logger->warn( "Receipt log verification failed for tx {}",
rlp::base::parse::hex_array_string( rpc_log.tx_hash ) );
continue;
}
logger->info( "Bridge event detected: tx={} block={} log_index={}",
rlp::base::parse::hex_array_string( rpc_log.tx_hash ),
rpc_log.block_number,
rpc_log.log_index );
if ( claim_callback_ )
{
claim_callback_( claim );
}
if ( messageCallback )
{
boost::json::object msg;
msg["type"] = "bridge_event";
msg["tx_hash"] = rlp::base::parse::hex_array_string( rpc_log.tx_hash );
msg["block_number"] = rpc_log.block_number;
msg["log_index"] = rpc_log.log_index;
messageCallback( boost::json::serialize( msg ) );
}
}
last_block_ = to_block + 1;
}
} // namespace sgns::evmwatcher
Updated on 2026-06-05 at 17:22:20 -0700