Skip to content

eth/eth_peer_queue.hpp

Namespaces

Name
eth

Classes

Name
struct eth::EthPeerQueueConfig
Bounded peer producer queue policy.
struct eth::EthPeerQueueStatsSnapshot
struct eth::EthPeerDisconnectFeedback
Feedback from a completed or disconnected peer session.
class eth::EthPeerQueue
Producer/consumer boundary for eth-watch peer candidates.

Functions

Name
std::shared_ptr< EthPeerQueue > make_eth_peer_queue(std::shared_ptr< discv4::DialScheduler > scheduler, const discv4::ChainPeerConfig & chain_config, EthPeerQueueConfig config, bool preload_cached_peers)
Create an eth-watch peer queue and preload the chain cache split.

Functions Documentation

function make_eth_peer_queue

std::shared_ptr< EthPeerQueue > make_eth_peer_queue(
    std::shared_ptr< discv4::DialScheduler > scheduler,
    const discv4::ChainPeerConfig & chain_config,
    EthPeerQueueConfig config,
    bool preload_cached_peers
)

Create an eth-watch peer queue and preload the chain cache split.

Source code

// Copyright 2026 Genius Ventures, Inc.
// SPDX-License-Identifier: MIT

#ifndef EVMRELAY_INCLUDE_ETH_ETH_PEER_QUEUE_HPP
#define EVMRELAY_INCLUDE_ETH_ETH_PEER_QUEUE_HPP

#include <discv4/chain_peers.hpp>
#include <discv4/dial_scheduler.hpp>
#include <discovery/discovered_peer.hpp>
#include <rlpx/rlpx_types.hpp>

#include <chrono>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

namespace eth
{

/// @brief Bounded peer producer queue policy.
///
/// Plain aggregate of tuning knobs handed to EthPeerQueue. Every field has a
/// default, so a value-initialized instance (`EthPeerQueueConfig{}`) is usable.
/// NOTE(review): the per-field descriptions below are read off the field names;
/// how each knob is enforced lives in the implementation file — confirm there.
struct EthPeerQueueConfig
{
    /// Limit on pending peers (default 1024).
    size_t max_pending_peers{1024};

    /// Limit on requeues after a disconnect (default 3).
    size_t max_disconnect_requeues{3};

    /// Back-off duration associated with "too many peers" (default 30 minutes),
    /// expressed in steady_clock ticks.
    std::chrono::steady_clock::duration too_many_peers_backoff{std::chrono::minutes(30)};

    /// Starting offset into the cached peer list (default 0).
    size_t cache_peer_start_offset{0};
};

/// @brief Value-type bundle of EthPeerQueue counters.
///
/// All fields start at zero. The struct is a plain aggregate; field order is
/// part of its layout and of positional aggregate initialization.
/// NOTE(review): the grouping comments below are inferred from field names —
/// confirm exact increment conditions against the implementation.
struct EthPeerQueueStatsSnapshot
{
    // --- peers accepted, by origin ---
    size_t cached_peer_count{0};
    size_t discovered_peer_count{0};
    size_t requeued_peer_count{0};

    // --- peers dropped / deferred ---
    size_t duplicate_peer_drop_count{0};
    size_t capacity_drop_count{0};
    size_t flaky_peer_drop_count{0};
    size_t too_many_peers_backoff_count{0};
    size_t backoff_drop_count{0};

    // --- disconnect feedback totals ---
    size_t disconnect_feedback_count{0};
    size_t disconnected_before_connected_count{0};
    size_t disconnected_after_connected_count{0};
    size_t too_many_peers_before_connected_count{0};
    size_t too_many_peers_after_connected_count{0};

    // --- failure categories ---
    size_t tcp_failure_count{0};
    size_t timeout_count{0};
    size_t subprotocol_error_count{0};
};

/// @brief Feedback from a completed or disconnected peer session.
///
/// Passed by const reference to EthPeerQueue::report_peer_disconnected().
struct EthPeerDisconnectFeedback
{
    /// Peer the feedback refers to (value-initialized by default).
    discv4::ValidatedPeer peer{};

    /// Disconnect reason; defaults to rlpx::DisconnectReason::kRequested.
    rlpx::DisconnectReason reason{rlpx::DisconnectReason::kRequested};

    /// Defaults to false. NOTE(review): presumably "the session reached the
    /// connected state before ending" — confirm against the code that fills it.
    bool was_connected{false};
};

/// @brief Producer/consumer boundary for eth-watch peer candidates.
///
/// Only the interface is declared in this header; all member functions are
/// defined elsewhere. Comments below describe what the declarations show and
/// flag name-based readings as assumptions.
/// NOTE(review): no synchronization primitive is declared among the members —
/// confirm the intended threading model with the implementation/callers.
class EthPeerQueue
{
public:
    /// Construct a queue bound to @p scheduler with policy @p config
    /// (a default-constructed EthPeerQueueConfig when omitted).
    explicit EthPeerQueue(
        std::shared_ptr<discv4::DialScheduler> scheduler,
        EthPeerQueueConfig                     config = {}) noexcept;

    ~EthPeerQueue();

    // ---- producers -------------------------------------------------------

    /// Feed a list of cached peers into the queue.
    void preload_cached_peers(const std::vector<discv4::ValidatedPeer>& peers) noexcept;

    /// Store the bootnode list (taken by value) later exposed by discovery_bootnodes().
    void set_discovery_bootnodes(std::vector<discv4::ValidatedPeer> bootnodes) noexcept;

