Skip to content

eth_watch/eth_watch.cpp

Functions

Name
int main(int argc, char ** argv)

Functions Documentation

function main

int main(
    int argc,
    char ** argv
)

Source code

// Copyright 2026 Genius Ventures, Inc.
// SPDX-License-Identifier: MIT
//
// eth_watch — example CLI for the EthWatchService.
//
// The --direct-enode flag is an example-local convenience for single-peer
// diagnostic connections. It is NOT a production API. Production deployment
// should use the standard discovery pipeline (EthPeerQueue + Discv5) or the
// ChainList-based RPC path (evmrelay RpcManager).
//
// Do NOT promote --direct-enode to include/eth/ without first adding:
//  - timeout / retry / backoff
//  - structured error diagnostics
//  - unit tests covering peer lifecycle

#include <atomic>
#include <functional>
#include <boost/asio/spawn.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/redirect_error.hpp>
#include <chrono>
#include <iostream>
#include <limits>
#include <memory>
#include <optional>
#include <set>
#include <sstream>
#include <string>
#include <string_view>
#include <unordered_map>
#include <vector>

#include <csignal>
#include <cstdlib>

#include <eth/messages.hpp>
#include <eth/eth_peer_session.hpp>
#include <eth/eth_watch_runner.hpp>
#include <eth/eth_watch_service.hpp>
#include <eth/eth_watch_cli.hpp>
#include "../chain_config.hpp"
#include <discv4/chain_peers.hpp>
#include <discv4/dial_scheduler.hpp>
#include <rlpx/crypto/ecdh.hpp>
#include <rlpx/rlpx_error.hpp>
#include <rlpx/rlpx_session.hpp>
#include <base/parse_utility.hpp>
#include <base/rlp-logger.hpp>
#include <eth/eth_handshake_guard.hpp>
#include <eth/eth_handshake.hpp>

