Skip to content

discovery/test_discv5_connect.cpp

Classes

Name
struct DialStats
struct QualityFilterState

Functions

Name
std::string pubkey_to_hex(const rlpx::PublicKey & pubkey)
eth::Hash256 sepolia_genesis()
bool is_publicly_routable_ip(const std::string & ip)
std::optional< std::string > subnet_key_v4_24(const std::string & ip)
bool is_candidate_blocked(const discv4::ValidatedPeer & vp, const std::shared_ptr< QualityFilterState > & quality)
void dial_connect_only(discv4::ValidatedPeer vp, std::function< void()> on_done, std::function< void(std::shared_ptr< rlpx::RlpxSession >)> on_connected, boost::asio::yield_context yield, std::shared_ptr< DialStats > stats, std::shared_ptr< QualityFilterState > quality, eth::ForkId fork_id)
int main(int argc, char ** argv)

Attributes

Name
uint64_t kSepoliaNetworkId
uint8_t kEthOffset
uint16_t kSepoliaRlpxPort
const std::array< uint8_t, 4U > kSepoliaForkHashFallback

Functions Documentation

function pubkey_to_hex

static std::string pubkey_to_hex(
    const rlpx::PublicKey & pubkey
)

function sepolia_genesis

static eth::Hash256 sepolia_genesis()

function is_publicly_routable_ip

static bool is_publicly_routable_ip(
    const std::string & ip
)

function subnet_key_v4_24

static std::optional< std::string > subnet_key_v4_24(
    const std::string & ip
)

function is_candidate_blocked

static bool is_candidate_blocked(
    const discv4::ValidatedPeer & vp,
    const std::shared_ptr< QualityFilterState > & quality
)

function dial_connect_only

static void dial_connect_only(
    discv4::ValidatedPeer vp,
    std::function< void()> on_done,
    std::function< void(std::shared_ptr< rlpx::RlpxSession >)> on_connected,
    boost::asio::yield_context yield,
    std::shared_ptr< DialStats > stats,
    std::shared_ptr< QualityFilterState > quality,
    eth::ForkId fork_id
)

function main

int main(
    int argc,
    char ** argv
)

Attributes Documentation

variable kSepoliaNetworkId

static uint64_t kSepoliaNetworkId = 11155111;

variable kEthOffset

static uint8_t kEthOffset = 0x10;

variable kSepoliaRlpxPort

static uint16_t kSepoliaRlpxPort = 30303;

variable kSepoliaForkHashFallback

static const std::array< uint8_t, 4U > kSepoliaForkHashFallback { 0x26, 0x89, 0x56, 0xb6 };

Source code

// Copyright 2025 GeniusVentures
// SPDX-License-Identifier: Apache-2.0
//
// examples/discovery/test_discv5_connect.cpp
//
// Live functional harness:
//   discv5 discovery (Sepolia) -> RLPx ETH Status validation.
//
// Pass criterion:
//   connected (right chain) >= --connections (default: 3)
//
// Usage:
//   ./test_discv5_connect [--log-level debug] [--timeout 180] [--connections 3] [--dials 16]

#include <array>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/address.hpp>
#include <boost/asio/redirect_error.hpp>
#include <boost/asio/signal_set.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>

#include <discv4/bootnodes_test.hpp>
#include <discv4/dial_scheduler.hpp>
#include <discv5/discv5_bootnodes.hpp>
#include <discv5/discv5_client.hpp>
#include <discv5/discv5_enr.hpp>
#include <eth/eth_types.hpp>
#include <eth/messages.hpp>
#include <rlpx/crypto/ecdh.hpp>
#include <rlpx/framing/message_stream.hpp>
#include <rlpx/protocol/messages.hpp>
#include <rlpx/rlpx_error.hpp>
#include <rlpx/rlpx_session.hpp>
#include <base/rlp-logger.hpp>

#include <spdlog/spdlog.h>

#include "../chain_config.hpp"

static constexpr uint64_t kSepoliaNetworkId = 11155111;
static constexpr uint8_t  kEthOffset        = 0x10;
static constexpr uint16_t kSepoliaRlpxPort  = 30303;

