Skip to content

discv4/dial_scheduler.hpp

Namespaces

Name
discv4

Classes

Name
struct discv4::ValidatedPeer
A discovered peer whose public key has already been validated.
struct discv4::WatcherPool
Global resource pool shared across all chain DialSchedulers. Enforces a two-level fd cap: total across all chains, and per chain. max_total — global fd cap (mobile default 12, desktop 200) max_per_chain — per-chain cap (mobile default 3, desktop 50).
struct discv4::DialScheduler
Per-chain dial scheduler mirroring go-ethereum's dialScheduler. Maintains up to pool->max_per_chain concurrent dial coroutines, respecting the global pool->max_total cap across all chains. All methods run on the single io_context thread — no mutex needed.

Types

Name
using std::function< void( ValidatedPeer vp, std::function< void()> on_done, std::function< void(std::shared_ptr< rlpx::RlpxSession >)> on_connected, boost::asio::yield_context yield)> DialFn
using std::function< bool(const DiscoveredPeer &)> FilterFn
Predicate applied to a DiscoveredPeer before it is enqueued for dialing. Return true to allow dialing, false to drop. When unset (nullptr), all peers are accepted. Mirrors go-ethereum UDPv4::NewNodeFilter (eth/protocols/eth/discovery.go).

Functions

Name
FilterFn make_fork_id_filter(const std::array< uint8_t, 4U > & expected_hash)
Create a FilterFn that accepts only peers whose ENR eth entry carries a ForkId with the given 4-byte hash (CRC32 of genesis + applied forks).

Types Documentation

using DialFn

using discv4::DialFn = std::function<void(
   ValidatedPeer                                               vp,
   std::function<void()>                                       on_done,
   std::function<void(std::shared_ptr<rlpx::RlpxSession>)>    on_connected,
   boost::asio::yield_context                                  yield)>;

Callback signature for what to run per dial attempt. vp — the peer to connect to on_done — call on every exit path (recycling the slot) on_connected — call once the ETH handshake is confirmed yield — coroutine yield context

using FilterFn

using discv4::FilterFn = std::function<bool( const DiscoveredPeer& )>;

Predicate applied to a DiscoveredPeer before it is enqueued for dialing. Return true to allow dialing, false to drop. When unset (nullptr), all peers are accepted. Mirrors go-ethereum UDPv4::NewNodeFilter (eth/protocols/eth/discovery.go).

Functions Documentation

function make_fork_id_filter

inline FilterFn make_fork_id_filter(
    const std::array< uint8_t, 4U > & expected_hash
)

Create a FilterFn that accepts only peers whose ENR eth entry carries a ForkId with the given 4-byte hash (CRC32 of genesis + applied forks).

Parameters:

  • expected_hash 4-byte CRC32 fork hash to match against.

Return: FilterFn suitable for assignment to DialScheduler::filter_fn.

Mirrors go-ethereum NewNodeFilter (eth/protocols/eth/discovery.go):

return func(n *enode.Node) bool {
    var entry enrEntry
    if err := n.Load(&entry); err != nil { return false }
    return filter(entry.ForkID) == nil
}

Peers with no eth_fork_id (ENR absent or eth entry missing) are dropped.

Source code

// Copyright 2025 GeniusVentures
// SPDX-License-Identifier: Apache-2.0
#pragma once

#include <atomic>
#include <deque>
#include <functional>
#include <memory>
#include <vector>

#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>

#include <discv4/dial_history.hpp>
#include <discv4/discv4_client.hpp>
#include <rlpx/rlpx_session.hpp>
#include <base/rlp-logger.hpp>

namespace discv4 {

struct ValidatedPeer
{
    DiscoveredPeer   peer;
    rlpx::PublicKey  pubkey;
};

struct WatcherPool
{
    int               max_total;
    int               max_per_chain;
    std::atomic<int>  active_total{0};