namespace {

template <size_t N>
std::string format_nonzero_counts(const std::array<uint64_t, N>& counts)
{
    std::ostringstream out;
    bool first = true;
    for (size_t i = 0; i < counts.size(); ++i)
    {
        if (counts[i] == 0U)
        {
            continue;
        }
        if (!first)
        {
            out << ",";
        }
        out << i << "=" << counts[i];
        first = false;
    }
    return first ? "none" : out.str();
}

inline constexpr const char* kDefaultChainPeersUrl = "https://enodes.gnus.ai/chain_enodes.json.gz";
inline constexpr auto kWatchStatsInterval = std::chrono::seconds(4);
inline constexpr uint64_t kDefaultDetailedEventLimit = 2;

struct Config {
    std::string host;
    uint16_t port = 0;
    std::string peer_pubkey_hex;
    std::string canonical_chain_name;
    std::vector<eth::cli::WatchSpec> watch_specs;
    bool prefer_direct_enode = false;
    bool fork_id_overridden = false;
    bool use_chain_peer_cache = false;
    // ETH Status fields — must match the target chain
    uint64_t network_id = 1;
    eth::Hash256 genesis_hash{};
    eth::ForkId  fork_id{};   
    std::vector<discv4::ValidatedPeer> chain_peers;
    discv4::ChainPeerConfig chain_peer_config{};
};

struct WatchOutputState
{
    uint64_t total_events = 0;
    uint64_t detailed_event_limit = kDefaultDetailedEventLimit;
    uint64_t run_seconds = 0;
    std::unordered_map<std::string, uint64_t> events_by_chain;
    std::vector<std::string> service_chain_names;
};

std::vector<std::string> parse_chain_name_list(std::string_view value)
{
    std::vector<std::string> names;
    std::set<std::string> seen;
    size_t start = 0;
    while (start <= value.size())
    {
        const auto comma = value.find(',', start);
        const auto end = comma == std::string_view::npos ? value.size() : comma;
        const auto part = value.substr(start, end - start);
        if (!part.empty())
        {
            std::string name(part);
            if (seen.insert(name).second)
            {
                names.push_back(std::move(name));
            }
        }
        if (comma == std::string_view::npos)
        {
            break;
        }
        start = comma + 1;
    }
    return names;
}

std::optional<eth::Hash256> parse_hash256(std::string_view value)
{
    eth::Hash256 hash{};
    if (!rlp::base::parse::hex_array(value, hash))
    {
        return std::nullopt;
    }
    return hash;
}

std::optional<eth::EthWatchDiscoveryMode> parse_peer_selection_mode(std::string_view value) noexcept
{
    if (value == "cache-only")
    {
        return eth::EthWatchDiscoveryMode::kCacheOnly;
    }
    if (value == "discover-if-needed")
    {
        return eth::EthWatchDiscoveryMode::kDiscoverIfNeeded;
    }
    if (value == "discover-first")
    {
        return eth::EthWatchDiscoveryMode::kDiscoverFirst;
    }
    if (value == "hybrid")
    {
        return eth::EthWatchDiscoveryMode::kHybrid;
    }
    return std::nullopt;
}

std::optional<Config> parse_enode(std::string_view enode) {
    constexpr std::string_view kPrefix = "enode://";
    if (enode.size() < kPrefix.size() || enode.substr(0, kPrefix.size()) != kPrefix) {
        return std::nullopt;
    }

    const auto without_prefix = enode.substr(kPrefix.size());
    const auto at_pos = without_prefix.find('@');
    if (at_pos == std::string_view::npos) {
        return std::nullopt;
    }

    const auto pubkey_hex = without_prefix.substr(0, at_pos);
    if (pubkey_hex.size() != rlpx::kPublicKeySize * 2) {
        return std::nullopt;
    }

    const auto address_part = without_prefix.substr(at_pos + 1);
    const auto query_pos = address_part.find('?');
    const auto host_port = address_part.substr(0, query_pos);
    const auto colon_pos = host_port.rfind(':');
    if (colon_pos == std::string_view::npos) {
        return std::nullopt;
    }

    const auto host_view = host_port.substr(0, colon_pos);
    const auto port_view = host_port.substr(colon_pos + 1);
    auto port_value = rlp::base::parse::uint16_decimal(port_view);
    if (!port_value) {
        return std::nullopt;
    }

    Config cfg;
    cfg.host = std::string(host_view);
    cfg.port = *port_value;
    cfg.peer_pubkey_hex = std::string(pubkey_hex);
    return cfg;
}

void apply_chain_peer_config(
    Config&                              config,
    const discv4::ChainPeerConfig& chain_peer_config) noexcept
{
    config.canonical_chain_name = chain_peer_config.canonical_name;
    config.network_id = chain_peer_config.network_id;
    config.genesis_hash = chain_peer_config.genesis_hash;
    config.chain_peers = chain_peer_config.nodes;
    config.chain_peer_config = chain_peer_config;
    config.use_chain_peer_cache = !chain_peer_config.nodes.empty() || !chain_peer_config.bootnodes.empty();
    if (!config.fork_id_overridden && chain_peer_config.fork_id.has_value())
    {
        config.fork_id = *chain_peer_config.fork_id;
    }
}

void log_watch_notification(
    const eth::WatchEventNotification& notification,
    const std::shared_ptr<WatchOutputState>& output_state)
{
    static auto ev_log = rlp::base::createLogger("eth_watch");
    auto bytes_to_hex = [](const auto& arr)
    {
        std::string s;
        s.reserve(arr.size() * 2);
        for (const auto b : arr)
        {
            const char hex[] = "0123456789abcdef";
            s += hex[(static_cast<uint8_t>(b) >> 4) & 0xf];
            s += hex[ static_cast<uint8_t>(b)       & 0xf];
        }
        return s;
    };

    ++output_state->total_events;
    const auto chain_count = ++output_state->events_by_chain[notification.context.chain_name];

    std::string header = "event_count=" + std::to_string(output_state->total_events) +
                         " chain_count=" + std::to_string(chain_count) +
                         " " + notification.event_signature + " at block " +
                         std::to_string(notification.event.block_number) +
                         " chain=" + notification.context.chain_name;
    if (notification.event.tx_hash != eth::codec::Hash256{})
    {
        header += "  tx: 0x" + bytes_to_hex(notification.event.tx_hash);
    }
    SPDLOG_LOGGER_INFO(ev_log, "{}", header);

    if (output_state->total_events > output_state->detailed_event_limit)
    {
        return;
    }

    for (size_t i = 0; i < notification.values.size(); ++i)
    {
        const std::string label = std::to_string(i);
        std::string value;
        if (const auto* addr = std::get_if<eth::codec::Address>(&notification.values[i]))
        {
            value = "0x" + bytes_to_hex(*addr);
        }
        else if (const auto* u256 = std::get_if<intx::uint256>(&notification.values[i]))
        {
            value = intx::to_string(*u256);
        }
        else if (const auto* b32 = std::get_if<eth::codec::Hash256>(&notification.values[i]))
        {
            value = "0x" + bytes_to_hex(*b32);
        }
        else if (const auto* bval = std::get_if<bool>(&notification.values[i]))
        {
            value = (*bval ? "true" : "false");
        }
        SPDLOG_LOGGER_INFO(ev_log, "  [{}] {}", label, value);
    }
}

void schedule_service_stats(
    boost::asio::io_context& io,
    eth::EthWatchService& service,
    std::shared_ptr<boost::asio::steady_timer> timer)
{
    timer->expires_after(kWatchStatsInterval);
    timer->async_wait([&io, &service, timer](const boost::system::error_code& ec)
    {
        if (ec)
        {
            return;
        }

        static auto log = rlp::base::createLogger("eth_watch");
        const auto stats = service.aggregate_runtime_stats();
        const auto connection_stats = service.aggregate_connection_stats();
        SPDLOG_LOGGER_INFO(log,
                           "Watch stats [service]: active_sessions={} chains={} queues={} discv4_clients={} "
                           "eth_messages={} new_block_hashes={} new_blocks={} receipts_messages={} "
                           "decode_failures={} receipts_requested={} receipts_processed={} logs_seen={} "
                           "matched_logs={} discarded_logs={} transport_connect_failures={} auth_success={} "
                           "local_hello_sent={} peer_disconnect_before_hello={} peer_hello_accepted={} "
                           "eth_status_sent={} remote_status_accepted={} remote_status_rejected={} "
                           "remote_status_reject_disconnect_reasons={} remote_status_reject_validation_errors={} "
                           "too_many_peers_before_peer_hello={} peer_disconnect_after_eth_status_accept={}",
                           service.active_runner_count(),
                           service.runtime_chain_count(),
                           service.peer_queue_count(),
                           service.discovery_client_count(),
                           stats.eth_messages_seen,
                           stats.new_block_hashes_messages,
                           stats.new_block_messages,
                           stats.receipts_messages,
                           stats.decode_failures,
                           stats.receipts_requested,
                           stats.receipts_processed,
                           stats.logs_seen,
                           stats.matched_logs,
                           stats.discarded_logs,
                           connection_stats.tcp_connect_failures,
                           connection_stats.auth_success,
                           connection_stats.local_hello_sent,
                           connection_stats.peer_disconnect_before_hello,
                           connection_stats.peer_hello_accepted,
                           connection_stats.eth_status_sent,
                           connection_stats.remote_status_accepted,
                           connection_stats.remote_status_rejected,
                           format_nonzero_counts(connection_stats.remote_status_rejected_disconnect_reasons),
                           format_nonzero_counts(connection_stats.remote_status_rejected_validation_errors),
                           connection_stats.peer_queue.too_many_peers_before_connected_count,
                           connection_stats.peer_queue.disconnected_after_connected_count);

        schedule_service_stats(io, service, timer);
    });
}

void log_service_summary(
    eth::EthWatchService& service,
    const std::shared_ptr<WatchOutputState>& output_state)
{
    if (!service.initialized())
    {
        return;
    }

    static auto log = rlp::base::createLogger("eth_watch");
    const auto traffic = service.aggregate_runtime_stats();
    const auto connection = service.aggregate_connection_stats();
    SPDLOG_LOGGER_INFO(log,
                       "Final eth_watch summary: active_sessions={} chains={} cached_peers={} discovered_peers={} "
                       "transport_connect_failures={} auth_success={} local_hello_sent={} "
                       "peer_disconnect_before_hello={} peer_hello_accepted={} eth_status_sent={} "
                       "remote_status_accepted={} remote_status_rejected={} "
                       "remote_status_reject_disconnect_reasons={} remote_status_reject_validation_errors={} "
                       "peer_disconnect_after_eth_status_accept={} "
                       "eth_messages={} matched_logs={} logs_seen={} decode_failures={}",
                       service.active_runner_count(),
                       service.runtime_chain_count(),
                       connection.peer_queue.cached_peer_count,
                       connection.peer_queue.discovered_peer_count,
                       connection.tcp_connect_failures,
                       connection.auth_success,
                       connection.local_hello_sent,
                       connection.peer_disconnect_before_hello,
                       connection.peer_hello_accepted,
                       connection.eth_status_sent,
                       connection.remote_status_accepted,
                       connection.remote_status_rejected,
                       format_nonzero_counts(connection.remote_status_rejected_disconnect_reasons),
                       format_nonzero_counts(connection.remote_status_rejected_validation_errors),
                       connection.peer_queue.disconnected_after_connected_count,
                       traffic.eth_messages_seen,
                       traffic.matched_logs,
                       traffic.logs_seen,
                       traffic.decode_failures);
    for (const auto& chain_name : output_state->service_chain_names)
    {
        auto queue = service.peer_queue(chain_name);
        if (!queue)
        {
            continue;
        }
        const auto queue_stats = queue->stats();
        SPDLOG_LOGGER_INFO(log,
                           "Final chain discovery summary: chain={} cached_peers={} discovered_peers={} "
                           "disconnect_feedback={} before_eth_status_accept={} "
                           "after_eth_status_accept={} transport_connect_failures={} timeouts={} "
                           "subprotocol_errors={} backoff_drops={}",
                           chain_name,
                           queue_stats.cached_peer_count,
                           queue_stats.discovered_peer_count,
                           queue_stats.disconnect_feedback_count,
                           queue_stats.disconnected_before_connected_count,
                           queue_stats.disconnected_after_connected_count,
                           queue_stats.tcp_failure_count,
                           queue_stats.timeout_count,
                           queue_stats.subprotocol_error_count,
                           queue_stats.backoff_drop_count);
    }
    SPDLOG_LOGGER_INFO(log,
                       "Final disconnect summary: feedback={} before_eth_status_accept={} after_eth_status_accept={} "
                       "peer_disconnect_before_hello={} too_many_peers_before_peer_hello={} "
                       "too_many_peers_after_eth_status_accept={} transport_connect_failures={} "
                       "timeouts={} subprotocol_errors={} backoff_drops={} requeued={} flaky_drops={}",
                       connection.peer_queue.disconnect_feedback_count,
                       connection.peer_queue.disconnected_before_connected_count,
                       connection.peer_queue.disconnected_after_connected_count,
                       connection.peer_disconnect_before_hello,
                       connection.peer_queue.too_many_peers_before_connected_count,
                       connection.peer_queue.too_many_peers_after_connected_count,
                       connection.peer_queue.tcp_failure_count,
                       connection.peer_queue.timeout_count,
                       connection.peer_queue.subprotocol_error_count,
                       connection.peer_queue.backoff_drop_count,
                       connection.peer_queue.requeued_peer_count,
                       connection.peer_queue.flaky_peer_drop_count);
}

void print_usage(const char* exe)
{
    std::cout << "Usage:\n"
              << "  " << exe << " <host> <port> <peer_pubkey_hex>\n"
              << "  " << exe << " --chain <chain_name>\n"
              << "  " << exe << " --chains <chain1,chain2,...>\n"
              << "  " << exe << " --all-chains\n"
              << "  " << exe << " --chain <chain_name> --chain-peers-json <path>\n"
              << "  " << exe << " --chain <chain_name> --chain-peers-url <url>\n"
              << "  " << exe << " --chain <chain_name> --direct-enode <enode://...>\n"
              << "\nDirect mode overrides:\n"
              << "  --network-id <uint64>            Override ETH Status network id\n"
              << "  --genesis-hash <0x64hex>         Override ETH Status genesis hash\n"
              << "  --fork-id-hash <0x8hex>          Override ETH Status fork id hash\n"
              << "  --fork-id-next <uint64>          Override ETH Status fork id next fork\n"
              << "\nOptional watch flags (repeatable, must follow connection args):\n"
              << "  --watch-contract <0x20byteHex>   Contract address to filter (omit for any)\n"
              << "  --watch-event    <signature>      Event signature, e.g. Transfer(address,address,uint256)\n"
              << "  Each --watch-event pairs with the preceding --watch-contract (or any contract if none).\n"
              << "  --display-events <count>          Print full decoded details for only the first count matches (default 2).\n"
              << "  --max-peers-per-chain <count>     Active dial/watch slots per chain (default 3).\n"
              << "  --max-peers-total <count>         Active dial/watch slots across all chains (default 24).\n"
              << "  --cache-peer-start-offset <count> Rotate cached peers by count before spreading across dial slots.\n"
              << "  --max-pending-peers <count>       Max queued peer candidates per chain while discovery outpaces dialing.\n"
              << "  --discv5-port <udp-port>          UDP bind port for discv5 discovery (default 9000).\n"
              << "  --peer-selection <mode>           cache-only, discover-if-needed, discover-first, or hybrid.\n"
              << "  --discover-only                   Run discovery and peer queueing without RLPx/ETH dialing.\n"
              << "  --run-seconds <count>             Stop automatically after count seconds (default: run until signal).\n"
              << "\nExamples:\n"
              << "  " << exe << " --chain ethereum-sepolia --watch-event Transfer(address,address,uint256)\n"
              << "  " << exe << " --chains ethereum-mainnet,ethereum-sepolia --watch-event Transfer(address,address,uint256)\n"
              << "  " << exe << " --all-chains --watch-event Transfer(address,address,uint256)\n"
              << "  " << exe << " --chain ethereum-sepolia --direct-enode enode://<pubkey>@<host>:<port> --watch-event Transfer(address,address,uint256)\n"
              << "  " << exe << " 127.0.0.1 30303 <pubkey> --network-id 1337 --genesis-hash 0xfa742c20043b1d8a13ea6421d85e9678429f9f50c2e25b2814c61f7444504fec --log-level debug\n"
              << "  " << exe << " --chain ethereum-mainnet --watch-contract 0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48 --watch-event Transfer(address,address,uint256)\n"
              << "  " << exe << " --all-chains --peer-selection discover-first --discover-only --run-seconds 10\n"
              << "\nAvailable chains:\n"
              << "  ethereum-mainnet, ethereum-sepolia, ethereum-holesky\n"
              << "  polygon-mainnet, polygon-amoy\n"
              << "  bnb-smart-chain, bnb-smart-chain-testnet\n"
              << "  base-mainnet, base-sepolia\n";
}

void run_watch(std::string host,
               uint16_t port,
               rlpx::PublicKey peer_pubkey,
               std::string chain_name,
               uint64_t network_id,
               eth::Hash256 genesis_hash,
               eth::ForkId fork_id,
               std::vector<eth::EthMessageSchema> eth_message_schemas,
               const std::vector<eth::cli::WatchSpec>& watch_specs,
               std::shared_ptr<WatchOutputState> output_state,
               const std::function<void(rlpx::DisconnectReason)>& on_done,
               const std::function<void(std::shared_ptr<rlpx::RlpxSession>)>& on_connected,
               boost::asio::yield_context yield)
{
    static auto log = rlp::base::createLogger("eth_watch");

    SPDLOG_LOGGER_DEBUG(log, "run_watch: begin chain={} host={} port={} network_id={}",
                        chain_name, host, port, network_id);

    auto keypair_result = rlpx::crypto::Ecdh::generate_ephemeral_keypair();
    if (!keypair_result)
    {
        SPDLOG_LOGGER_ERROR(log, "run_watch: failed to generate local keypair");
        on_done(rlpx::DisconnectReason::kProtocolError);
        return;
    }

    const auto& keypair = keypair_result.value();

    const rlpx::SessionConnectParams params{
        host,
        port,
        keypair.public_key,
        keypair.private_key,
        peer_pubkey,
        "rlp-eth-watch",
        0
    };

    SPDLOG_LOGGER_DEBUG(log, "run_watch: connecting to {}:{}", host, port);
    rlpx::DisconnectReason disconnect_reason = rlpx::DisconnectReason::kTcpError;
    auto session_result = rlpx::RlpxSession::connect(params, yield, &disconnect_reason);
    if (!session_result)
    {
        const auto err = session_result.error();
        SPDLOG_LOGGER_DEBUG(log, "run_watch: failed to connect to {}:{} (error {}: {})",
                            host, port, static_cast<int>(err), rlpx::to_string(err));
        on_done(disconnect_reason);
        return;
    }

    SPDLOG_LOGGER_DEBUG(log, "run_watch: connect returned success");

    auto session = std::move(session_result.value());
    auto watch_runner = std::make_shared<eth::EthWatchRunner>(
        session,
        chain_name,
        network_id,
        genesis_hash,
        fork_id);
    auto executor = yield.get_executor();
    auto status_received = std::make_shared<std::atomic<bool>>(false);
    auto status_timeout = std::make_shared<boost::asio::steady_timer>(executor);
    auto stats_timer = std::make_shared<boost::asio::steady_timer>(executor);
    status_timeout->expires_after(eth::protocol::kStatusHandshakeTimeout);

    SPDLOG_LOGGER_DEBUG(log, "run_watch: watch runner created");
    SPDLOG_LOGGER_DEBUG(log, "run_watch: HELLO from peer: {}", session->peer_info().client_id);
    const uint8_t negotiated_eth_version = session->negotiated_eth_version();
    const uint8_t negotiated_eth_offset = session->negotiated_eth_offset();
    SPDLOG_LOGGER_DEBUG(log, "run_watch: negotiated eth version={} offset=0x{:02x}",
                        static_cast<int>(negotiated_eth_version),
                        negotiated_eth_offset);

    SPDLOG_LOGGER_DEBUG(log, "run_watch: sending local ETH Status");
    const auto handshake_result = eth::PerformEthStatusHandshake(
        eth::EthStatusHandshakeStart{
            std::make_shared<eth::RlpxEthSessionChannel>(session),
            network_id,
            genesis_hash,
            fork_id,
            eth_message_schemas,
            eth::EthStatusAcceptedHandler{},
            eth::EthStatusRemoteDisconnectHandler{},
            rlpx::EthMessageHandler{}
        },
        yield);
    if (!handshake_result)
    {
        using E = eth::StatusValidationError;
        switch (handshake_result.error())
        {
        case E::kProtocolVersionMismatch:
            SPDLOG_LOGGER_ERROR(log, "run_watch: ETH Status handshake failed: protocol version mismatch");
            break;
        case E::kNetworkIDMismatch:
            SPDLOG_LOGGER_ERROR(log, "run_watch: ETH Status handshake failed: network id mismatch");
            break;
        case E::kGenesisMismatch:
            SPDLOG_LOGGER_ERROR(log, "run_watch: ETH Status handshake failed: genesis mismatch");
            break;
        case E::kInvalidBlockRange:
            SPDLOG_LOGGER_ERROR(log, "run_watch: ETH Status handshake failed: invalid block range");
            break;
        }
        (void)session->disconnect(rlpx::DisconnectReason::kSubprotocolError);
        on_done(rlpx::DisconnectReason::kSubprotocolError);
        return;
    }

    const auto common = eth::get_common_fields(handshake_result.value().remote_status);
    const uint64_t latest_block = eth::ExtractLatestBlockNumber(handshake_result.value().remote_status);
    status_received->store(true);
    status_timeout->cancel();
    on_connected(session);
    SPDLOG_LOGGER_INFO(log, "ETH Status: network_id={} protocol={} latest_block={}",
                       common.network_id,
                       static_cast<int>(common.protocol_version),
                       latest_block);
    SPDLOG_LOGGER_INFO(log, "Connected. Watching for events...");
    SPDLOG_LOGGER_DEBUG(log, "run_watch: local ETH Status queued");

    watch_runner->set_event_callback([output_state](const eth::WatchEventNotification& notification)
    {
        log_watch_notification(notification, output_state);
    });

    if (watch_specs.empty())
    {
        SPDLOG_LOGGER_INFO(log, "No event filter configured — watching all ETH messages.");
    }
    else
    {
        for (const auto& spec : watch_specs)
        {
            eth::codec::Address contract{};
            if (!spec.contract_hex.empty())
            {
                auto addr = eth::cli::parse_address(spec.contract_hex);
                if (!addr)
                {
                    SPDLOG_LOGGER_ERROR(log, "Invalid contract address: {}", spec.contract_hex);
                    on_done(rlpx::DisconnectReason::kProtocolError);
                    return;
                }
                contract = *addr;
            }

            const auto abi_params = eth::cli::infer_params(spec.event_signature);
            (void)watch_runner->watch_event(contract, spec.event_signature, abi_params);

            if (!spec.contract_hex.empty())
            {
                SPDLOG_LOGGER_INFO(log, "Watching: {} on contract {}", spec.event_signature, spec.contract_hex);
            }
            else
            {
                SPDLOG_LOGGER_INFO(log, "Watching: {}", spec.event_signature);
            }
        }
    }

    SPDLOG_LOGGER_DEBUG(log, "run_watch: installing session bridge");
    if (!StartEthStatusHandshake(
            eth::EthStatusHandshakeStart{
                std::make_shared<eth::RlpxEthSessionChannel>(session),
                network_id,
                genesis_hash,
                fork_id,
                eth_message_schemas,
                eth::EthStatusAcceptedHandler{},
                eth::EthStatusRemoteDisconnectHandler{},
                [watch_runner](uint8_t eth_msg_id, const rlpx::ByteBuffer& payload)
                {
                    const rlp::ByteView payload_view(payload.data(), payload.size());
                    watch_runner->service().process_message(eth_msg_id, payload_view);
                }
            }))
    {
        SPDLOG_LOGGER_ERROR(log, "run_watch: failed to install ETH message handler");
        (void)session->disconnect(rlpx::DisconnectReason::kSubprotocolError);
        on_done(rlpx::DisconnectReason::kSubprotocolError);
        return;
    }
    watch_runner->install_session_bridge();

    SPDLOG_LOGGER_DEBUG(log, "run_watch: status handshake timeout armed for {} ms",
                        std::chrono::duration_cast<std::chrono::milliseconds>(
                            eth::protocol::kStatusHandshakeTimeout).count());

    session->set_disconnect_handler([status_timeout, stats_timer](const rlpx::protocol::DisconnectMessage& msg)
    {
        static auto disc_log = rlp::base::createLogger("eth_watch");
        SPDLOG_LOGGER_DEBUG(disc_log, "run_watch: Disconnected reason={}", static_cast<int>(msg.reason));
        status_timeout->cancel();
        stats_timer->cancel();
    });

    session->set_ping_handler([session](const rlpx::protocol::PingMessage&)
    {
        const rlpx::protocol::PongMessage pong;
        auto encoded = pong.encode();
        if (!encoded)
        {
            return;
        }
        rlpx::framing::Message pong_msg{};
        pong_msg.id = rlpx::kPongMessageId;
        pong_msg.payload = std::move(encoded.value());
        (void)session->post_message(std::move(pong_msg));
    });

    session->set_generic_handler([session,
                                  watch_runner,
                                  status_received,
                                  status_timeout](const rlpx::protocol::Message& msg)
    {
        static auto gh_log = rlp::base::createLogger("eth_watch");
        const uint8_t negotiated_eth_offset = session->negotiated_eth_offset();
        const auto eth_id = eth::NormalizeEthWireMessageId(msg.id, negotiated_eth_offset);
        if (!eth_id.has_value())
        {
            return;
        }

        if (!status_received->load())
        {
            SPDLOG_LOGGER_WARN(gh_log, "generic_handler: non-Status ETH message (id=0x{:02x}) received before handshake",
                               *eth_id);
            status_timeout->cancel();
            (void)session->disconnect(rlpx::DisconnectReason::kSubprotocolError);
            return;
        }

        const rlp::ByteView payload(msg.payload.data(), msg.payload.size());
        if (*eth_id == eth::protocol::kNewBlockHashesMessageId)
        {
            auto decoded = eth::protocol::decode_new_block_hashes(payload);
            if (decoded)
            {
                SPDLOG_LOGGER_DEBUG(gh_log, "generic_handler: NewBlockHashes count={}", decoded.value().entries.size());
            }
            else
            {
                SPDLOG_LOGGER_WARN(gh_log, "generic_handler: NewBlockHashes decode failed");
            }
        }

        SPDLOG_LOGGER_DEBUG(gh_log, "generic_handler: ETH msg id=0x{:02x} payload_size={}", *eth_id, msg.payload.size());
    });

    {
        boost::system::error_code hs_ec;
        SPDLOG_LOGGER_DEBUG(log, "run_watch: waiting for remote ETH Status or timeout");
        status_timeout->async_wait(boost::asio::redirect_error(yield, hs_ec));
        SPDLOG_LOGGER_DEBUG(log, "run_watch: status wait completed ec='{}' status_received={}",
                            hs_ec.message(),
                            status_received->load());

        if (!status_received->load())
        {
            if (hs_ec != boost::asio::error::operation_aborted)
            {
                SPDLOG_LOGGER_WARN(log, "run_watch: ETH Status handshake timeout ({}:{}) — peer is likely on a different chain",
                                   host, port);
                (void)session->disconnect(rlpx::DisconnectReason::kTimeout);
            }
            on_done(rlpx::DisconnectReason::kTimeout);
            return;
        }
    }

    for (;;)
    {
        stats_timer->expires_after(kWatchStatsInterval);

        boost::system::error_code stats_ec;
        stats_timer->async_wait(boost::asio::redirect_error(yield, stats_ec));
        if (stats_ec == boost::asio::error::operation_aborted)
        {
            break;
        }
        if (stats_ec)
        {
            SPDLOG_LOGGER_DEBUG(log, "run_watch: stats timer stopped: {}", stats_ec.message());
            break;
        }

        const auto stats = watch_runner->service().stats();
        SPDLOG_LOGGER_INFO(log,
                           "Watch stats [{}]: eth_messages={} new_block_hashes={} new_blocks={} receipts_messages={} "
                           "decode_failures={} receipts_requested={} receipts_processed={} logs_seen={} "
                           "matched_logs={} discarded_logs={} subscriptions={}",
                           chain_name,
                           stats.eth_messages_seen,
                           stats.new_block_hashes_messages,
                           stats.new_block_messages,
                           stats.receipts_messages,
                           stats.decode_failures,
                           stats.receipts_requested,
                           stats.receipts_processed,
                           stats.logs_seen,
                           stats.matched_logs,
                           stats.discarded_logs,
                           watch_runner->service().subscription_count());
    }

    on_done(rlpx::DisconnectReason::kRequested);
}

std::optional<eth::ForkId> parse_fork_id_hash(std::string_view value)
{
    eth::ForkId forkId{};
    if (!rlp::base::parse::hex_array(value, forkId.fork_hash))
    {
        return std::nullopt;
    }
    return forkId;
}

std::optional<uint64_t> parse_fork_id_next(std::string_view value)
{
    return rlp::base::parse::uint64_decimal(value);
}

} // namespace