static std::string pubkey_to_hex(const rlpx::PublicKey& pubkey)
{
    static constexpr char kHex[] = "0123456789abcdef";
    std::string out;
    out.reserve(pubkey.size() * 2U);
    for (const uint8_t byte : pubkey)
    {
        out.push_back(kHex[(byte >> 4U) & 0x0FU]);
        out.push_back(kHex[byte & 0x0FU]);
    }
    return out;
}

static eth::Hash256 sepolia_genesis()
{
    eth::Hash256 h{};
    const char* hex = "25a5cc106eea7138acab33231d7160d69cb777ee0c2c553fcddf5138993e6dd9";
    for (size_t i = 0; i < 32; ++i)
    {
        auto nibble = [](char c) -> uint8_t {
            if (c >= '0' && c <= '9') { return static_cast<uint8_t>(c - '0'); }
            if (c >= 'a' && c <= 'f') { return static_cast<uint8_t>(10 + c - 'a'); }
            return 0U;
        };
        h[i] = static_cast<uint8_t>((nibble(hex[i * 2U]) << 4U) | nibble(hex[i * 2U + 1U]));
    }
    return h;
}

// Sepolia post-BPO2 fallback hash — used only when chains.json is not found.
static const std::array<uint8_t, 4U> kSepoliaForkHashFallback{ 0x26, 0x89, 0x56, 0xb6 };

struct DialStats
{
    std::atomic<int> dialed{0};
    std::atomic<int> connect_failed{0};
    std::atomic<int> wrong_chain{0};
    std::atomic<int> status_timeout{0};
    std::atomic<int> too_many_peers{0};
    std::atomic<int> connected{0};
    std::atomic<int> connected_seeded{0};
    std::atomic<int> connected_discv5{0};
    std::atomic<int> filtered_bad_peers{0};
};

struct QualityFilterState
{
    std::unordered_map<std::string, int> fail_counts{};
    std::unordered_map<std::string, int> ip_fail_counts{};
    std::unordered_map<uint16_t, int> port_fail_counts{};
    std::unordered_map<std::string, int> subnet_enqueue_counts{};
    std::unordered_set<std::string> blocked_pubkeys{};
    std::unordered_set<std::string> blocked_ips{};
    std::unordered_set<uint16_t> blocked_ports{};
    int block_threshold{2};
    int ip_block_threshold{3};
    int port_block_threshold{4};
    int subnet_enqueue_limit{2};
};

static bool is_publicly_routable_ip(const std::string& ip)
{
    boost::system::error_code ec;
    const auto addr = boost::asio::ip::make_address(ip, ec);
    if (ec)
    {
        return false;
    }

    if (addr.is_v4())
    {
        const auto b = addr.to_v4().to_bytes();
        if (b[0] == 0U || b[0] == 10U || b[0] == 127U)
        {
            return false;
        }
        if (b[0] == 169U && b[1] == 254U)
        {
            return false;
        }
        if (b[0] == 172U && b[1] >= 16U && b[1] <= 31U)
        {
            return false;
        }
        if (b[0] == 192U && b[1] == 168U)
        {
            return false;
        }
        if (b[0] >= 224U)
        {
            return false;
        }
        return true;
    }

    const auto v6 = addr.to_v6();
    return !v6.is_loopback() && !v6.is_link_local() && !v6.is_multicast() && !v6.is_unspecified();
}

static std::optional<std::string> subnet_key_v4_24(const std::string& ip)
{
    boost::system::error_code ec;
    const auto addr = boost::asio::ip::make_address(ip, ec);
    if (ec || !addr.is_v4())
    {
        return std::nullopt;
    }

    const auto b = addr.to_v4().to_bytes();
    return std::to_string(b[0]) + "." + std::to_string(b[1]) + "." + std::to_string(b[2]);
}

static bool is_candidate_blocked(
    const discv4::ValidatedPeer& vp,
    const std::shared_ptr<QualityFilterState>& quality)
{
    const std::string pubkey = pubkey_to_hex(vp.pubkey);
    if (quality->blocked_pubkeys.find(pubkey) != quality->blocked_pubkeys.end())
    {
        return true;
    }

    if (quality->blocked_ips.find(vp.peer.ip) != quality->blocked_ips.end())
    {
        return true;
    }

    if (quality->blocked_ports.find(vp.peer.tcp_port) != quality->blocked_ports.end())
    {
        return true;
    }

    return false;
}

