Skip to content

eth/eth_watch_service.cpp

Namespaces

Name
eth

Functions

Name
std::shared_ptr< discv4::WatcherPool > make_eth_watcher_pool(const EthWatchConnectionConfig & config)
Create the shared watcher connection pool used by chain peer dialers.
std::shared_ptr< discv4::DialScheduler > start_eth_watch_chain_peer_dialing(boost::asio::io_context & io, std::shared_ptr< discv4::WatcherPool > pool, discv4::DialFn dial_fn, const std::vector< discv4::ValidatedPeer > & peers)
Create a per-chain dial scheduler and enqueue the provided peer candidates.

Functions Documentation

function make_eth_watcher_pool

std::shared_ptr< discv4::WatcherPool > make_eth_watcher_pool(
    const EthWatchConnectionConfig & config
)

Create the shared watcher connection pool used by chain peer dialers.

Parameters:

  • config Pool limits.

Return: Shared watcher pool.

function start_eth_watch_chain_peer_dialing

std::shared_ptr< discv4::DialScheduler > start_eth_watch_chain_peer_dialing(
    boost::asio::io_context & io,
    std::shared_ptr< discv4::WatcherPool > pool,
    discv4::DialFn dial_fn,
    const std::vector< discv4::ValidatedPeer > & peers
)

Create a per-chain dial scheduler and enqueue the provided peer candidates.

Parameters:

  • io Boost.Asio context used by the scheduler.
  • pool Shared watcher pool.
  • dial_fn Callback used for each dial attempt.
  • peers Validated RLPx/ETH peer candidates from chain_enodes.json.nodes.

Return: Scheduler that owns the dial queue and active sessions.

Source code

// Copyright 2026 Genius Ventures, Inc.
// SPDX-License-Identifier: MIT

#include <eth/eth_watch_service.hpp>
#include <eth/eth_handshake.hpp>
#include <eth/eth_watch_dialer.hpp>
#include <eth/eth_watch_runner.hpp>
#include <eth/messages.hpp>
#include <base/rlp-logger.hpp>
#include <discv5/discv5_enr.hpp>
#include <discv5/enr_tree.hpp>
#include <rlpx/crypto/ecdh.hpp>
#include <rlpx/rlpx_error.hpp>
#include <rlpx/rlpx_session.hpp>

#include <algorithm>
#include <memory>
#include <optional>