    WatcherPool(int max_total_, int max_per_chain_)
        : max_total(max_total_), max_per_chain(max_per_chain_) {}
};

using DialFn = std::function<void(
    ValidatedPeer                                               vp,
    std::function<void()>                                       on_done,
    std::function<void(std::shared_ptr<rlpx::RlpxSession>)>    on_connected,
    boost::asio::yield_context                                  yield)>;

using FilterFn = std::function<bool( const DiscoveredPeer& )>;

struct DialScheduler : std::enable_shared_from_this<DialScheduler>
{
    boost::asio::io_context&              io;
    std::shared_ptr<WatcherPool>          pool;
    DialFn                                dial_fn;
    FilterFn                              filter_fn{};  
    std::shared_ptr<DialHistory>          dial_history;

    int                                          active{0};
    int                                          validated_count{0};   
    int                                          total_validated{0};   
    bool                                         stopping{false};
    std::deque<ValidatedPeer>                    queue;
    std::vector<std::weak_ptr<rlpx::RlpxSession>> active_sessions;

    DialScheduler(boost::asio::io_context&  io_,
                  std::shared_ptr<WatcherPool> pool_,
                  DialFn                    dial_fn_)
        : io(io_)
        , pool(std::move(pool_))
        , dial_fn(std::move(dial_fn_))
        , dial_history(std::make_shared<DialHistory>())
    {}

    void enqueue(ValidatedPeer vp)
    {
        if ( stopping )
        {
            return;
        }

        // Drop peers that do not match the chain filter (e.g. wrong ForkId).
        if ( filter_fn && !filter_fn( vp.peer ) )
        {
            return;
        }

        dial_history->expire();
        if (dial_history->contains(vp.peer.node_id)) { return; }

        if (active < pool->max_per_chain &&
            pool->active_total.load() < pool->max_total)
        {
            ++active;
            ++pool->active_total;
            dial_history->add(vp.peer.node_id);
            spawn_dial(std::move(vp));
        }
        else
        {
            queue.push_back(std::move(vp));
        }
    }

    void release()
    {
        --active;
        --pool->active_total;

        if (stopping) { return; }

        dial_history->expire();
        while (active < pool->max_per_chain &&
               pool->active_total.load() < pool->max_total &&
               !queue.empty())
        {
            ValidatedPeer vp = std::move(queue.front());
            queue.pop_front();
            if (dial_history->contains(vp.peer.node_id)) { continue; }
            ++active;
            ++pool->active_total;
            dial_history->add(vp.peer.node_id);
            spawn_dial(std::move(vp));
        }
    }

    void stop()
    {
        stopping = true;
        queue.clear();
        for (auto& ws : active_sessions)
        {
            if (auto s = ws.lock())
            {
                (void)s->disconnect(rlpx::DisconnectReason::kClientQuitting);
            }
        }
        active_sessions.clear();
    }

private:
    void spawn_dial(ValidatedPeer vp)
    {
        auto sched         = shared_from_this();
        auto was_validated = std::make_shared<bool>(false);
        static auto log    = rlp::base::createLogger("dial_scheduler");
        SPDLOG_LOGGER_DEBUG(log, "Dialing peer: {}:{}", vp.peer.ip, vp.peer.tcp_port);
        boost::asio::spawn(io,
            [sched, vp = std::move(vp), was_validated](boost::asio::yield_context yc)
            {
                sched->dial_fn(
                    vp,
                    [sched, was_validated]()
                    {
                        if (*was_validated) { --sched->validated_count; }
                        sched->release();
                    },
                    [sched, was_validated](std::shared_ptr<rlpx::RlpxSession> s)
                    {
                        *was_validated = true;
                        ++sched->validated_count;
                        ++sched->total_validated;
                        sched->active_sessions.push_back(s);
                    },
                    yc);
            });
    }
}; // struct DialScheduler

[[nodiscard]] inline FilterFn make_fork_id_filter(
    const std::array<uint8_t, 4U>& expected_hash ) noexcept
{
    return [expected_hash]( const DiscoveredPeer& peer ) -> bool
    {
        if ( !peer.eth_fork_id.has_value() )
        {
            return false;
        }
        return peer.eth_fork_id.value().hash == expected_hash;
    };
}

} // namespace discv4

Updated on 2026-04-13 at 23:22:46 -0700