static void dial_connect_only(
    discv4::ValidatedPeer vp,
    std::function<void()> on_done,
    std::function<void(std::shared_ptr<rlpx::RlpxSession>)> on_connected,
    boost::asio::yield_context yield,
    std::shared_ptr<DialStats> stats,
    std::shared_ptr<QualityFilterState> quality,
    eth::ForkId fork_id)
{
    ++stats->dialed;

    auto keypair_result = rlpx::crypto::Ecdh::generate_ephemeral_keypair();
    if (!keypair_result)
    {
        ++stats->connect_failed;
        on_done();
        return;
    }
    const auto& keypair = keypair_result.value();

    const rlpx::SessionConnectParams params{
        vp.peer.ip,
        vp.peer.tcp_port,
        keypair.public_key,
        keypair.private_key,
        vp.pubkey,
        "rlp-test-discv5-connect",
        0
    };

    auto session_result = rlpx::RlpxSession::connect(params, yield);
    if (!session_result)
    {
        const std::string key = pubkey_to_hex(vp.pubkey);
        const int fails = ++quality->fail_counts[key];
        if (fails >= quality->block_threshold)
        {
            quality->blocked_pubkeys.insert(key);
        }

        const int ip_fails = ++quality->ip_fail_counts[vp.peer.ip];
        if (ip_fails >= quality->ip_block_threshold)
        {
            quality->blocked_ips.insert(vp.peer.ip);
        }

        const int port_fails = ++quality->port_fail_counts[vp.peer.tcp_port];
        if (port_fails >= quality->port_block_threshold)
        {
            quality->blocked_ports.insert(vp.peer.tcp_port);
        }

        ++stats->connect_failed;
        on_done();
        return;
    }
    auto session = std::move(session_result.value());

    {
        const eth::Hash256 genesis = sepolia_genesis();
        eth::StatusMessage69 status69{
            69,
            kSepoliaNetworkId,
            genesis,
            fork_id,
            0,
            0,
            genesis,
        };
        eth::StatusMessage status = status69;
        auto encoded = eth::protocol::encode_status(status);
        if (encoded)
        {
            (void)session->post_message(rlpx::framing::Message{
                static_cast<uint8_t>(kEthOffset + eth::protocol::kStatusMessageId),
                std::move(encoded.value())
            });
        }
    }

    auto executor = yield.get_executor();
    auto status_received = std::make_shared<std::atomic<bool>>(false);
    auto status_timeout = std::make_shared<boost::asio::steady_timer>(executor);
    auto lifetime = std::make_shared<boost::asio::steady_timer>(executor);
    auto disconnect_reason = std::make_shared<std::atomic<int>>(
        static_cast<int>(rlpx::DisconnectReason::kRequested));
    status_timeout->expires_after(eth::protocol::kStatusHandshakeTimeout);
    lifetime->expires_after(std::chrono::seconds(10));

    session->set_disconnect_handler(
        [lifetime, status_timeout, disconnect_reason](const rlpx::protocol::DisconnectMessage& msg)
        {
            disconnect_reason->store(static_cast<int>(msg.reason));
            lifetime->cancel();
            status_timeout->cancel();
        });

    session->set_ping_handler([session](const rlpx::protocol::PingMessage&)
    {
        const rlpx::protocol::PongMessage pong;
        auto encoded = pong.encode();
        if (!encoded)
        {
            return;
        }

        (void)session->post_message(rlpx::framing::Message{
            rlpx::kPongMessageId,
            std::move(encoded.value())
        });
    });

    const eth::Hash256 genesis = sepolia_genesis();
    session->set_generic_handler([session, status_received, status_timeout,
                                  on_connected, genesis, stats](const rlpx::protocol::Message& msg)
    {
        if (msg.id < kEthOffset)
        {
            return;
        }

        const auto eth_id = static_cast<uint8_t>(msg.id - kEthOffset);
        if (eth_id != eth::protocol::kStatusMessageId)
        {
            return;
        }

        const rlp::ByteView payload(msg.payload.data(), msg.payload.size());
        auto decoded = eth::protocol::decode_status(payload);
        if (!decoded)
        {
            status_timeout->cancel();
            (void)session->disconnect(rlpx::DisconnectReason::kSubprotocolError);
            return;
        }

        auto valid = eth::protocol::validate_status(decoded.value(), kSepoliaNetworkId, genesis);
        if (!valid)
        {
            ++stats->wrong_chain;
            status_timeout->cancel();
            (void)session->disconnect(rlpx::DisconnectReason::kSubprotocolError);
            return;
        }

        ++stats->connected;
        status_received->store(true);
        status_timeout->cancel();
        on_connected(session);
    });

    boost::system::error_code hs_ec;
    status_timeout->async_wait(boost::asio::redirect_error(yield, hs_ec));

    if (!status_received->load())
    {
        if (hs_ec)
        {
            const auto reason = static_cast<rlpx::DisconnectReason>(disconnect_reason->load());
            if (reason == rlpx::DisconnectReason::kTooManyPeers)
            {
                ++stats->too_many_peers;
            }
            else
            {
                ++stats->connect_failed;
            }
        }
        else
        {
            ++stats->status_timeout;
        }

        (void)session->disconnect(rlpx::DisconnectReason::kTimeout);
        on_done();
        return;
    }

    boost::system::error_code lt_ec;
    lifetime->async_wait(boost::asio::redirect_error(yield, lt_ec));
    on_done();
}

