Skip to content

discv4/dial_scheduler.hpp

Namespaces

Name
discv4

Classes

Name
struct discv4::ValidatedPeer
A discovered peer whose public key has already been validated.
struct discv4::WatcherPoolConfig
Configurable connection limits for a WatcherPool. Defaults keep three active dial/watch slots per chain.
struct discv4::WatcherPool
Global resource pool shared across all chain DialSchedulers. Enforces a two-level fd cap: total across all chains, and per chain.
struct discv4::DialScheduler
Per-chain dial scheduler mirroring go-ethereum's dialScheduler. Maintains up to pool->max_per_chain concurrent dial coroutines, respecting the global pool->max_total cap across all chains. All methods run on the single io_context thread — no mutex needed.

Types

Name
using std::function< void( ValidatedPeer vp, std::function< void(rlpx::DisconnectReason)> on_done, std::function< void(std::shared_ptr< rlpx::RlpxSession >)> on_connected, boost::asio::yield_context yield)> DialFn
using std::function< void( const ValidatedPeer & vp, rlpx::DisconnectReason reason, bool was_connected)> DialFeedbackFn
Callback signature for dial/session exit feedback.
using std::function< bool(const DiscoveredPeer &)> FilterFn
Predicate applied to a DiscoveredPeer before it is enqueued for dialing. Return true to allow dialing, false to drop. When unset (nullptr), all peers are accepted. Mirrors go-ethereum UDPv4::NewNodeFilter (eth/protocols/eth/discovery.go).

Functions

Name
FilterFn make_fork_id_filter(const std::array< uint8_t, 4U > & expected_hash)
Create a FilterFn that accepts only peers whose ENR eth entry carries a ForkId with the given 4-byte hash (CRC32 of genesis + applied forks).

Types Documentation

using DialFn

using discv4::DialFn = std::function<void(
   ValidatedPeer                                               vp,
   std::function<void(rlpx::DisconnectReason)>                 on_done,
   std::function<void(std::shared_ptr<rlpx::RlpxSession>)>    on_connected,
   boost::asio::yield_context                                  yield)>;

Callback signature for what to run per dial attempt.

  • vp — the peer to connect to
  • on_done — call on every exit path with the disconnect/failure reason (recycling the slot)
  • on_connected — call once the ETH handshake is confirmed
  • yield — coroutine yield context

using DialFeedbackFn

using discv4::DialFeedbackFn = std::function<void(
   const ValidatedPeer&     vp,
   rlpx::DisconnectReason   reason,
   bool                     was_connected)>;

Callback signature for dial/session exit feedback.

using FilterFn

using discv4::FilterFn = std::function<bool( const DiscoveredPeer& )>;

Predicate applied to a DiscoveredPeer before it is enqueued for dialing. Return true to allow dialing, false to drop. When unset (nullptr), all peers are accepted. Mirrors go-ethereum UDPv4::NewNodeFilter (eth/protocols/eth/discovery.go).

Functions Documentation

function make_fork_id_filter

inline FilterFn make_fork_id_filter(
    const std::array< uint8_t, 4U > & expected_hash
)

Create a FilterFn that accepts only peers whose ENR eth entry carries a ForkId with the given 4-byte hash (CRC32 of genesis + applied forks).

Parameters:

  • expected_hash 4-byte CRC32 fork hash to match against.

Return: FilterFn suitable for assignment to DialScheduler::filter_fn.

Mirrors go-ethereum NewNodeFilter (eth/protocols/eth/discovery.go):

return func(n *enode.Node) bool {
    var entry enrEntry
    if err := n.Load(&entry); err != nil { return false }
    return filter(entry.ForkID) == nil
}

Peers with no eth_fork_id (ENR absent or eth entry missing) are dropped.

Source code

// Copyright 2026 Genius Ventures, Inc.
// SPDX-License-Identifier: MIT
#ifndef EVMRELAY_INCLUDE_DISCV4_DIAL_SCHEDULER_HPP
#define EVMRELAY_INCLUDE_DISCV4_DIAL_SCHEDULER_HPP

#include <algorithm>
#include <array>
#include <atomic>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>

#include <discv4/dial_history.hpp>
#include <discv4/discv4_client.hpp>
#include <rlpx/rlpx_session.hpp>
#include <base/rlp-logger.hpp>