// ── WatcherPool and DialScheduler are defined in include/discv4/dial_scheduler.hpp ──

int main(int argc, char** argv) {
    try {
        if (argc < 2) {
            print_usage(argv[0]);
            return 1;
        }

        // Parse --log-level first so it takes effect before any loggers are created
        for (int i = 1; i < argc - 1; ++i)
        {
            if (std::string_view(argv[i]) == "--log-level")
            {
                const std::string_view level_str(argv[i + 1]);
                spdlog::level::level_enum lvl = spdlog::level::info;
                if      (level_str == "trace")    { lvl = spdlog::level::trace; }
                else if (level_str == "debug")    { lvl = spdlog::level::debug; }
                else if (level_str == "info")     { lvl = spdlog::level::info; }
                else if (level_str == "warn")     { lvl = spdlog::level::warn; }
                else if (level_str == "error")    { lvl = spdlog::level::err; }
                else if (level_str == "critical") { lvl = spdlog::level::critical; }
                else if (level_str == "off")      { lvl = spdlog::level::off; }
                spdlog::set_level(lvl);
                spdlog::apply_all([lvl](std::shared_ptr<spdlog::logger> l) { l->set_level(lvl); });
                break;
            }
        }

        std::optional<Config> config;
        std::vector<Config> multi_chain_configs;
        int next_arg = 1;
        std::string chain_name;
        std::string chain_peers_json_path;
        std::string chain_peers_url = kDefaultChainPeersUrl;
        bool chain_peers_url_enabled = true;
        auto output_state = std::make_shared<WatchOutputState>();
        eth::EthWatchConnectionConfig watch_connection_config{};
        eth::EthPeerQueueConfig peer_queue_config{};
        eth::EthWatchDiscoveryMode discovery_mode = eth::EthWatchDiscoveryMode::kDiscoverIfNeeded;
        std::optional<uint16_t> discv5_bind_port;
        bool discover_only = false;

        for (int i = 1; i < argc; ++i)
        {
            const std::string_view arg(argv[i]);
            if ((arg == "--chain-peers-json" || arg == "--bootstrap-peers-json") && i + 1 < argc)
            {
                chain_peers_json_path = argv[++i];
            }
            else if ((arg == "--chain-peers-url" || arg == "--bootstrap-peers-url") && i + 1 < argc)
            {
                chain_peers_url = argv[++i];
                chain_peers_url_enabled = true;
            }
            else if (arg == "--no-chain-peers-url" || arg == "--no-bootstrap-peers-url")
            {
                chain_peers_url_enabled = false;
            }
        }

        if (std::string_view(argv[next_arg]) == "--chain") {
            if (argc < 3) {
                print_usage(argv[0]);
                return 1;
            }
            const std::string selected_chain_name = argv[next_arg + 1];
            next_arg += 2;

            auto chain_peer_config = load_chain_peer_config(
                selected_chain_name,
                argv[0],
                chain_peers_json_path,
                chain_peers_url,
                chain_peers_url_enabled);
            if (!chain_peer_config.has_value())
            {
                std::cout << "Unknown or unconfigured chain: " << selected_chain_name << "\n"
                          << "Expected chain metadata in chain peer cache/file.\n";
                return 1;
            }

            apply_chain_discovery_config(*chain_peer_config, argv[0]);
            Config cfg{};
            apply_chain_peer_config(cfg, *chain_peer_config);
            config = std::move(cfg);
            chain_name = config->canonical_chain_name;
        } else if (std::string_view(argv[next_arg]) == "--chains") {
            if (next_arg + 1 >= argc) {
                std::cout << "--chains requires a comma-separated chain list.\n";
                return 1;
            }
            const auto selected_chain_names = parse_chain_name_list(argv[next_arg + 1]);
            next_arg += 2;
            if (selected_chain_names.empty())
            {
                std::cout << "--chains did not include any chain names.\n";
                return 1;
            }
            for (const auto& selected_chain_name : selected_chain_names)
            {
                auto chain_peer_config = load_chain_peer_config(
                    selected_chain_name,
                    argv[0],
                    chain_peers_json_path,
                    chain_peers_url,
                    chain_peers_url_enabled);
                if (!chain_peer_config.has_value())
                {
                    std::cout << "Unknown or unconfigured chain: " << selected_chain_name << "\n"
                              << "Expected chain metadata in chain peer cache/file.\n";
                    return 1;
                }

                apply_chain_discovery_config(*chain_peer_config, argv[0]);
                Config cfg{};
                apply_chain_peer_config(cfg, *chain_peer_config);
                multi_chain_configs.push_back(std::move(cfg));
            }
            config = multi_chain_configs.front();
            chain_name = "multi-chain";
        } else if (std::string_view(argv[next_arg]) == "--all-chains") {
            ++next_arg;
            const auto default_chain_names = load_default_all_chains(argv[0]);
            if (default_chain_names.empty())
            {
                std::cout << "--all-chains requires _defaultAllChains in chains_config.json.\n";
                return 1;
            }
            for (const auto& selected_chain_name : default_chain_names)
            {
                auto chain_peer_config = load_chain_peer_config(
                    selected_chain_name,
                    argv[0],
                    chain_peers_json_path,
                    chain_peers_url,
                    chain_peers_url_enabled);
                if (!chain_peer_config.has_value())
                {
                    std::cout << "Unknown or unconfigured chain: " << selected_chain_name << "\n"
                              << "Expected chain metadata in chain peer cache/file.\n";
                    return 1;
                }

                apply_chain_discovery_config(*chain_peer_config, argv[0]);
                Config cfg{};
                apply_chain_peer_config(cfg, *chain_peer_config);
                multi_chain_configs.push_back(std::move(cfg));
            }
            if (multi_chain_configs.empty())
            {
                std::cout << "--all-chains did not load any chain configs.\n";
                return 1;
            }
            config = multi_chain_configs.front();
            chain_name = "all-chains";
        } else if (argc >= 4) {
            const auto port_value = rlp::base::parse::uint16_decimal(argv[next_arg + 1]);
            if (!port_value) {
                std::cout << "Invalid port value.\n";
                return 1;
            }

            Config cfg;
            cfg.host = argv[next_arg];
            cfg.port = *port_value;
            cfg.peer_pubkey_hex = argv[next_arg + 2];
            next_arg += 3;
            config = cfg;
        } else {
            print_usage(argv[0]);
            return 1;
        }

        // Parse optional --watch-contract / --watch-event flags
        std::string pending_contract;
        while (next_arg < argc) {
            const std::string_view arg(argv[next_arg]);

            if (arg == "--watch-contract") {
                if (next_arg + 1 >= argc) {
                    std::cout << "--watch-contract requires an address argument.\n";
                    return 1;
                }
                pending_contract = argv[next_arg + 1];
                next_arg += 2;
            } else if (arg == "--watch-event") {
                if (next_arg + 1 >= argc) {
                    std::cout << "--watch-event requires a signature argument.\n";
                    return 1;
                }
                eth::cli::WatchSpec spec;
                spec.contract_hex    = pending_contract;
                spec.event_signature = argv[next_arg + 1];
                config->watch_specs.push_back(std::move(spec));
                pending_contract.clear();
                next_arg += 2;
            } else if (arg == "--log-level") {
                if (next_arg + 1 >= argc) {
                    std::cout << "--log-level requires a level argument (trace, debug, info, warn, error, critical, off).\n";
                    return 1;
                }
                const std::string_view level_str(argv[next_arg + 1]);
                if (level_str == "trace")         { spdlog::set_level(spdlog::level::trace); }
                else if (level_str == "debug")    { spdlog::set_level(spdlog::level::debug); }
                else if (level_str == "info")     { spdlog::set_level(spdlog::level::info); }
                else if (level_str == "warn")     { spdlog::set_level(spdlog::level::warn); }
                else if (level_str == "error")    { spdlog::set_level(spdlog::level::err); }
                else if (level_str == "critical") { spdlog::set_level(spdlog::level::critical); }
                else if (level_str == "off")      { spdlog::set_level(spdlog::level::off); }
                else
                {
                    std::cout << "Unknown log level: " << level_str << "\n";
                    return 1;
                }
                next_arg += 2;
            } else if (arg == "--direct-enode") {
                if (next_arg + 1 >= argc) {
                    std::cout << "--direct-enode requires an enode argument.\n";
                    return 1;
                }
                auto direct_config = parse_enode(argv[next_arg + 1]);
                if (!direct_config) {
                    std::cout << "Invalid enode supplied to --direct-enode.\n";
                    return 1;
                }
                config->host = direct_config->host;
                config->port = direct_config->port;
                config->peer_pubkey_hex = direct_config->peer_pubkey_hex;
                config->prefer_direct_enode = true;
                next_arg += 2;
            } else if (arg == "--display-events") {
                if (next_arg + 1 >= argc) {
                    std::cout << "--display-events requires an integer argument.\n";
                    return 1;
                }
                const auto display_events = rlp::base::parse::uint64_decimal(argv[next_arg + 1]);
                if (!display_events) {
                    std::cout << "Invalid --display-events value.\n";
                    return 1;
                }
                output_state->detailed_event_limit = *display_events;
                next_arg += 2;
            } else if (arg == "--run-seconds") {
                if (next_arg + 1 >= argc) {
                    std::cout << "--run-seconds requires an integer argument.\n";
                    return 1;
                }
                const auto run_seconds = rlp::base::parse::uint64_decimal(argv[next_arg + 1]);
                if (!run_seconds) {
                    std::cout << "Invalid --run-seconds value.\n";
                    return 1;
                }
                output_state->run_seconds = *run_seconds;
                next_arg += 2;
            } else if (arg == "--max-peers-per-chain") {
                if (next_arg + 1 >= argc) {
                    std::cout << "--max-peers-per-chain requires an integer argument.\n";
                    return 1;
                }
                const auto max_peers_per_chain = rlp::base::parse::uint64_decimal(argv[next_arg + 1]);
                if (!max_peers_per_chain ||
                    *max_peers_per_chain == 0 ||
                    *max_peers_per_chain > static_cast<uint64_t>(std::numeric_limits<int>::max())) {
                    std::cout << "Invalid --max-peers-per-chain value.\n";
                    return 1;
                }
                watch_connection_config.max_connections_per_chain = static_cast<int>(*max_peers_per_chain);
                next_arg += 2;
            } else if (arg == "--max-peers-total") {
                if (next_arg + 1 >= argc) {
                    std::cout << "--max-peers-total requires an integer argument.\n";
                    return 1;
                }
                const auto max_peers_total = rlp::base::parse::uint64_decimal(argv[next_arg + 1]);
                if (!max_peers_total ||
                    *max_peers_total == 0 ||
                    *max_peers_total > static_cast<uint64_t>(std::numeric_limits<int>::max())) {
                    std::cout << "Invalid --max-peers-total value.\n";
                    return 1;
                }
                watch_connection_config.max_total_connections = static_cast<int>(*max_peers_total);
                next_arg += 2;
            } else if (arg == "--cache-peer-start-offset") {
                if (next_arg + 1 >= argc) {
                    std::cout << "--cache-peer-start-offset requires an integer argument.\n";
                    return 1;
                }
                const auto offset = rlp::base::parse::uint64_decimal(argv[next_arg + 1]);
                if (!offset) {
                    std::cout << "Invalid --cache-peer-start-offset value.\n";
                    return 1;
                }
                peer_queue_config.cache_peer_start_offset = static_cast<size_t>(*offset);
                next_arg += 2;
            } else if (arg == "--max-pending-peers") {
                if (next_arg + 1 >= argc) {
                    std::cout << "--max-pending-peers requires an integer argument.\n";
                    return 1;
                }
                const auto max_pending_peers = rlp::base::parse::uint64_decimal(argv[next_arg + 1]);
                if (!max_pending_peers || *max_pending_peers == 0) {
                    std::cout << "Invalid --max-pending-peers value.\n";
                    return 1;
                }
                peer_queue_config.max_pending_peers = static_cast<size_t>(*max_pending_peers);
                next_arg += 2;
            } else if (arg == "--discv5-port") {
                if (next_arg + 1 >= argc) {
                    std::cout << "--discv5-port requires a UDP port argument.\n";
                    return 1;
                }
                const auto port = rlp::base::parse::uint16_decimal(argv[next_arg + 1]);
                if (!port || *port == 0) {
                    std::cout << "Invalid --discv5-port value.\n";
                    return 1;
                }
                discv5_bind_port = *port;
                next_arg += 2;
            } else if (arg == "--peer-selection") {
                if (next_arg + 1 >= argc) {
                    std::cout << "--peer-selection requires a mode argument.\n";
                    return 1;
                }
                const auto parsed_mode = parse_peer_selection_mode(argv[next_arg + 1]);
                if (!parsed_mode) {
                    std::cout << "Invalid --peer-selection mode. Expected cache-only, discover-if-needed, discover-first, or hybrid.\n";
                    return 1;
                }
                discovery_mode = *parsed_mode;
                next_arg += 2;
            } else if (arg == "--discover-only") {
                discover_only = true;
                next_arg += 1;
            } else if (arg == "--network-id") {
                if (next_arg + 1 >= argc) {
                    std::cout << "--network-id requires an integer argument.\n";
                    return 1;
                }
                const auto network_id = rlp::base::parse::uint64_decimal(argv[next_arg + 1]);
                if (!network_id) {
                    std::cout << "Invalid network id.\n";
                    return 1;
                }
                config->network_id = *network_id;
                next_arg += 2;
            } else if (arg == "--genesis-hash") {
                if (next_arg + 1 >= argc) {
                    std::cout << "--genesis-hash requires a 32-byte hex value.\n";
                    return 1;
                }
                const auto genesis_hash = parse_hash256(argv[next_arg + 1]);
                if (!genesis_hash) {
                    std::cout << "Invalid genesis hash. Expected 32-byte hex value.\n";
                    return 1;
                }
                config->genesis_hash = *genesis_hash;
                next_arg += 2;
            } else if (arg == "--fork-id-hash") {
                if (next_arg + 1 >= argc) {
                    std::cout << "--fork-id-hash requires a 4-byte hex value.\n";
                    return 1;
                }
                const auto fork_id = parse_fork_id_hash(argv[next_arg + 1]);
                if (!fork_id) {
                    std::cout << "Invalid fork id hash. Expected 4-byte hex value.\n";
                    return 1;
                }
                config->fork_id.fork_hash = fork_id->fork_hash;
                config->fork_id_overridden = true;
                next_arg += 2;
            } else if (arg == "--fork-id-next") {
                if (next_arg + 1 >= argc) {
                    std::cout << "--fork-id-next requires an integer argument.\n";
                    return 1;
                }
                const auto fork_next = rlp::base::parse::uint64_decimal(argv[next_arg + 1]);
                if (!fork_next) {
                    std::cout << "Invalid fork id next value.\n";
                    return 1;
                }
                config->fork_id.next_fork = *fork_next;
                config->fork_id_overridden = true;
                next_arg += 2;
            } else if (arg == "--chain-peers-json" || arg == "--bootstrap-peers-json") {
                if (next_arg + 1 >= argc) {
                    std::cout << arg << " requires a file path.\n";
                    return 1;
                }
                chain_peers_json_path = argv[next_arg + 1];
                next_arg += 2;
            } else if (arg == "--chain-peers-url" || arg == "--bootstrap-peers-url") {
                if (next_arg + 1 >= argc) {
                    std::cout << arg << " requires a URL.\n";
                    return 1;
                }
                chain_peers_url = argv[next_arg + 1];
                chain_peers_url_enabled = true;
                next_arg += 2;
            } else if (arg == "--no-chain-peers-url" || arg == "--no-bootstrap-peers-url") {
                chain_peers_url_enabled = false;
                next_arg += 1;
            } else {
                std::cout << "Unknown argument: " << arg << "\n";
                print_usage(argv[0]);
                return 1;
            }
        }

        if (config->prefer_direct_enode &&
            !config->fork_id_overridden &&
            config->canonical_chain_name.empty())
        {
            config->fork_id = eth::ForkId{};
        }

        if (!multi_chain_configs.empty())
        {
            if (config->prefer_direct_enode)
            {
                std::cout << "--direct-enode cannot be combined with multi-chain mode.\n";
                return 1;
            }
            output_state->service_chain_names.clear();
            for (auto& chain_config : multi_chain_configs)
            {
                chain_config.watch_specs = config->watch_specs;
                output_state->service_chain_names.push_back(chain_config.canonical_chain_name);
            }
        }

        boost::asio::io_context io;

        boost::asio::signal_set signals(io, SIGINT, SIGTERM);
        signals.async_wait([&](const boost::system::error_code&, int) {
            io.stop();
        });

        eth::EthWatchService service;

        if (!multi_chain_configs.empty())
        {
            static auto log = rlp::base::createLogger("eth_watch");
            auto service_watches = eth::cli::build_service_watch_specs(config->watch_specs);
            if (!service_watches)
            {
                std::cout << "Invalid watch contract address.\n";
                return 1;
            }

            std::vector<discv4::ChainPeerConfig> service_chains;
            service_chains.reserve(multi_chain_configs.size());

            for (const auto& chain_config : multi_chain_configs)
            {
                service_chains.push_back(chain_config.chain_peer_config);
                SPDLOG_LOGGER_INFO(log,
                                   "Starting eth watch service for chain '{}' with {} cached peer(s) and {} bootnode(s)",
                                   chain_config.canonical_chain_name,
                                   chain_config.chain_peer_config.nodes.size(),
                                   chain_config.chain_peer_config.bootnodes.size());
            }

            auto service_config = eth::cli::build_service_config(
                watch_connection_config,
                std::move(*service_watches),
                std::move(service_chains),
                discovery_mode);
            service_config.peer_queue = peer_queue_config;
            if (discover_only)
            {
                service_config.attach_peer_dialer = false;
            }
            if (discv5_bind_port.has_value())
            {
                service_config.discv5_discovery.bind_port = *discv5_bind_port;
            }
            else if (multi_chain_configs.size() > 1U)
            {
                service_config.discovery.bind_port = 0U;
                service_config.discv5_discovery.bind_port = 0U;
            }

            if (!service.initialize(
                    std::move(service_config),
                    [output_state](const eth::WatchEventNotification& notification)
                    {
                        log_watch_notification(notification, output_state);
                    }))
            {
                std::cout << "Invalid eth watch service configuration.\n";
                return 1;
            }
            service.run(io);
            std::cout << "Starting eth watch service for " << multi_chain_configs.size() << " chains...\n";
        }
        else if (config->use_chain_peer_cache && !config->prefer_direct_enode)
        {
            static auto log = rlp::base::createLogger("eth_watch");
            auto service_watches = eth::cli::build_service_watch_specs(config->watch_specs);
            if (!service_watches)
            {
                std::cout << "Invalid watch contract address.\n";
                return 1;
            }

            auto service_config = eth::cli::build_service_config(
                watch_connection_config,
                std::move(*service_watches),
                {config->chain_peer_config},
                discovery_mode);
            service_config.peer_queue = peer_queue_config;
            if (discover_only)
            {
                service_config.attach_peer_dialer = false;
            }
            if (discv5_bind_port.has_value())
            {
                service_config.discv5_discovery.bind_port = *discv5_bind_port;
            }
            output_state->service_chain_names = {config->canonical_chain_name};

            SPDLOG_LOGGER_INFO(log,
                               "Starting eth watch service for chain '{}' with {} cached peer(s) and {} bootnode(s)",
                               config->canonical_chain_name,
                               config->chain_peer_config.nodes.size(),
                               config->chain_peer_config.bootnodes.size());

            if (!service.initialize(
                    std::move(service_config),
                    [output_state](const eth::WatchEventNotification& notification)
                    {
                        log_watch_notification(notification, output_state);
                    }))
            {
                std::cout << "Invalid eth watch service configuration.\n";
                return 1;
            }
            service.run(io);
            std::cout << "Starting eth watch service...\n";
        }
        else
        {
            if (discover_only)
            {
                std::cout << "--discover-only requires --chain, --chains, or --all-chains.\n";
                return 1;
            }

            // Explicit host/port/pubkey mode — connect directly
            rlpx::PublicKey peer_pubkey{};
            if (!rlp::base::parse::hex_array(config->peer_pubkey_hex, peer_pubkey))
            {
                std::cout << "Invalid peer public key hex (expected 128 hex chars).\n";
                return 1;
            }

            boost::asio::spawn(io,
                [host = config->host, port = config->port, peer_pubkey,
                 chain_name = config->canonical_chain_name.empty()
                    ? std::to_string(config->network_id)
                    : config->canonical_chain_name,
                 network_id = config->network_id,
                 genesis_hash = config->genesis_hash,
                 fork_id = config->fork_id,
                 eth_message_schemas = config->chain_peer_config.eth_message_schemas,
                 watch_specs = std::move(config->watch_specs),
                 output_state](boost::asio::yield_context yc)
                {
                    run_watch(host, port, peer_pubkey,
                              chain_name,
                              network_id, genesis_hash, fork_id,
                              eth_message_schemas,
                              watch_specs,
                              output_state,
                              [](rlpx::DisconnectReason) {},
                              [](std::shared_ptr<rlpx::RlpxSession>) {},
                              yc);
                });
        }

        std::shared_ptr<boost::asio::steady_timer> service_stats_timer;
        if (service.initialized())
        {
            service_stats_timer = std::make_shared<boost::asio::steady_timer>(io);
            schedule_service_stats(io, service, service_stats_timer);
        }

        std::shared_ptr<boost::asio::steady_timer> run_limit_timer;
        if (output_state->run_seconds > 0U)
        {
            run_limit_timer = std::make_shared<boost::asio::steady_timer>(io);
            run_limit_timer->expires_after(std::chrono::seconds(output_state->run_seconds));
            run_limit_timer->async_wait([&service, service_stats_timer, output_state](const boost::system::error_code& ec)
            {
                if (ec)
                {
                    return;
                }
                static auto log = rlp::base::createLogger("eth_watch");
                SPDLOG_LOGGER_INFO(log, "Run limit reached; stopping eth_watch.");
                log_service_summary(service, output_state);
                if (service_stats_timer)
                {
                    service_stats_timer->cancel();
                }
                service.stop();
                std::cout.flush();
                std::exit(0);
            });
        }

        io.run();
        log_service_summary(service, output_state);
        return 0;
    } catch (const std::exception& ex) {
        std::cout << "Unhandled exception: " << ex.what() << "\n";
        return 1;
    } catch (...) {
        std::cout << "Unhandled exception.\n";
        return 1;
    }
}

Updated on 2026-06-05 at 17:22:18 -0700