namespace eth {

namespace
{

/// Report whether the chain's default discovery setting permits ENR-tree
/// discovery. Only the kAuto and kEnrTree defaults do.
bool allows_enr_tree_discovery(const discv4::ChainPeerConfig& chain) noexcept
{
    switch (chain.discovery_default)
    {
    case discv4::ChainDiscoveryDefault::kAuto:
    case discv4::ChainDiscoveryDefault::kEnrTree:
        return true;
    default:
        return false;
    }
}

/// Report whether the chain's default discovery setting permits discv5
/// discovery seeded from cached ENRs. Only the kAuto and kCacheEnrDiscv5
/// defaults do.
bool allows_cache_enr_discv5_discovery(const discv4::ChainPeerConfig& chain) noexcept
{
    switch (chain.discovery_default)
    {
    case discv4::ChainDiscoveryDefault::kAuto:
    case discv4::ChainDiscoveryDefault::kCacheEnrDiscv5:
        return true;
    default:
        return false;
    }
}

/// Report whether the chain's default discovery setting permits discv4
/// discovery. The kAuto, kDiscv4 and kEnrTree defaults all do.
bool allows_discv4_discovery(const discv4::ChainPeerConfig& chain) noexcept
{
    switch (chain.discovery_default)
    {
    case discv4::ChainDiscoveryDefault::kAuto:
    case discv4::ChainDiscoveryDefault::kDiscv4:
    case discv4::ChainDiscoveryDefault::kEnrTree:
        return true;
    default:
        return false;
    }
}

/// Check that a chain configuration carries the inputs the requested
/// discovery mode needs.
///
/// @param chain                     Chain configuration under validation.
/// @param mode                      Service-wide discovery mode.
/// @param enable_discv4_fallback    Whether discv4 counts as a discovery source.
/// @param enable_enr_tree_discovery Whether ENR-tree and discv5 bootnode
///                                  sources count as discovery sources.
/// @return true when the mode can be served: cached nodes for kCacheOnly, at
///         least one discovery source for kDiscoverFirst, either of the two
///         for kDiscoverIfNeeded and kHybrid; false for any other mode value.
bool chain_config_matches_discovery_mode(
    const discv4::ChainPeerConfig& chain,
    EthWatchDiscoveryMode          mode,
    bool                           enable_discv4_fallback,
    bool                           enable_enr_tree_discovery) noexcept
{
    const bool cache_available = !chain.nodes.empty();

    // Each discovery source is usable only when it is enabled service-wide,
    // permitted by the chain's discovery default, and actually configured.
    const bool enr_tree_usable = enable_enr_tree_discovery
        && allows_enr_tree_discovery(chain)
        && !chain.enr_trees.empty();
    const bool discv5_usable = enable_enr_tree_discovery
        && allows_cache_enr_discv5_discovery(chain)
        && !chain.discv5_bootnodes.empty();
    const bool discv4_usable = enable_discv4_fallback
        && allows_discv4_discovery(chain)
        && !chain.bootnodes.empty();

    const bool discovery_available = enr_tree_usable || discv5_usable || discv4_usable;

    if (mode == EthWatchDiscoveryMode::kCacheOnly)
    {
        return cache_available;
    }
    if (mode == EthWatchDiscoveryMode::kDiscoverFirst)
    {
        return discovery_available;
    }
    if (mode == EthWatchDiscoveryMode::kDiscoverIfNeeded
        || mode == EthWatchDiscoveryMode::kHybrid)
    {
        return cache_available || discovery_available;
    }

    return false;
}

/// Decide whether cached peers are loaded into the queue up front.
/// Every mode except kDiscoverFirst preloads them.
bool should_preload_cached_peers(EthWatchDiscoveryMode mode) noexcept
{
    const bool discover_first = (mode == EthWatchDiscoveryMode::kDiscoverFirst);
    return !discover_first;
}

/// Decide whether discv4 discovery should be started for a chain.
///
/// @param queue                  Peer queue of the chain (bootnodes and
///                               discovery demand are read from it).
/// @param chain                  Chain configuration.
/// @param mode                   Service-wide discovery mode.
/// @param enable_discv4_fallback Service-wide discv4 switch.
/// @return false when discv4 is disabled, not permitted by the chain, or the
///         queue has no discovery bootnodes; otherwise true for
///         kDiscoverFirst/kHybrid, `queue.needs_discovery()` for
///         kDiscoverIfNeeded, and false for kCacheOnly.
bool should_start_discv4_discovery(
    const EthPeerQueue&            queue,
    const discv4::ChainPeerConfig& chain,
    EthWatchDiscoveryMode          mode,
    bool                           enable_discv4_fallback) noexcept
{
    const bool discv4_usable = enable_discv4_fallback
        && allows_discv4_discovery(chain)
        && !queue.discovery_bootnodes().empty();
    if (!discv4_usable)
    {
        return false;
    }

    if (mode == EthWatchDiscoveryMode::kDiscoverIfNeeded)
    {
        return queue.needs_discovery();
    }

    return mode == EthWatchDiscoveryMode::kDiscoverFirst
        || mode == EthWatchDiscoveryMode::kHybrid;
}

/// Return the ENR-tree URLs configured for a chain.
///
/// The list is returned by const reference to `chain.enr_trees`: callers that
/// only test for emptiness pay no copy, and this `noexcept` function performs
/// no allocation that could fail. The reference is valid for as long as
/// `chain` is; callers that need an independent list copy it (e.g. via
/// `const auto urls = configured_enr_tree_urls(chain);`).
const std::vector<std::string>& configured_enr_tree_urls(
    const discv4::ChainPeerConfig& chain) noexcept
{
    return chain.enr_trees;
}

/// Decide whether ENR-tree discovery should be started for a chain.
///
/// @param queue                     Peer queue of the chain (its cached peer
///                                  count is consulted).
/// @param chain                     Chain configuration.
/// @param mode                      Service-wide discovery mode.
/// @param enable_enr_tree_discovery Service-wide ENR-tree switch.
/// @return false when ENR-tree discovery is disabled, not permitted by the
///         chain, or no tree URL is configured; otherwise true for
///         kDiscoverFirst/kHybrid, true for kDiscoverIfNeeded only when the
///         queue holds no cached peers, and false for kCacheOnly.
bool should_start_enr_tree_discovery(
    const EthPeerQueue&            queue,
    const discv4::ChainPeerConfig& chain,
    EthWatchDiscoveryMode          mode,
    bool                           enable_enr_tree_discovery) noexcept
{
    const bool enr_tree_usable = enable_enr_tree_discovery
        && allows_enr_tree_discovery(chain)
        && !configured_enr_tree_urls(chain).empty();
    if (!enr_tree_usable)
    {
        return false;
    }

    if (mode == EthWatchDiscoveryMode::kDiscoverIfNeeded)
    {
        return queue.cached_peer_count() == 0U;
    }

    return mode == EthWatchDiscoveryMode::kDiscoverFirst
        || mode == EthWatchDiscoveryMode::kHybrid;
}

/// Decide whether discv5 discovery seeded from the chain's configured
/// `discv5_bootnodes` should be started.
///
/// @param queue                     Peer queue of the chain (its cached peer
///                                  count is consulted).
/// @param chain                     Chain configuration.
/// @param mode                      Service-wide discovery mode.
/// @param enable_enr_tree_discovery Service-wide switch; it gates this source
///                                  as well as ENR-tree discovery.
/// @return false when the switch is off, the chain does not permit this
///         source, or no discv5 bootnode is configured; otherwise true for
///         kDiscoverFirst/kHybrid, true for kDiscoverIfNeeded only when the
///         queue holds no cached peers, and false for kCacheOnly.
bool should_start_cache_enr_discovery(
    const EthPeerQueue&            queue,
    const discv4::ChainPeerConfig& chain,
    EthWatchDiscoveryMode          mode,
    bool                           enable_enr_tree_discovery) noexcept
{
    const bool discv5_usable = enable_enr_tree_discovery
        && allows_cache_enr_discv5_discovery(chain)
        && !chain.discv5_bootnodes.empty();
    if (!discv5_usable)
    {
        return false;
    }

    if (mode == EthWatchDiscoveryMode::kDiscoverIfNeeded)
    {
        return queue.cached_peer_count() == 0U;
    }

    return mode == EthWatchDiscoveryMode::kDiscoverFirst
        || mode == EthWatchDiscoveryMode::kHybrid;
}

/// Build a peer filter that checks the fork hash only when a peer advertises
/// one.
///
/// @param expected_hash Fork hash a peer must match if it reports a fork id.
/// @return Filter accepting peers without a fork id, and peers whose fork
///         hash equals `expected_hash`.
discv4::FilterFn make_optional_fork_id_filter(
    const std::array<uint8_t, 4U>& expected_hash) noexcept
{
    return [expected_hash](const discv4::DiscoveredPeer& peer) noexcept
    {
        const auto& advertised = peer.eth_fork_id;
        // Peers that do not advertise a fork id pass through unfiltered.
        return !advertised.has_value() || advertised->hash == expected_hash;
    };
}

/// Decide whether a peer taken directly from an ENR tree may be enqueued.
///
/// @param chain Chain configuration providing the expected fork id and the
///              fork filter setting.
/// @param peer  Candidate peer.
/// @return true when the chain has no fork id or its fork filter is
///         kDisabled; otherwise true only if the peer advertises a fork id
///         whose hash and next-fork value both match the chain's.
bool direct_enr_tree_peer_allowed(
    const discv4::ChainPeerConfig& chain,
    const discovery::ValidatedPeer& peer) noexcept
{
    const bool filtering_active = chain.fork_id.has_value()
        && chain.discovery_fork_filter != discv4::DiscoveryForkFilter::kDisabled;
    if (!filtering_active)
    {
        return true;
    }

    const auto& advertised = peer.eth_fork_id;
    if (!advertised.has_value())
    {
        // Unlike the optional scheduler filter, a missing fork id is rejected here.
        return false;
    }

    const auto& expected = *chain.fork_id;
    return advertised->hash == expected.fork_hash
        && advertised->next == expected.next_fork;
}

/// Parse resolved ENR strings and enqueue those that pass the chain's direct
/// ENR-tree admission check.
///
/// @param chain Chain configuration used for the fork-id admission check.
/// @param queue Destination queue; a null queue enqueues nothing.
/// @param enrs  Resolved entries; only strings starting with "enr:" are parsed.
/// @return Number of peers the queue reported as enqueued.
size_t enqueue_resolved_enr_tree_peers(
    const discv4::ChainPeerConfig& chain,
    const std::shared_ptr<EthPeerQueue>& queue,
    const std::vector<std::string>& enrs) noexcept
{
    size_t accepted = 0U;
    if (queue == nullptr)
    {
        return accepted;
    }

    for (const auto& text : enrs)
    {
        // Entries without the textual ENR prefix are skipped.
        const bool has_enr_prefix = (text.compare(0U, 4U, "enr:") == 0);
        if (!has_enr_prefix)
        {
            continue;
        }

        const auto parsed = discv5::EnrParser::parse(text);
        if (!parsed)
        {
            continue;
        }

        const auto candidate = discv5::EnrParser::to_validated_peer(parsed.value());
        if (!candidate)
        {
            continue;
        }
        if (!direct_enr_tree_peer_allowed(chain, candidate.value()))
        {
            continue;
        }

        if (queue->enqueue_validated_discovery_peer(candidate.value()))
        {
            ++accepted;
        }
    }

    return accepted;
}

} // namespace

/// Stops all runtime activity (schedulers and discovery clients) before the
/// service is destroyed.
EthWatchService::~EthWatchService()
{
    this->stop();
}

// ---------------------------------------------------------------------------
// make_eth_watcher_pool
// ---------------------------------------------------------------------------

std::shared_ptr<discv4::WatcherPool> make_eth_watcher_pool(
    const EthWatchConnectionConfig& config)
{
    return std::make_shared<discv4::WatcherPool>(
        discv4::WatcherPoolConfig{
            config.max_total_connections,
            config.max_connections_per_chain
        });
}

// ---------------------------------------------------------------------------
// start_eth_watch_chain_peer_dialing
// ---------------------------------------------------------------------------

/// Create a per-chain dial scheduler and enqueue the provided peer candidates.
///
/// @param io      Boost.Asio context handed to the scheduler.
/// @param pool    Shared watcher pool.
/// @param dial_fn Callback handed to the scheduler for dial attempts.
/// @param peers   Peer candidates, enqueued in the given order.
/// @return The newly created scheduler.
std::shared_ptr<discv4::DialScheduler> start_eth_watch_chain_peer_dialing(
    boost::asio::io_context&                  io,
    std::shared_ptr<discv4::WatcherPool>      pool,
    discv4::DialFn                            dial_fn,
    const std::vector<discv4::ValidatedPeer>& peers)
{
    const auto dialer = std::make_shared<discv4::DialScheduler>(
        io,
        std::move(pool),
        std::move(dial_fn));

    std::for_each(peers.begin(), peers.end(),
        [&dialer](const discv4::ValidatedPeer& candidate)
        {
            dialer->enqueue(candidate);
        });

    return dialer;
}

// ---------------------------------------------------------------------------
// initialize / run / stop
// ---------------------------------------------------------------------------

/// Validate and store the orchestration configuration.
///
/// Any running state is stopped and the previously stored configuration and
/// callback are cleared first, so a failed call leaves the service
/// uninitialized.
///
/// @param config   Service configuration; it must contain at least one chain,
///                 every chain needs a non-empty canonical name and must
///                 satisfy the configured discovery mode.
/// @param callback Notification callback stored for later use by runners.
/// @return true when the configuration was accepted and stored.
bool EthWatchService::initialize(
    EthWatchServiceConfig          config,
    WatchEventNotificationCallback callback) noexcept
{
    stop();
    orchestration_initialized_ = false;
    orchestration_config_ = {};
    orchestration_callback_ = {};

    if (config.chains.empty())
    {
        return false;
    }

    const bool every_chain_usable = std::all_of(
        config.chains.begin(),
        config.chains.end(),
        [&config](const auto& chain)
        {
            return !chain.canonical_name.empty()
                && chain_config_matches_discovery_mode(
                    chain,
                    config.discovery_mode,
                    config.enable_discv4_fallback,
                    config.enable_enr_tree_discovery);
        });
    if (!every_chain_usable)
    {
        return false;
    }

    orchestration_config_ = std::move(config);
    orchestration_callback_ = std::move(callback);
    orchestration_initialized_ = true;
    return true;
}

/// Build the per-chain runtime state and start peer discovery.
///
/// Does nothing unless initialize() succeeded and the service is not already
/// running. For every configured chain this creates a stats snapshot, an
/// optional dial scheduler, a peer queue, and at most one discovery source.
///
/// @param io Boost.Asio context handed to schedulers and discovery clients.
void EthWatchService::run(boost::asio::io_context& io) noexcept
{
    if (!orchestration_initialized_ || orchestration_running_)
    {
        return;
    }

    // Start from a clean slate; a single connection pool is shared by every
    // scheduler created below.
    runtime_chains_.clear();
    active_runners_.clear();
    runtime_stats_by_chain_.clear();
    auto pool = make_eth_watcher_pool(orchestration_config_.connection);

    for (const auto& chain_config : orchestration_config_.chains)
    {
        RuntimeChain runtime{};
        runtime.config = chain_config;
        runtime.stats = std::make_shared<EthWatchRuntimeStatsSnapshot>();
        // Register the stats before building the dial function:
        // make_default_dial_fn() looks them up by canonical chain name.
        runtime_stats_by_chain_[runtime.config.canonical_name] = runtime.stats;

        if (orchestration_config_.attach_peer_dialer)
        {
            // A configured factory takes precedence over the built-in dialer.
            auto dial_fn = orchestration_config_.dial_fn_factory
                ? orchestration_config_.dial_fn_factory(chain_config)
                : make_default_dial_fn(chain_config);

            runtime.scheduler = std::make_shared<discv4::DialScheduler>(
                io,
                pool,
                std::move(dial_fn));

            // The scheduler-level fork filter is installed only when the chain
            // has a fork id and its fork filter is set to kRequire.
            if (chain_config.fork_id.has_value()
                && chain_config.discovery_fork_filter == discv4::DiscoveryForkFilter::kRequire)
            {
                runtime.scheduler->filter_fn = make_optional_fork_id_filter(
                    chain_config.fork_id->fork_hash);
            }
        }

        // runtime.scheduler is null here when no peer dialer is attached.
        runtime.peer_queue = make_eth_peer_queue(
            runtime.scheduler,
            runtime.config,
            orchestration_config_.peer_queue,
            should_preload_cached_peers(orchestration_config_.discovery_mode));

        // Discovery sources are tried in priority order; each later source is
        // attempted only if the earlier ones did not start.
        // 1) ENR-tree discovery.
        if (runtime.peer_queue
            && should_start_enr_tree_discovery(
                *runtime.peer_queue,
                runtime.config,
                orchestration_config_.discovery_mode,
                orchestration_config_.enable_enr_tree_discovery))
        {
            runtime.discv5_enr_tree_started = start_enr_tree_discovery(io, runtime);
        }

        // 2) discv5 seeded from the chain's configured discv5 bootnodes.
        if (runtime.peer_queue
            && !runtime.discv5_enr_tree_started
            && should_start_cache_enr_discovery(
                *runtime.peer_queue,
                runtime.config,
                orchestration_config_.discovery_mode,
                orchestration_config_.enable_enr_tree_discovery))
        {
            runtime.discv5_cache_enr_started =
                start_discv5_discovery(io, runtime, runtime.config.discv5_bootnodes);
        }

        // 3) discv4 fallback.
        if (runtime.peer_queue
            && !runtime.discv5_enr_tree_started
            && !runtime.discv5_cache_enr_started
            && should_start_discv4_discovery(
                *runtime.peer_queue,
                runtime.config,
                orchestration_config_.discovery_mode,
                orchestration_config_.enable_discv4_fallback))
        {
            start_discv4_fallback(io, runtime);
        }

        runtime_chains_.push_back(std::move(runtime));
    }

    orchestration_running_ = true;
}

/// Stop every per-chain scheduler and discovery client, then drop all runtime
/// state and mark the service as not running. The stored configuration and
/// the initialized flag are left untouched.
void EthWatchService::stop() noexcept
{
    for (auto& chain : runtime_chains_)
    {
        // Per chain: scheduler first, then the discv4 client, then discv5.
        if (chain.scheduler != nullptr)
        {
            chain.scheduler->stop();
        }
        if (chain.discovery_client != nullptr)
        {
            chain.discovery_client->stop();
        }
        if (chain.discv5_client != nullptr)
        {
            chain.discv5_client->stop();
        }
    }

    active_runners_.clear();
    runtime_chains_.clear();
    runtime_stats_by_chain_.clear();

    orchestration_running_ = false;
}

/// @return true once initialize() has accepted a configuration; the flag is
///         reset at the start of every initialize() call.
bool EthWatchService::initialized() const noexcept
{
    return orchestration_initialized_;
}

/// @return Number of per-chain runtime entries currently held; run() fills
///         the list and stop() clears it.
size_t EthWatchService::runtime_chain_count() const noexcept
{
    return runtime_chains_.size();
}

size_t EthWatchService::scheduler_count() const noexcept
{
    return std::count_if(runtime_chains_.begin(), runtime_chains_.end(),
        [](const RuntimeChain& runtime) { return runtime.scheduler != nullptr; });
}

size_t EthWatchService::peer_queue_count() const noexcept
{
    return std::count_if(runtime_chains_.begin(), runtime_chains_.end(),
        [](const RuntimeChain& runtime) { return runtime.peer_queue != nullptr; });
}

size_t EthWatchService::discovery_client_count() const noexcept
{
    return std::count_if(runtime_chains_.begin(), runtime_chains_.end(),
        [](const RuntimeChain& runtime) { return runtime.discovery_client != nullptr; });
}

size_t EthWatchService::discv4_fallback_count() const noexcept
{
    return std::count_if(runtime_chains_.begin(), runtime_chains_.end(),
        [](const RuntimeChain& runtime) { return runtime.discv4_fallback_started; });
}

/// @return Number of entries in the active runner list; the default dial
///         function appends one per successfully connected peer, and run()
///         and stop() clear the list.
size_t EthWatchService::active_runner_count() const noexcept
{
    return active_runners_.size();
}

/// Sum the watch statistics of all active runners.
///
/// @return Field-wise total over every non-null runner's service stats.
WatchStatsSnapshot EthWatchService::aggregate_runtime_stats() const noexcept
{
    WatchStatsSnapshot totals{};

    for (const auto& entry : active_runners_)
    {
        if (entry)
        {
            const auto snapshot = entry->service().stats();

            totals.eth_messages_seen += snapshot.eth_messages_seen;
            totals.new_block_hashes_messages += snapshot.new_block_hashes_messages;
            totals.new_block_messages += snapshot.new_block_messages;
            totals.receipts_messages += snapshot.receipts_messages;
            totals.decode_failures += snapshot.decode_failures;
            totals.receipts_requested += snapshot.receipts_requested;
            totals.receipts_processed += snapshot.receipts_processed;
            totals.logs_seen += snapshot.logs_seen;
            totals.matched_logs += snapshot.matched_logs;
            totals.discarded_logs += snapshot.discarded_logs;
        }
    }

    return totals;
}

/// Sum connection-level and peer-queue statistics across all runtime chains.
///
/// @return Field-wise total of every chain's runtime stats (including both
///         rejection histograms) and of every chain's peer-queue stats.
///         Chains without stats or without a queue contribute nothing for
///         that part.
EthWatchRuntimeStatsSnapshot EthWatchService::aggregate_connection_stats() const noexcept
{
    EthWatchRuntimeStatsSnapshot totals{};

    for (const auto& chain : runtime_chains_)
    {
        if (const auto& per_chain = chain.stats; per_chain)
        {
            totals.tcp_connect_failures += per_chain->tcp_connect_failures;
            totals.tcp_connected += per_chain->tcp_connected;
            totals.auth_success += per_chain->auth_success;
            totals.local_hello_sent += per_chain->local_hello_sent;
            totals.peer_disconnect_before_hello += per_chain->peer_disconnect_before_hello;
            totals.peer_hello_accepted += per_chain->peer_hello_accepted;
            totals.eth_status_sent += per_chain->eth_status_sent;
            totals.remote_status_accepted += per_chain->remote_status_accepted;
            totals.remote_status_rejected += per_chain->remote_status_rejected;

            // Histograms are summed slot by slot.
            auto& reason_totals = totals.remote_status_rejected_disconnect_reasons;
            for (size_t slot = 0; slot < reason_totals.size(); ++slot)
            {
                reason_totals[slot] +=
                    per_chain->remote_status_rejected_disconnect_reasons[slot];
            }

            auto& validation_totals = totals.remote_status_rejected_validation_errors;
            for (size_t slot = 0; slot < validation_totals.size(); ++slot)
            {
                validation_totals[slot] +=
                    per_chain->remote_status_rejected_validation_errors[slot];
            }
        }

        if (!chain.peer_queue)
        {
            continue;
        }

        const auto from_queue = chain.peer_queue->stats();
        auto& queue_totals = totals.peer_queue;
        queue_totals.cached_peer_count += from_queue.cached_peer_count;
        queue_totals.discovered_peer_count += from_queue.discovered_peer_count;
        queue_totals.requeued_peer_count += from_queue.requeued_peer_count;
        queue_totals.duplicate_peer_drop_count += from_queue.duplicate_peer_drop_count;
        queue_totals.capacity_drop_count += from_queue.capacity_drop_count;
        queue_totals.flaky_peer_drop_count += from_queue.flaky_peer_drop_count;
        queue_totals.too_many_peers_backoff_count += from_queue.too_many_peers_backoff_count;
        queue_totals.backoff_drop_count += from_queue.backoff_drop_count;
        queue_totals.disconnect_feedback_count += from_queue.disconnect_feedback_count;
        queue_totals.disconnected_before_connected_count += from_queue.disconnected_before_connected_count;
        queue_totals.disconnected_after_connected_count += from_queue.disconnected_after_connected_count;
        queue_totals.too_many_peers_before_connected_count += from_queue.too_many_peers_before_connected_count;
        queue_totals.too_many_peers_after_connected_count += from_queue.too_many_peers_after_connected_count;
        queue_totals.tcp_failure_count += from_queue.tcp_failure_count;
        queue_totals.timeout_count += from_queue.timeout_count;
        queue_totals.subprotocol_error_count += from_queue.subprotocol_error_count;
    }

    return totals;
}

std::shared_ptr<EthPeerQueue> EthWatchService::peer_queue(
    const std::string& chain_name) const noexcept
{
    auto it = std::find_if(runtime_chains_.begin(), runtime_chains_.end(),
        [&chain_name](const RuntimeChain& runtime)
        {
            return runtime.config.canonical_name == chain_name;
        });
    if (it == runtime_chains_.end())
    {
        return nullptr;
    }
    return it->peer_queue;
}

/// Build the built-in dial function for one chain.
///
/// The returned callable dials a validated peer over RLPx, performs the ETH
/// Status handshake, and on success attaches an EthWatchRunner configured with
/// the service's watches. Connection progress is counted in the chain's
/// runtime stats, when such stats were registered for the chain name before
/// this function was called.
///
/// @param chain_config Chain to dial for; copied into the returned callable.
/// @return Dial function with the signature expected by discv4::DialScheduler.
///
/// NOTE(review): the callable captures `this` and touches
/// `orchestration_callback_`, `orchestration_config_` and `active_runners_`,
/// so it must not run after the service is destroyed — confirm that the
/// scheduler's stop() guarantees this.
discv4::DialFn EthWatchService::make_default_dial_fn(
    const discv4::ChainPeerConfig& chain_config) noexcept
{
    // Resolve the stats object once; the callable shares ownership of it.
    const auto stats_it = runtime_stats_by_chain_.find(chain_config.canonical_name);
    const auto runtime_stats = stats_it != runtime_stats_by_chain_.end()
        ? stats_it->second
        : std::shared_ptr<EthWatchRuntimeStatsSnapshot>{};

    return [this, chain_config, runtime_stats](
        discv4::ValidatedPeer                                      vp,
        std::function<void(rlpx::DisconnectReason)>                on_done,
        std::function<void(std::shared_ptr<rlpx::RlpxSession>)>    on_connected,
        boost::asio::yield_context                                yield)
    {
        static auto log = rlp::base::createLogger("eth_watch");

        // Every dial attempt uses a fresh ephemeral keypair.
        auto keypair_result = rlpx::crypto::Ecdh::generate_ephemeral_keypair();
        if (!keypair_result)
        {
            log->warn("Failed to generate local RLPx keypair for chain '{}'",
                      chain_config.canonical_name);
            on_done(rlpx::DisconnectReason::kProtocolError);
            return;
        }

        const auto& keypair = keypair_result.value();
        const rlpx::SessionConnectParams params{
            vp.peer.ip,
            vp.peer.tcp_port,
            keypair.public_key,
            keypair.private_key,
            vp.pubkey,
            "rlp-eth-watch",
            0,
            // Progress hook: one counter per connection phase.
            [runtime_stats](rlpx::ConnectProgressPhase phase, rlpx::DisconnectReason)
            {
                if (!runtime_stats)
                {
                    return;
                }

                switch (phase)
                {
                case rlpx::ConnectProgressPhase::kTcpConnected:
                    ++runtime_stats->tcp_connected;
                    break;
                case rlpx::ConnectProgressPhase::kAuthSucceeded:
                    ++runtime_stats->auth_success;
                    break;
                case rlpx::ConnectProgressPhase::kLocalHelloSent:
                    ++runtime_stats->local_hello_sent;
                    break;
                case rlpx::ConnectProgressPhase::kPeerDisconnectBeforeHello:
                    ++runtime_stats->peer_disconnect_before_hello;
                    break;
                case rlpx::ConnectProgressPhase::kPeerHelloAccepted:
                    ++runtime_stats->peer_hello_accepted;
                    break;
                }
            }
        };

        log->debug("Dialing RLPx peer {}:{} for chain '{}'",
                   vp.peer.ip,
                   vp.peer.tcp_port,
                   chain_config.canonical_name);

        // Pre-set to kTcpError; connect() receives a pointer to it and may
        // overwrite it.
        rlpx::DisconnectReason disconnect_reason = rlpx::DisconnectReason::kTcpError;
        auto session_result = rlpx::RlpxSession::connect(params, yield, &disconnect_reason);
        if (!session_result)
        {
            if (runtime_stats && disconnect_reason == rlpx::DisconnectReason::kTcpError)
            {
                ++runtime_stats->tcp_connect_failures;
            }
            log->debug("RLPx dial failed for {}:{} on chain '{}' ({}: {})",
                       vp.peer.ip,
                       vp.peer.tcp_port,
                       chain_config.canonical_name,
                       static_cast<int>(session_result.error()),
                       rlpx::to_string(session_result.error()));
            on_done(disconnect_reason);
            return;
        }

        auto session = std::move(session_result.value());
        // Chains without a configured fork id handshake with a default ForkId.
        auto fork_id = chain_config.fork_id.value_or(ForkId{});
        auto runner = std::make_shared<EthWatchRunner>(
            session,
            chain_config.canonical_name,
            chain_config.network_id,
            chain_config.genesis_hash,
            fork_id,
            chain_config.eth_message_schemas);

        // Set by the handshake's disconnect handler when the peer disconnects.
        std::optional<rlpx::DisconnectReason> remote_status_disconnect_reason;
        if (runtime_stats)
        {
            ++runtime_stats->eth_status_sent;
        }
        const auto handshake_result = PerformEthStatusHandshake(
            EthStatusHandshakeStart{
                std::make_shared<RlpxEthSessionChannel>(session),
                chain_config.network_id,
                chain_config.genesis_hash,
                fork_id,
                chain_config.eth_message_schemas,
                EthStatusAcceptedHandler{},
                [&remote_status_disconnect_reason](rlpx::DisconnectReason reason)
                {
                    remote_status_disconnect_reason = reason;
                },
                rlpx::EthMessageHandler{}
            },
            yield);
        if (!handshake_result)
        {
            if (runtime_stats)
            {
                ++runtime_stats->remote_status_rejected;
                if (remote_status_disconnect_reason.has_value())
                {
                    // Bounds-check the histogram slot, mirroring the
                    // validation-error branch below, so an unexpected reason
                    // code can never write past the end of the array.
                    auto& reason_counts =
                        runtime_stats->remote_status_rejected_disconnect_reasons;
                    const auto reason_index = static_cast<size_t>(
                        static_cast<uint8_t>(*remote_status_disconnect_reason));
                    if (reason_index < reason_counts.size())
                    {
                        ++reason_counts[reason_index];
                    }
                }
                else
                {
                    const auto error_index = static_cast<size_t>(handshake_result.error());
                    if (error_index < runtime_stats->remote_status_rejected_validation_errors.size())
                    {
                        ++runtime_stats->remote_status_rejected_validation_errors[error_index];
                    }
                }
            }
            log->debug("ETH Status handshake failed for {}:{} on chain '{}' (error {})",
                       vp.peer.ip,
                       vp.peer.tcp_port,
                       chain_config.canonical_name,
                       static_cast<int>(handshake_result.error()));
            (void)session->disconnect(rlpx::DisconnectReason::kSubprotocolError);
            on_done(rlpx::DisconnectReason::kSubprotocolError);
            return;
        }

        log->info("Connected RLPx/ETH peer {}:{} for chain '{}'",
                  vp.peer.ip,
                  vp.peer.tcp_port,
                  chain_config.canonical_name);
        if (runtime_stats)
        {
            ++runtime_stats->remote_status_accepted;
        }

        // Success path: hand the session over, then wire the runner up with
        // the service callback and every configured watch.
        // NOTE(review): on_done is not invoked on this path — presumably the
        // scheduler tracks the session's lifetime itself; confirm.
        on_connected(session);
        runner->set_event_callback(orchestration_callback_);
        for (const auto& watch : orchestration_config_.watches)
        {
            (void)runner->watch_event(
                watch.contract_address,
                watch.event_signature,
                watch.params,
                watch.from_block,
                watch.to_block);
        }
        runner->install_session_bridge();
        // NOTE(review): within this file, runners are removed only by the
        // wholesale clear in run()/stop().
        active_runners_.push_back(std::move(runner));
    };
}

/// Resolve the chain's ENR trees, enqueue directly usable peers, and start
/// discv5 discovery bootstrapped from the resolved records.
///
/// @param io      Boost.Asio context forwarded to the discv5 start-up.
/// @param runtime Per-chain runtime state.
/// @return false when no tree URL is configured or resolution yields nothing;
///         otherwise the result of start_discv5_discovery().
bool EthWatchService::start_enr_tree_discovery(
    boost::asio::io_context& io,
    RuntimeChain&            runtime) noexcept
{
    const auto tree_urls = configured_enr_tree_urls(runtime.config);
    if (tree_urls.empty())
    {
        return false;
    }

    // A configured resolver takes precedence over the built-in one.
    const auto resolved = [&]() -> std::vector<std::string>
    {
        if (orchestration_config_.enr_tree_resolver)
        {
            return orchestration_config_.enr_tree_resolver(runtime.config, tree_urls);
        }
        return discv5::EnrTreeResolver{}.resolve(tree_urls);
    }();
    if (resolved.empty())
    {
        return false;
    }

    // Resolved records that pass the admission check go straight into the
    // queue; the full list still bootstraps discv5 afterwards.
    const auto enqueued_now = enqueue_resolved_enr_tree_peers(
        runtime.config,
        runtime.peer_queue,
        resolved);
    if (enqueued_now > 0U)
    {
        static auto log = rlp::base::createLogger("eth_watch");
        log->info("Enqueued {} direct ENR-tree peer(s) for chain '{}'",
                  enqueued_now,
                  runtime.config.canonical_name);
    }

    return start_discv5_discovery(io, runtime, resolved);
}

/// Start discv5 discovery for a chain from the given bootstrap ENRs.
///
/// When a custom starter is configured, it is invoked and its result is
/// returned. Otherwise a discv5 client is created (through the configured
/// factory when there is one), connected to the chain's peer queue, and
/// started.
///
/// @param io             Boost.Asio context for the client.
/// @param runtime        Per-chain runtime state; receives the client.
/// @param bootstrap_enrs Bootstrap records; an empty list starts nothing.
/// @return true when the starter reported success or the client started;
///         on any failure `runtime.discv5_client` is left null.
bool EthWatchService::start_discv5_discovery(
    boost::asio::io_context&        io,
    RuntimeChain&                   runtime,
    const std::vector<std::string>& bootstrap_enrs) noexcept
{
    if (bootstrap_enrs.empty())
    {
        return false;
    }

    if (auto& custom_starter = orchestration_config_.discv5_enr_tree_starter; custom_starter)
    {
        return custom_starter(
            io,
            runtime.config,
            runtime.peer_queue,
            bootstrap_enrs);
    }

    discv5::discv5Config client_config = orchestration_config_.discv5_discovery;
    client_config.bootstrap_enrs = bootstrap_enrs;

    // The fork id is required at the discovery layer only for kRequire chains
    // that actually have a fork id configured.
    const auto& chain = runtime.config;
    const bool require_fork_id = chain.fork_id.has_value()
        && chain.discovery_fork_filter == discv4::DiscoveryForkFilter::kRequire;
    if (require_fork_id)
    {
        discv5::ForkId required{};
        required.hash = chain.fork_id->fork_hash;
        required.next = chain.fork_id->next_fork;
        client_config.required_fork_id = required;
    }

    // On keypair generation failure the keys from the configured defaults are
    // kept as they are.
    if (auto generated = rlpx::crypto::Ecdh::generate_ephemeral_keypair(); generated)
    {
        client_config.private_key = generated.value().private_key;
        client_config.public_key = generated.value().public_key;
    }

    try
    {
        runtime.discv5_client = orchestration_config_.discv5_client_factory
            ? orchestration_config_.discv5_client_factory(io, client_config)
            : std::make_shared<discv5::discv5_client>(io, client_config);
    }
    catch (...)
    {
        // Client construction is best-effort inside this noexcept function.
        runtime.discv5_client.reset();
        return false;
    }

    if (runtime.discv5_client == nullptr)
    {
        return false;
    }

    // The callback shares ownership of the queue with the runtime entry.
    auto sink = runtime.peer_queue;
    runtime.discv5_client->set_peer_discovered_callback(
        [sink](const discovery::ValidatedPeer& found)
        {
            if (sink)
            {
                (void)sink->enqueue_validated_discovery_peer(found);
            }
        });

    const auto started = runtime.discv5_client->start();
    if (started)
    {
        return true;
    }

    runtime.discv5_client.reset();
    return false;
}

/// Start discv4 discovery for a chain and seed it from the queue's bootnodes.
///
/// When a custom starter is configured, it is invoked and its result becomes
/// `runtime.discv4_fallback_started`. Otherwise a discv4 client is created
/// (through the configured factory when there is one), connected to the
/// chain's peer queue, started, and one find_node coroutine is spawned per
/// discovery bootnode.
///
/// @param io      Boost.Asio context for the client and the spawned lookups.
/// @param runtime Per-chain runtime state; receives the client and the
///                started flag. On failure the client is left null and the
///                flag is not set.
void EthWatchService::start_discv4_fallback(
    boost::asio::io_context& io,
    RuntimeChain&            runtime) noexcept
{
    if (orchestration_config_.discv4_fallback_starter)
    {
        runtime.discv4_fallback_started =
            orchestration_config_.discv4_fallback_starter(
                io,
                runtime.config,
                runtime.peer_queue);
        return;
    }

    discv4::discv4Config discovery_config = orchestration_config_.discovery;
    // On keypair generation failure the keys from the configured defaults are
    // kept as they are.
    auto keypair_result = rlpx::crypto::Ecdh::generate_ephemeral_keypair();
    if (keypair_result)
    {
        discovery_config.private_key = keypair_result.value().private_key;
        discovery_config.public_key = keypair_result.value().public_key;
    }

    try
    {
        runtime.discovery_client = orchestration_config_.discovery_client_factory
            ? orchestration_config_.discovery_client_factory(io, discovery_config)
            : std::make_shared<discv4::discv4_client>(io, discovery_config);
    }
    catch (...)
    {
        // Client construction is best-effort inside this noexcept function.
        runtime.discovery_client.reset();
        return;
    }

    if (!runtime.discovery_client)
    {
        return;
    }

    // The callback shares ownership of the queue with the runtime entry.
    auto queue = runtime.peer_queue;
    runtime.discovery_client->set_peer_discovered_callback(
        [queue](const discv4::DiscoveredPeer& peer)
        {
            if (queue)
            {
                (void)queue->enqueue_discovered_peer(peer);
            }
        });

    const auto start_result = runtime.discovery_client->start();
    if (!start_result)
    {
        runtime.discovery_client.reset();
        return;
    }
    runtime.discv4_fallback_started = true;

    // The discovery callback above tolerates a missing queue, so bootnode
    // seeding must too: without a queue there are no bootnodes to query and
    // a null pointer must not be dereferenced.
    if (!queue)
    {
        return;
    }

    for (const auto& bootnode : queue->discovery_bootnodes())
    {
        // Copy what the coroutine needs; it may outlive this loop iteration.
        const std::string host = bootnode.peer.ip;
        const uint16_t port = bootnode.peer.udp_port;
        const discv4::NodeId node_id = bootnode.peer.node_id;
        auto client = runtime.discovery_client;
        boost::asio::spawn(io,
            [client, host, port, node_id](boost::asio::yield_context yield)
            {
                (void)client->find_node(host, port, node_id, yield);
            });
    }
}

// ---------------------------------------------------------------------------
// set_send_callback / set_eth_message_schemas
// ---------------------------------------------------------------------------

/// @brief Install the callback used to transmit outbound ETH messages.
///
/// request_receipts() returns without doing anything while no callback is
/// installed, and otherwise invokes it with the GetReceipts message id and the
/// encoded payload.
///
/// @param cb Send callback; taken by value and moved into the service,
///           replacing any previously installed callback.
void EthWatchService::set_send_callback(SendCallback cb) noexcept
{
    send_cb_ = std::move(cb);
}

/// @brief Replace the message schemas used when decoding inbound ETH messages.
///
/// process_message() passes this list to the NewBlock and Receipts decoders
/// whenever it is non-empty; with an empty list the decoders are called
/// without a schema argument.
///
/// @param schemas Schema list; taken by value and moved into the service.
void EthWatchService::set_eth_message_schemas(std::vector<EthMessageSchema> schemas) noexcept
{
    eth_message_schemas_ = std::move(schemas);
}

// ---------------------------------------------------------------------------
// watch_event
// ---------------------------------------------------------------------------

/// @brief Subscribe to a contract event and receive ABI-decoded matches.
///
/// Builds an EventFilter from the arguments, records the subscription, and
/// registers a raw-log handler on the underlying watcher that decodes each
/// matched log before forwarding it to @p callback.
///
/// @param contract_address Contract to filter on. An all-zero address adds no
///                         address to the filter (presumably "any contract" —
///                         confirm against the watcher's matching rules).
/// @param event_signature  Event signature; its hash becomes the first topic
///                         of the filter and it is reused for log decoding.
/// @param params           ABI parameter descriptions used by the decoder.
/// @param callback         Invoked with the matched event and decoded values.
///                         Logs that fail to decode are dropped silently.
/// @param from_block       Optional lower block bound copied into the filter.
/// @param to_block         Optional upper block bound copied into the filter.
/// @return Identifier of the new subscription, to be passed to unwatch().
EventWatchId EthWatchService::watch_event(
    const codec::Address&             contract_address,
    const std::string&                event_signature,
    const std::vector<abi::AbiParam>& params,
    DecodedEventCallback              callback,
    std::optional<uint64_t>           from_block,
    std::optional<uint64_t>           to_block) noexcept
{
    const EventWatchId id = next_id_++;

    EventFilter filter;

    // Only constrain by address when the caller supplied a non-zero one.
    const codec::Address zero_addr{};
    if (contract_address != zero_addr)
    {
        filter.addresses.push_back(contract_address);
    }

    filter.topics.push_back(abi::event_signature_hash(event_signature));
    filter.from_block = from_block;
    filter.to_block   = to_block;

    subscriptions_.push_back({id, event_signature, params, std::move(callback)});

    watcher_.watch(filter, [this, id](const MatchedEvent& ev)
    {
        // The subscription may have been removed since the watch was registered.
        auto it = std::find_if(subscriptions_.begin(), subscriptions_.end(),
            [id](const Subscription& s) { return s.id == id; });
        if (it == subscriptions_.end())
        {
            return;
        }

        auto decoded = abi::decode_log(ev.log, it->event_signature, it->params);
        if (!decoded)
        {
            return;
        }

        // Invoke a local copy rather than the element stored in subscriptions_:
        // a handler that calls unwatch() or watch_event() erases or reallocates
        // that vector, which would destroy the callable while it is running.
        auto handler = it->callback;
        if (!handler)
        {
            // An empty callback would throw from inside noexcept call paths.
            return;
        }
        handler(ev, decoded.value());
    });

    return id;
}

// ---------------------------------------------------------------------------
// unwatch
// ---------------------------------------------------------------------------

/// @brief Cancel a subscription created by watch_event().
///
/// Drops every stored subscription carrying @p id and then removes the
/// watch with the same id from the underlying watcher. An id with no stored
/// subscription leaves the subscription list unchanged.
///
/// @param id Identifier returned by watch_event().
void EthWatchService::unwatch(EventWatchId id) noexcept
{
    const auto has_id = [id](const Subscription& entry)
    {
        return entry.id == id;
    };

    // Compact the survivors to the front, then trim the tail in one erase.
    const auto survivors_end =
        std::remove_if(subscriptions_.begin(), subscriptions_.end(), has_id);
    subscriptions_.erase(survivors_end, subscriptions_.end());

    watcher_.unwatch(id);
}

// ---------------------------------------------------------------------------
// request_receipts
// ---------------------------------------------------------------------------

void EthWatchService::request_receipts(const Hash256& block_hash,
                                       uint64_t       block_number) noexcept
{
    if (!send_cb_)
    {
        return;
    }

    // Deduplicate — skip if we have already requested receipts for this block
    if (!chain_tracker_.mark_seen(block_hash, block_number))
    {
        return;
    }

    const uint64_t req_id = next_req_id_++;

    GetReceiptsMessage req;
    req.request_id = req_id;
    req.block_hashes.push_back(block_hash);

    auto encoded = protocol::encode_get_receipts(req);
    if (!encoded)
    {
        return;
    }

    pending_requests_[req_id] = {block_hash, block_number};
    ++stats_.receipts_requested;
    send_cb_(protocol::kGetReceiptsMessageId, std::move(encoded.value()));
}

// ---------------------------------------------------------------------------
// process_message
// ---------------------------------------------------------------------------

void EthWatchService::process_message(uint8_t eth_msg_id, rlp::ByteView payload) noexcept
{
    ++stats_.eth_messages_seen;

    if (eth_msg_id == protocol::kNewBlockHashesMessageId)
    {
        ++stats_.new_block_hashes_messages;
        auto decoded = protocol::decode_new_block_hashes(payload);
        if (!decoded)
        {
            ++stats_.decode_failures;
            return;
        }
        for (const auto& entry : decoded.value().entries)
        {
            request_receipts(entry.hash, entry.number);
        }
        return;
    }

    if (eth_msg_id == protocol::kNewBlockMessageId)
    {
        ++stats_.new_block_messages;
        auto decoded = eth_message_schemas_.empty()
            ? protocol::decode_new_block(payload)
            : protocol::decode_new_block(payload, eth_message_schemas_);
        if (!decoded)
        {
            ++stats_.decode_failures;
            return;
        }
        // NewBlock does not include a block hash on the wire — use zeroed sentinel.
        // Still trigger request_receipts so callers with a send_cb get receipts.
        const Hash256 block_hash{};
        process_new_block(decoded.value(), block_hash);
        return;
    }

    if (eth_msg_id == protocol::kReceiptsMessageId)
    {
        ++stats_.receipts_messages;
        auto decoded = eth_message_schemas_.empty()
            ? protocol::decode_receipts(payload)
            : protocol::decode_receipts(payload, eth_message_schemas_);
        if (!decoded)
        {
            ++stats_.decode_failures;
            return;
        }

        const auto& msg = decoded.value();
        size_t block_idx = 0;
        for (const auto& block_receipts : msg.receipts)
        {
            uint64_t block_number = 0;
            Hash256  block_hash{};

            // Correlate to a pending request if request_id is present
            if (msg.request_id.has_value())
            {
                // Each block in the response corresponds to one hash in the request.
                // We issued one hash per request, so request_id maps 1:1.
                if (block_idx == 0)
                {
                    auto it = pending_requests_.find(msg.request_id.value());
                    if (it != pending_requests_.end())
                    {
                        block_hash   = it->second.block_hash;
                        block_number = it->second.block_number;
                        pending_requests_.erase(it);
                    }
                }
            }

            std::vector<Hash256> tx_hashes(block_receipts.size());
            process_receipts(block_receipts, tx_hashes, block_number, block_hash);
            ++block_idx;
        }
        return;
    }
}

// ---------------------------------------------------------------------------
// process_receipts
// ---------------------------------------------------------------------------

/// @brief Run a block's receipts through the event watcher and update stats.
///
/// Receipts and transaction hashes are paired by position; if the two vectors
/// differ in length only the common prefix is processed.
///
/// @param receipts     Receipts of one block, in transaction order.
/// @param tx_hashes    Transaction hash for each receipt, same order.
/// @param block_number Number of the block the receipts belong to.
/// @param block_hash   Hash of the block the receipts belong to.
/// @param log_indices  Optional per-receipt log indices. When an entry exists
///                     and is non-empty, its first element is used as the
///                     index of that receipt's first log; otherwise the index
///                     is the running count of logs in the preceding receipts.
void EthWatchService::process_receipts(
    const std::vector<codec::Receipt>& receipts,
    const std::vector<Hash256>&        tx_hashes,
    uint64_t                           block_number,
    const Hash256&                     block_hash,
    const std::vector<std::vector<uint32_t>>& log_indices) noexcept
{
    const size_t count = std::min(receipts.size(), tx_hashes.size());
    stats_.receipts_processed += static_cast<uint64_t>(count);

    // Running log index within the block, used when no explicit index is given.
    uint32_t next_log_index = 0;
    for (size_t i = 0; i < count; ++i)
    {
        const size_t log_count = receipts[i].logs.size();

        uint32_t first_log_index = next_log_index;
        if (i < log_indices.size() && !log_indices[i].empty())
        {
            first_log_index = log_indices[i].front();
        }

        const size_t matched = watcher_.process_receipt(
            receipts[i],
            tx_hashes[i],
            block_number,
            block_hash,
            first_log_index);

        // Clamp instead of subtracting blindly: both operands are unsigned, so
        // if the watcher ever reports more matches than logs (e.g. one log
        // matching several watches — confirm against the watcher's counting)
        // the difference would wrap around and corrupt the counter.
        const size_t unmatched = (log_count > matched) ? (log_count - matched) : 0;

        stats_.logs_seen += static_cast<uint64_t>(log_count);
        stats_.matched_logs += static_cast<uint64_t>(matched);
        stats_.discarded_logs += static_cast<uint64_t>(unmatched);
        next_log_index += static_cast<uint32_t>(log_count);
    }
}

// ---------------------------------------------------------------------------
// process_new_block
// ---------------------------------------------------------------------------

/// @brief React to a decoded NewBlock announcement.
///
/// Issues a single receipts request for the announced block so that its logs
/// can later be matched against the registered watches.
///
/// @param msg        Decoded NewBlock message; only the header's block number
///                   is read here.
/// @param block_hash Hash to associate with the block. process_message()
///                   passes a zeroed value because it has no hash for a
///                   NewBlock message.
void EthWatchService::process_new_block(const NewBlockMessage& msg,
                                        const Hash256&         block_hash) noexcept
{
    // The block number is taken from the header embedded in the announcement.
    const uint64_t announced_number = msg.header.number;
    request_receipts(block_hash, announced_number);
}

} // namespace eth

Updated on 2026-06-05 at 17:22:19 -0700