namespace discv4 {

/// A discovered peer whose public key has already been validated.
///
/// This is the unit of work handed to DialScheduler::enqueue() and forwarded
/// to the DialFn / DialFeedbackFn callbacks.
struct ValidatedPeer
{
    DiscoveredPeer   peer;    ///< Discovery record; the scheduler reads its ip, tcp_port, node_id and eth_fork_id.
    rlpx::PublicKey  pubkey;  ///< The peer's public key (not inspected by the scheduler itself).
};

/// Configurable connection limits for a WatcherPool.
///
/// NOTE(review): neither field has a default member initializer in this
/// struct, so `WatcherPoolConfig c;` leaves both values indeterminate.
/// Initialize both explicitly (or value-initialize with `{}`) before building
/// a WatcherPool from it — TODO confirm where the documented defaults live.
struct WatcherPoolConfig
{
    int max_total;       ///< Limit compared against WatcherPool::active_total (shared by every scheduler on the pool).
    int max_per_chain;   ///< Limit compared against each DialScheduler's own `active` counter.
};

/// Global resource pool shared across all chain DialSchedulers.
///
/// Carries the two limits a scheduler checks before claiming a dial slot
/// (`max_total` across every scheduler sharing this pool, `max_per_chain` for
/// a single scheduler) together with the shared counter of slots currently
/// claimed.
struct WatcherPool
{
    int               max_total;        ///< Upper bound for active_total.
    int               max_per_chain;    ///< Upper bound for one scheduler's active count.
    std::atomic<int>  active_total{0};  ///< Slots currently claimed by all schedulers on this pool.

    /// Build a pool from two explicit limits.
    WatcherPool(int max_total_, int max_per_chain_)
        : max_total{max_total_}
        , max_per_chain{max_per_chain_}
    {}

    /// Build a pool from a WatcherPoolConfig; forwards to the two-limit constructor.
    explicit WatcherPool(const WatcherPoolConfig& config)
        : WatcherPool(config.max_total, config.max_per_chain)
    {}
};

/// Callback signature for what to run per dial attempt.
///
/// @param vp            the peer to connect to
/// @param on_done       call on every exit path with the disconnect/failure
///                      reason (recycling the slot)
/// @param on_connected  call once the ETH handshake is confirmed
/// @param yield         coroutine yield context
using DialFn = std::function<void(
    ValidatedPeer                                               vp,
    std::function<void(rlpx::DisconnectReason)>                 on_done,
    std::function<void(std::shared_ptr<rlpx::RlpxSession>)>    on_connected,
    boost::asio::yield_context                                  yield)>;

/// Callback signature for dial/session exit feedback.
///
/// @param vp             the peer the dial attempt was made for
/// @param reason         the reason handed to the dial's on_done callback, or
///                       the reason carried by a received disconnect message
/// @param was_connected  true when on_connected had been invoked for this
///                       dial before the feedback was produced
using DialFeedbackFn = std::function<void(
    const ValidatedPeer&     vp,
    rlpx::DisconnectReason   reason,
    bool                     was_connected)>;

/// Predicate applied to a DiscoveredPeer before it is enqueued for dialing.
/// Return true to allow dialing, false to drop. When unset (nullptr), all
/// peers are accepted.
/// Mirrors go-ethereum UDPv4::NewNodeFilter (eth/protocols/eth/discovery.go).
using FilterFn = std::function<bool( const DiscoveredPeer& )>;

struct DialScheduler : std::enable_shared_from_this<DialScheduler>
{
    boost::asio::io_context&              io;
    std::shared_ptr<WatcherPool>          pool;
    DialFn                                dial_fn;
    DialFeedbackFn                        feedback_fn{};
    FilterFn                              filter_fn{};  
    std::shared_ptr<DialHistory>          dial_history;

    int                                          active{0};
    int                                          validated_count{0};   
    int                                          total_validated{0};   
    bool                                         stopping{false};
    std::deque<ValidatedPeer>                    queue;
    std::vector<std::weak_ptr<rlpx::RlpxSession>> active_sessions;

    DialScheduler(boost::asio::io_context&  io_,
                  std::shared_ptr<WatcherPool> pool_,
                  DialFn                    dial_fn_)
        : io(io_)
        , pool(std::move(pool_))
        , dial_fn(std::move(dial_fn_))
        , dial_history(std::make_shared<DialHistory>())
    {}