    /// Offer a discv4-discovered peer.
    /// @return bool result that must not be ignored; presumably true when the
    ///         peer was accepted — confirm in the implementation.
    [[nodiscard]] bool enqueue_discovered_peer(const discv4::DiscoveredPeer& peer) noexcept;

    /// Offer a peer coming from the `discovery` namespace's validated-peer type.
    /// @return bool result that must not be ignored; presumably true when the
    ///         peer was accepted — confirm in the implementation.
    [[nodiscard]] bool enqueue_validated_discovery_peer(const discovery::ValidatedPeer& peer) noexcept;

    /// Report the outcome of a finished or dropped peer session.
    /// @return bool result that must not be ignored; presumably true when the
    ///         peer was requeued — confirm in the implementation.
    [[nodiscard]] bool report_peer_disconnected(const EthPeerDisconnectFeedback& feedback) noexcept;

    // ---- observers -------------------------------------------------------

    /// Reference to the stored bootnode list; valid only while this object lives.
    [[nodiscard]] const std::vector<discv4::ValidatedPeer>& discovery_bootnodes() const noexcept;
    [[nodiscard]] bool needs_discovery() const noexcept;

    // Individual counter accessors; names match the fields of EthPeerQueueStatsSnapshot.
    [[nodiscard]] size_t cached_peer_count() const noexcept;
    [[nodiscard]] size_t discovered_peer_count() const noexcept;
    [[nodiscard]] size_t requeued_peer_count() const noexcept;
    [[nodiscard]] size_t duplicate_peer_drop_count() const noexcept;
    [[nodiscard]] size_t capacity_drop_count() const noexcept;
    [[nodiscard]] size_t flaky_peer_drop_count() const noexcept;
    [[nodiscard]] size_t too_many_peers_backoff_count() const noexcept;
    [[nodiscard]] size_t backoff_drop_count() const noexcept;

    /// All counters bundled into one value.
    [[nodiscard]] EthPeerQueueStatsSnapshot stats() const noexcept;

    /// The scheduler handle, returned by value (shares ownership with the caller).
    [[nodiscard]] std::shared_ptr<discv4::DialScheduler> scheduler() const noexcept;

private:
    // ---- internal helpers ------------------------------------------------

    /// Shared enqueue path. NOTE(review): @p allow_known_peer presumably lets a
    /// previously seen node through (e.g. for requeues) — confirm.
    [[nodiscard]] bool enqueue_candidate(discv4::ValidatedPeer peer, bool allow_known_peer) noexcept;

    /// Returns a reordered copy of @p peers; the input is not modified (const ref).
    [[nodiscard]] std::vector<discv4::ValidatedPeer> rotate_cached_peers(
        const std::vector<discv4::ValidatedPeer>& peers) const;

    /// Returns a reordered copy of @p peers; the input is not modified (const ref).
    [[nodiscard]] std::vector<discv4::ValidatedPeer> spread_cached_peers_for_dial_slots(
        const std::vector<discv4::ValidatedPeer>& peers) const;

    /// String key derived from a node id; the maps/sets below are keyed by std::string.
    [[nodiscard]] static std::string node_key(const discv4::NodeId& node_id);

    /// Classifies a feedback record as eligible for requeue or not.
    [[nodiscard]] static bool is_requeueable_disconnect(const EthPeerDisconnectFeedback& feedback) noexcept;

    // ---- state (declaration order preserved: it fixes initialization order) ----

    std::shared_ptr<discv4::DialScheduler> scheduler_;
    EthPeerQueueConfig                      config_{};
    std::vector<discv4::ValidatedPeer>     discovery_bootnodes_;
    std::unordered_set<std::string>         seen_node_ids_;
    std::unordered_map<std::string, size_t> disconnect_counts_;
    std::unordered_map<std::string, std::chrono::steady_clock::time_point> backoff_until_;

    // Counters: one per field of EthPeerQueueStatsSnapshot, all starting at zero.
    size_t cached_peer_count_{0};
    size_t discovered_peer_count_{0};
    size_t requeued_peer_count_{0};
    size_t duplicate_peer_drop_count_{0};
    size_t capacity_drop_count_{0};
    size_t flaky_peer_drop_count_{0};
    size_t too_many_peers_backoff_count_{0};
    size_t backoff_drop_count_{0};
    size_t disconnect_feedback_count_{0};
    size_t disconnected_before_connected_count_{0};
    size_t disconnected_after_connected_count_{0};
    size_t too_many_peers_before_connected_count_{0};
    size_t too_many_peers_after_connected_count_{0};
    size_t tcp_failure_count_{0};
    size_t timeout_count_{0};
    size_t subprotocol_error_count_{0};
};

/// @brief Create an eth-watch peer queue and preload the chain cache split.
///
/// @param scheduler             Dial scheduler handle passed to the new queue.
/// @param chain_config          Chain peer configuration (read-only).
/// @param config                Queue policy; defaults to `EthPeerQueueConfig{}`.
/// @param preload_cached_peers  Defaults to true. NOTE(review): presumably
///                              toggles the cache preload step — confirm in the
///                              definition.
/// @return Shared handle to the created queue; the result must not be discarded.
[[nodiscard]] std::shared_ptr<EthPeerQueue>
make_eth_peer_queue(std::shared_ptr<discv4::DialScheduler> scheduler,
                    const discv4::ChainPeerConfig&         chain_config,
                    EthPeerQueueConfig                     config               = {},
                    bool                                   preload_cached_peers = true);

} // namespace eth

#endif // EVMRELAY_INCLUDE_ETH_ETH_PEER_QUEUE_HPP

Updated on 2026-06-05 at 17:22:19 -0700