int main(int argc, char** argv)
{
    int timeout_secs = 180;
    int min_connections = 3;
    int max_dials = 16;
    bool enable_seeded = false;
    bool require_fork = true;
    bool enqueue_bootstrap_candidates = false;

    for (int i = 1; i < argc; ++i)
    {
        std::string_view arg(argv[i]);
        if (arg == "--log-level" && i + 1 < argc)
        {
            std::string_view lvl(argv[++i]);
            if (lvl == "trace") { spdlog::set_level(spdlog::level::trace); }
            else if (lvl == "debug") { spdlog::set_level(spdlog::level::debug); }
            else if (lvl == "info") { spdlog::set_level(spdlog::level::info); }
            else if (lvl == "warn") { spdlog::set_level(spdlog::level::warn); }
            else if (lvl == "off") { spdlog::set_level(spdlog::level::off); }
        }
        else if (arg == "--timeout" && i + 1 < argc)
        {
            timeout_secs = std::atoi(argv[++i]);
        }
        else if (arg == "--connections" && i + 1 < argc)
        {
            min_connections = std::atoi(argv[++i]);
        }
        else if (arg == "--dials" && i + 1 < argc)
        {
            max_dials = std::atoi(argv[++i]);
        }
        else if (arg == "--seeded" && i + 1 < argc)
        {
            const std::string_view mode(argv[++i]);
            if (mode == "on")
            {
                enable_seeded = true;
            }
            else if (mode == "off")
            {
                enable_seeded = false;
            }
            else
            {
                std::cout << "Invalid --seeded value (use on|off)\n";
                return 1;
            }
        }
        else if (arg == "--require-fork" && i + 1 < argc)
        {
            const std::string_view mode(argv[++i]);
            if (mode == "on")
            {
                require_fork = true;
            }
            else if (mode == "off")
            {
                require_fork = false;
            }
            else
            {
                std::cout << "Invalid --require-fork value (use on|off)\n";
                return 1;
            }
        }
        else if (arg == "--enqueue-bootstrap-candidates" && i + 1 < argc)
        {
            const std::string_view mode(argv[++i]);
            if (mode == "on")
            {
                enqueue_bootstrap_candidates = true;
            }
            else if (mode == "off")
            {
                enqueue_bootstrap_candidates = false;
            }
            else
            {
                std::cout << "Invalid --enqueue-bootstrap-candidates value (use on|off)\n";
                return 1;
            }
        }
    }

    const auto loaded_hash = load_fork_hash("sepolia", argv[0]);
    if (!loaded_hash)
    {
        std::cout << "[  WARN    ] chains.json not found or missing 'sepolia' key — using compiled-in fallback hash.\n";
    }

    const std::array<uint8_t, 4U> sepolia_hash = loaded_hash.value_or(kSepoliaForkHashFallback);
    const eth::ForkId sepolia_fork_id{ sepolia_hash, 0U };
    const discovery::ForkId sepolia_discovery_fork_id{ sepolia_hash, 0U };

    boost::asio::io_context io;

    std::atomic<int> discovered_peers{0};
    std::atomic<int> discovered_candidates{0};
    auto stats = std::make_shared<DialStats>();

    auto keypair_result = rlpx::crypto::Ecdh::generate_ephemeral_keypair();
    if (!keypair_result)
    {
        std::cout << "Failed to generate keypair\n";
        return 1;
    }

    discv5::discv5Config dv5_cfg;
    dv5_cfg.bind_port = 0;
    dv5_cfg.query_interval_sec = 10U;
    if (require_fork)
    {
        dv5_cfg.required_fork_id = sepolia_discovery_fork_id;
    }
    std::copy(
        keypair_result.value().private_key.begin(),
        keypair_result.value().private_key.end(),
        dv5_cfg.private_key.begin());
    std::copy(
        keypair_result.value().public_key.begin(),
        keypair_result.value().public_key.end(),
        dv5_cfg.public_key.begin());

    auto source = discv5::ChainBootnodeRegistry::for_chain(discv5::ChainId::kEthereumSepolia);
    if (!source)
    {
        std::cout << "Failed to load Sepolia discv5 bootnode source\n";
        return 1;
    }

    dv5_cfg.bootstrap_enrs = source->fetch();
    if (dv5_cfg.bootstrap_enrs.empty())
    {
        std::cout << "No discv5 Sepolia bootnodes configured\n";
        return 1;
    }

    auto dv5 = std::make_shared<discv5::discv5_client>(io, dv5_cfg);

    const int kMaxActiveDials = 50;
    auto pool = std::make_shared<discv4::WatcherPool>(kMaxActiveDials, max_dials * 2);
    auto sched_ref = std::make_shared<discv4::DialScheduler*>(nullptr);
    auto seeded_pubkeys = std::make_shared<std::unordered_set<std::string>>();
    auto quality = std::make_shared<QualityFilterState>();

    boost::asio::steady_timer deadline(io, std::chrono::seconds(timeout_secs));

    auto scheduler = std::make_shared<discv4::DialScheduler>(
        io,
        pool,
        [&io, &deadline, min_connections, sched_ref, stats, seeded_pubkeys, quality, sepolia_fork_id]
        (discv4::ValidatedPeer vp,
         std::function<void()> on_done,
         std::function<void(std::shared_ptr<rlpx::RlpxSession>)> on_connected,
         boost::asio::yield_context yc) mutable
        {
            dial_connect_only(
                vp,
                std::move(on_done),
                [on_connected, &io, &deadline, min_connections, sched_ref, stats, seeded_pubkeys]
                (std::shared_ptr<rlpx::RlpxSession> s) mutable
                {
                    const std::string remote_pubkey_hex = pubkey_to_hex(s->peer_info().public_key);
                    if (seeded_pubkeys->find(remote_pubkey_hex) != seeded_pubkeys->end())
                    {
                        ++stats->connected_seeded;
                    }
                    else
                    {
                        ++stats->connected_discv5;
                    }
                    on_connected(s);
                    if (*sched_ref && (*sched_ref)->total_validated >= min_connections)
                    {
                        deadline.cancel();
                        io.stop();
                    }
                },
                yc,
                stats,
                quality,
                sepolia_fork_id);
        });
    *sched_ref = scheduler.get();

    auto parse_enode = [](const std::string& enode)
        -> std::optional<std::tuple<std::string, uint16_t, std::string>>
    {
        const std::string prefix = "enode://";
        if (enode.substr(0, prefix.size()) != prefix)
        {
            return std::nullopt;
        }
        const auto at = enode.find('@', prefix.size());
        if (at == std::string::npos)
        {
            return std::nullopt;
        }
        const auto colon = enode.rfind(':');
        if (colon == std::string::npos || colon < at)
        {
            return std::nullopt;
        }

        std::string pubkey = enode.substr(prefix.size(), at - prefix.size());
        std::string host = enode.substr(at + 1, colon - at - 1);
        uint16_t port = static_cast<uint16_t>(std::stoi(enode.substr(colon + 1)));
        return std::make_tuple(host, port, pubkey);
    };

    auto hex_to_nibble = [](char c) -> std::optional<uint8_t>
    {
        if (c >= '0' && c <= '9') { return static_cast<uint8_t>(c - '0'); }
        if (c >= 'a' && c <= 'f') { return static_cast<uint8_t>(10 + c - 'a'); }
        if (c >= 'A' && c <= 'F') { return static_cast<uint8_t>(10 + c - 'A'); }
        return std::nullopt;
    };

    if (enable_seeded)
    {
        for (const auto& enode : ETHEREUM_SEPOLIA_BOOTNODES)
        {
            auto parsed = parse_enode(enode);
            if (!parsed)
            {
                continue;
            }

            const auto& [host, port, pubkey_hex] = *parsed;
            if (port != kSepoliaRlpxPort || pubkey_hex.size() != 128U)
            {
                continue;
            }

            discv4::ValidatedPeer vp;
            vp.peer.ip = host;
            vp.peer.udp_port = port;
            vp.peer.tcp_port = port;
            vp.peer.last_seen = std::chrono::steady_clock::now();

            bool ok = true;
            for (size_t i = 0; i < vp.pubkey.size() && ok; ++i)
            {
                auto hi = hex_to_nibble(pubkey_hex[i * 2U]);
                auto lo = hex_to_nibble(pubkey_hex[i * 2U + 1U]);
                if (!hi || !lo)
                {
                    ok = false;
                    break;
                }
                vp.pubkey[i] = static_cast<uint8_t>((*hi << 4U) | *lo);
                vp.peer.node_id[i] = vp.pubkey[i];
            }

            if (!ok || !rlpx::crypto::Ecdh::verify_public_key(vp.pubkey))
            {
                continue;
            }

            if (is_candidate_blocked(vp, quality))
            {
                ++stats->filtered_bad_peers;
                continue;
            }

            seeded_pubkeys->insert(pubkey_to_hex(vp.pubkey));
            scheduler->enqueue(std::move(vp));
        }
    }

    dv5->set_peer_discovered_callback(
        [scheduler, stats, quality, &discovered_peers, &discovered_candidates, sepolia_hash, require_fork]
        (const discovery::ValidatedPeer& peer)
        {
            ++discovered_candidates;

            if (peer.tcp_port == 0)
            {
                return;
            }

            if (require_fork && (!peer.eth_fork_id.has_value() || peer.eth_fork_id.value().hash != sepolia_hash))
            {
                return;
            }

            discv4::ValidatedPeer vp;
            vp.peer.node_id = peer.node_id;
            vp.peer.ip = peer.ip;
            vp.peer.udp_port = peer.udp_port;
            vp.peer.tcp_port = peer.tcp_port;
            vp.peer.last_seen = peer.last_seen;
            if (peer.eth_fork_id.has_value())
            {
                vp.peer.eth_fork_id = discv4::ForkId{
                    peer.eth_fork_id.value().hash,
                    peer.eth_fork_id.value().next
                };
            }

            vp.pubkey = peer.node_id;
            if (!rlpx::crypto::Ecdh::verify_public_key(vp.pubkey))
            {
                return;
            }

            if (!is_publicly_routable_ip(vp.peer.ip))
            {
                ++stats->filtered_bad_peers;
                return;
            }

            if (is_candidate_blocked(vp, quality))
            {
                ++stats->filtered_bad_peers;
                return;
            }

            const auto subnet_key = subnet_key_v4_24(vp.peer.ip);
            if (subnet_key.has_value())
            {
                const int queued = quality->subnet_enqueue_counts[subnet_key.value()];
                if (queued >= quality->subnet_enqueue_limit)
                {
                    ++stats->filtered_bad_peers;
                    return;
                }
                ++quality->subnet_enqueue_counts[subnet_key.value()];
            }

            ++discovered_peers;
            scheduler->enqueue(std::move(vp));
        });

    dv5->set_error_callback([](const std::string& msg)
    {
        std::cout << "discv5 error: " << msg << "\n";
    });

    if (enqueue_bootstrap_candidates)
    {
        for (const auto& enr_uri : dv5_cfg.bootstrap_enrs)
        {
            auto record_result = discv5::EnrParser::parse(enr_uri);
            if (!record_result)
            {
                continue;
            }

            auto peer_result = discv5::EnrParser::to_validated_peer(record_result.value());
            if (!peer_result)
            {
                continue;
            }

            if (peer_result.value().tcp_port == 0)
            {
                continue;
            }

            discv4::ValidatedPeer vp;
            vp.peer.node_id = peer_result.value().node_id;
            vp.peer.ip = peer_result.value().ip;
            vp.peer.udp_port = peer_result.value().udp_port;
            vp.peer.tcp_port = peer_result.value().tcp_port;
            vp.peer.last_seen = peer_result.value().last_seen;
            if (peer_result.value().eth_fork_id.has_value())
            {
                vp.peer.eth_fork_id = discv4::ForkId{
                    peer_result.value().eth_fork_id.value().hash,
                    peer_result.value().eth_fork_id.value().next
                };
            }

            vp.pubkey = peer_result.value().node_id;
            if (!rlpx::crypto::Ecdh::verify_public_key(vp.pubkey))
            {
                continue;
            }

            if (!is_publicly_routable_ip(vp.peer.ip))
            {
                ++stats->filtered_bad_peers;
                continue;
            }

            if (is_candidate_blocked(vp, quality))
            {
                ++stats->filtered_bad_peers;
                continue;
            }

            const auto subnet_key = subnet_key_v4_24(vp.peer.ip);
            if (subnet_key.has_value())
            {
                const int queued = quality->subnet_enqueue_counts[subnet_key.value()];
                if (queued >= quality->subnet_enqueue_limit)
                {
                    ++stats->filtered_bad_peers;
                    continue;
                }
                ++quality->subnet_enqueue_counts[subnet_key.value()];
            }

            scheduler->enqueue(std::move(vp));
        }
    }

    deadline.async_wait([&](boost::system::error_code)
    {
        scheduler->stop();
        dv5->stop();
        io.stop();
    });

    boost::asio::signal_set signals(io, SIGINT, SIGTERM);
    signals.async_wait([&](boost::system::error_code, int)
    {
        deadline.cancel();
        scheduler->stop();
        dv5->stop();
        io.stop();
    });

    const auto start_result = dv5->start();
    if (!start_result)
    {
        std::cout << "Failed to start discv5 client\n";
        return 1;
    }

    std::cout << "Running discv5 Sepolia discovery + connect harness (seeded="
              << (enable_seeded ? "on" : "off")
              << ", require-fork=" << (require_fork ? "on" : "off")
              << ", enqueue-bootstrap-candidates=" << (enqueue_bootstrap_candidates ? "on" : "off")
              << ")...\n";
    io.run();

    std::cout << "\n[  STATS   ] Dial breakdown:\n"
              << "              dialed:                  " << stats->dialed.load() << "\n"
              << "              connect failed:          " << stats->connect_failed.load() << "\n"
              << "              wrong chain:             " << stats->wrong_chain.load() << "\n"
              << "              too many peers:          " << stats->too_many_peers.load() << "\n"
              << "              status timeout:          " << stats->status_timeout.load() << "\n"
              << "              connected (right chain): " << stats->connected.load() << "\n"
              << "              connected (seeded):      " << stats->connected_seeded.load() << "\n"
              << "              connected (discv5):      " << stats->connected_discv5.load() << "\n"
              << "              filtered bad peers:      " << stats->filtered_bad_peers.load() << "\n"
              << "              candidates seen:         " << discovered_candidates.load() << "\n"
              << "              discovered peers:        " << discovered_peers.load() << "\n";

    const int connections = scheduler->total_validated;
    if (connections >= min_connections)
    {
        std::cout << "\n[       OK ] Discv5ConnectHarness.ActiveSepoliaConnections\n"
                  << "            " << connections << " active Sepolia ETH Status connection(s) confirmed\n\n";
        std::cout.flush();
        std::exit(0);
    }

    std::cout << "\n[  FAILED  ] Discv5ConnectHarness.ActiveSepoliaConnections\n"
              << "            Only " << connections << "/" << min_connections
              << " Sepolia connection(s) — run with --log-level debug for details\n\n";
    std::cout.flush();
    std::exit(1);
}

Updated on 2026-04-13 at 23:22:46 -0700