    void enqueue(ValidatedPeer vp)
    {
        static auto log = rlp::base::createLogger("dial_scheduler");
        if ( stopping )
        {
            log->debug("Dropping peer {}:{} because scheduler is stopping",
                       vp.peer.ip,
                       vp.peer.tcp_port);
            return;
        }

        // Drop peers that do not match the chain filter (e.g. wrong ForkId).
        if ( filter_fn && !filter_fn( vp.peer ) )
        {
            if (vp.peer.eth_fork_id.has_value())
            {
                const auto& fork_id = vp.peer.eth_fork_id.value();
                log->debug("Dropping peer {}:{} because fork id {:02x}{:02x}{:02x}{:02x} did not match chain filter",
                           vp.peer.ip,
                           vp.peer.tcp_port,
                           fork_id.hash[0],
                           fork_id.hash[1],
                           fork_id.hash[2],
                           fork_id.hash[3]);
            }
            else
            {
                log->debug("Dropping peer {}:{} because no fork id was available for chain filter",
                           vp.peer.ip,
                           vp.peer.tcp_port);
            }
            return;
        }

        dial_history->expire();
        if (dial_history->contains(vp.peer.node_id))
        {
            log->debug("Dropping peer {}:{} because it is already in dial history",
                       vp.peer.ip,
                       vp.peer.tcp_port);
            return;
        }

        if (active < pool->max_per_chain &&
            pool->active_total.load() < pool->max_total)
        {
            ++active;
            ++pool->active_total;
            dial_history->add(vp.peer.node_id);
            spawn_dial(std::move(vp));
        }
        else
        {
            queue.push_back(std::move(vp));
        }
    }

    void release()
    {
        --active;
        --pool->active_total;

        if (stopping) { return; }

        dial_history->expire();
        while (active < pool->max_per_chain &&
               pool->active_total.load() < pool->max_total &&
               !queue.empty())
        {
            ValidatedPeer vp = std::move(queue.front());
            queue.pop_front();
            if (dial_history->contains(vp.peer.node_id)) { continue; }
            ++active;
            ++pool->active_total;
            dial_history->add(vp.peer.node_id);
            spawn_dial(std::move(vp));
        }
    }

    void stop()
    {
        stopping = true;
        queue.clear();
        for (auto& ws : active_sessions)
        {
            if (auto s = ws.lock())
            {
                (void)s->disconnect(rlpx::DisconnectReason::kClientQuitting);
            }
        }
        active_sessions.clear();
    }

private:
    void spawn_dial(ValidatedPeer vp)
    {
        auto sched         = shared_from_this();
        auto was_validated = std::make_shared<bool>(false);
        auto reported      = std::make_shared<bool>(false);
        static auto log    = rlp::base::createLogger("dial_scheduler");
        log->debug("Dialing peer: {}:{}", vp.peer.ip, vp.peer.tcp_port);
        boost::asio::spawn(io,
            [sched, vp = std::move(vp), was_validated, reported](boost::asio::yield_context yc)
            {
                sched->dial_fn(
                    vp,
                    [sched, vp, was_validated, reported](rlpx::DisconnectReason reason)
                    {
                        if (sched->feedback_fn && !*reported)
                        {
                            *reported = true;
                            sched->feedback_fn(vp, reason, *was_validated);
                        }
                        if (*was_validated) { --sched->validated_count; }
                        sched->release();
                    },
                    [sched, vp, was_validated, reported](std::shared_ptr<rlpx::RlpxSession> s)
                    {
                        *was_validated = true;
                        ++sched->validated_count;
                        ++sched->total_validated;
                        s->set_disconnect_handler(
                            [sched, vp, reported](const rlpx::protocol::DisconnectMessage& msg)
                            {
                                if (sched->feedback_fn)
                                {
                                    *reported = true;
                                    sched->feedback_fn(vp, msg.reason, true);
                                }
                            });
                        sched->active_sessions.push_back(s);
                    },
                    yc);
            });
    }
}; // struct DialScheduler

/// Create a FilterFn that accepts only peers whose ENR eth entry carries a
/// ForkId with the given 4-byte hash (CRC32 of genesis + applied forks).
///
/// Peers with no eth_fork_id are rejected.
///
/// @param expected_hash  4-byte CRC32 fork hash to match against; copied into
///                       the returned predicate.
/// @return FilterFn suitable for assignment to DialScheduler::filter_fn.
[[nodiscard]] inline FilterFn make_fork_id_filter(
    const std::array<uint8_t, 4U>& expected_hash ) noexcept
{
    return [expected_hash]( const DiscoveredPeer& peer ) -> bool
    {
        const auto& fork_id = peer.eth_fork_id;
        // Short-circuit keeps value() from being reached when no fork id is present.
        return fork_id.has_value() && fork_id.value().hash == expected_hash;
    };
}

} // namespace discv4

#endif // EVMRELAY_INCLUDE_DISCV4_DIAL_SCHEDULER_HPP

Updated on 2026-06-05 at 17:22:18 -0700