eth/eth_peer_queue.cpp¶
Namespaces¶
| Name |
|---|
| eth |
Functions¶
| Name | |
|---|---|
| std::shared_ptr< EthPeerQueue > | make_eth_peer_queue(std::shared_ptr< discv4::DialScheduler > scheduler, const discv4::ChainPeerConfig & chain_config, EthPeerQueueConfig config, bool preload_cached_peers) Create an eth-watch peer queue, register the chain's discovery bootnodes, and optionally preload the chain's cached peers. |
Functions Documentation¶
function make_eth_peer_queue¶
std::shared_ptr< EthPeerQueue > make_eth_peer_queue(
std::shared_ptr< discv4::DialScheduler > scheduler,
const discv4::ChainPeerConfig & chain_config,
EthPeerQueueConfig config,
bool preload_cached_peers
)
Create an eth-watch peer queue, register the chain's discovery bootnodes, and optionally preload the chain's cached peers.
Source code¶
// Copyright 2026 Genius Ventures, Inc.
// SPDX-License-Identifier: MIT
#include <eth/eth_peer_queue.hpp>
#include <base/rlp-logger.hpp>
#include <algorithm>
#include <string_view>
namespace eth
{
// Builds a queue around `scheduler` (which may be null) and, when a scheduler
// is present, installs a feedback callback that routes every disconnect
// notification into report_peer_disconnected().
EthPeerQueue::EthPeerQueue(
    std::shared_ptr<discv4::DialScheduler> scheduler,
    EthPeerQueueConfig config) noexcept
    : scheduler_(std::move(scheduler))
    , config_(config)
{
    if (!scheduler_)
    {
        return;
    }

    // The callback captures `this`; the destructor resets feedback_fn so the
    // scheduler is not left holding a callback into a destroyed queue.
    scheduler_->feedback_fn =
        [this](const discv4::ValidatedPeer& dropped, rlpx::DisconnectReason why, bool had_session)
        {
            const EthPeerDisconnectFeedback feedback{dropped, why, had_session};
            // The requeue outcome is intentionally ignored here.
            static_cast<void>(report_peer_disconnected(feedback));
        };
}
// Detaches the feedback callback installed by the constructor, since that
// callback captures `this`.
EthPeerQueue::~EthPeerQueue()
{
    if (!scheduler_)
    {
        return;
    }
    scheduler_->feedback_fn = nullptr;
}
// Feeds the cached peer list into the queue. The list is first rotated by the
// configured start offset, then interleaved across dial slots; every peer that
// enqueue_candidate() accepts bumps cached_peer_count_.
void EthPeerQueue::preload_cached_peers(const std::vector<discv4::ValidatedPeer>& peers) noexcept
{
    const auto rotated = rotate_cached_peers(peers);
    const auto ordered = spread_cached_peers_for_dial_slots(rotated);

    for (const auto& cached : ordered)
    {
        // Cached peers are first-time candidates: duplicates are rejected.
        if (!enqueue_candidate(cached, false))
        {
            continue;
        }
        ++cached_peer_count_;
    }
}
// Replaces the stored discovery bootnode list with `bootnodes`.
void EthPeerQueue::set_discovery_bootnodes(std::vector<discv4::ValidatedPeer> bootnodes) noexcept
{
    // Exchange contents with the by-value argument; the previous list is
    // released when `bootnodes` goes out of scope at return.
    discovery_bootnodes_.swap(bootnodes);
}
// Wraps a discovered peer in a ValidatedPeer (using its node id bytes as the
// pubkey) and offers it to the queue as a first-time candidate.
// Returns true, and bumps discovered_peer_count_, only when it was accepted.
bool EthPeerQueue::enqueue_discovered_peer(const discv4::DiscoveredPeer& peer) noexcept
{
    discv4::ValidatedPeer wrapped{};
    wrapped.peer = peer;
    std::copy(peer.node_id.begin(), peer.node_id.end(), wrapped.pubkey.begin());

    const bool accepted = enqueue_candidate(std::move(wrapped), false);
    if (accepted)
    {
        ++discovered_peer_count_;
    }
    return accepted;
}
// Converts a discovery::ValidatedPeer into the discv4 representation field by
// field (including the optional eth fork id) and offers it to the queue as a
// first-time candidate.
// Returns true, and bumps discovered_peer_count_, only when it was accepted.
bool EthPeerQueue::enqueue_validated_discovery_peer(const discovery::ValidatedPeer& peer) noexcept
{
    discv4::ValidatedPeer converted{};
    auto& target = converted.peer;

    target.node_id   = peer.node_id;
    target.ip        = peer.ip;
    target.udp_port  = peer.udp_port;
    target.tcp_port  = peer.tcp_port;
    target.last_seen = peer.last_seen;

    // The fork id is carried over only when the source peer has one.
    if (const auto& source_fork = peer.eth_fork_id; source_fork.has_value())
    {
        discv4::ForkId mapped{};
        mapped.hash = source_fork->hash;
        mapped.next = source_fork->next;
        target.eth_fork_id = mapped;
    }

    // The pubkey is filled from the node id bytes.
    std::copy(peer.node_id.begin(), peer.node_id.end(), converted.pubkey.begin());

    const bool accepted = enqueue_candidate(std::move(converted), false);
    if (accepted)
    {
        ++discovered_peer_count_;
    }
    return accepted;
}
bool EthPeerQueue::report_peer_disconnected(const EthPeerDisconnectFeedback& feedback) noexcept
{
const auto key = node_key(feedback.peer.peer.node_id);
++disconnect_feedback_count_;
if (feedback.was_connected)
{
++disconnected_after_connected_count_;
}
else
{
++disconnected_before_connected_count_;
}
switch (feedback.reason)
{
case rlpx::DisconnectReason::kTooManyPeers:
if (feedback.was_connected)
{
++too_many_peers_after_connected_count_;
}
else
{
++too_many_peers_before_connected_count_;
}
break;
case rlpx::DisconnectReason::kTcpError:
++tcp_failure_count_;
break;
case rlpx::DisconnectReason::kTimeout:
++timeout_count_;
break;
case rlpx::DisconnectReason::kSubprotocolError:
++subprotocol_error_count_;
break;
default:
break;
}
if (feedback.reason == rlpx::DisconnectReason::kTooManyPeers)
{
static auto log = rlp::base::createLogger("eth_peer_queue");
backoff_until_[key] = std::chrono::steady_clock::now() + config_.too_many_peers_backoff;
++too_many_peers_backoff_count_;
log->debug("Backing off peer {}:{} after TooManyPeers disconnect",
feedback.peer.peer.ip,
feedback.peer.peer.tcp_port);
return false;
}
if (!is_requeueable_disconnect(feedback))
{
return false;
}
auto& disconnect_count = disconnect_counts_[key];
if (disconnect_count >= config_.max_disconnect_requeues)
{
++flaky_peer_drop_count_;
return false;
}
++disconnect_count;
if (!enqueue_candidate(feedback.peer, true))
{
return false;
}
++requeued_peer_count_;
return true;
}
// Returns a reference to the bootnode list most recently stored by
// set_discovery_bootnodes().
const std::vector<discv4::ValidatedPeer>& EthPeerQueue::discovery_bootnodes() const noexcept
{
return discovery_bootnodes_;
}
// True when no cached peer was accepted during preload and at least one
// discovery bootnode is configured.
bool EthPeerQueue::needs_discovery() const noexcept
{
    if (cached_peer_count_ != 0)
    {
        return false;
    }
    return !discovery_bootnodes_.empty();
}
// Number of cached peers that enqueue_candidate() accepted during
// preload_cached_peers().
size_t EthPeerQueue::cached_peer_count() const noexcept
{
return cached_peer_count_;
}
// Number of peers accepted through enqueue_discovered_peer() or
// enqueue_validated_discovery_peer().
size_t EthPeerQueue::discovered_peer_count() const noexcept
{
return discovered_peer_count_;
}
// Number of disconnected peers that report_peer_disconnected() successfully
// re-enqueued.
size_t EthPeerQueue::requeued_peer_count() const noexcept
{
return requeued_peer_count_;
}
// Number of candidates rejected by enqueue_candidate() because their node id
// had already been seen and known peers were not allowed.
size_t EthPeerQueue::duplicate_peer_drop_count() const noexcept
{
return duplicate_peer_drop_count_;
}
// Number of candidates rejected by enqueue_candidate() because no dial slot
// was free and the scheduler queue had reached config_.max_pending_peers.
size_t EthPeerQueue::capacity_drop_count() const noexcept
{
return capacity_drop_count_;
}
// Number of requeueable disconnects that report_peer_disconnected() dropped
// because the peer had already reached config_.max_disconnect_requeues.
size_t EthPeerQueue::flaky_peer_drop_count() const noexcept
{
return flaky_peer_drop_count_;
}
// Number of times report_peer_disconnected() placed a peer into backoff after
// a TooManyPeers disconnect.
size_t EthPeerQueue::too_many_peers_backoff_count() const noexcept
{
return too_many_peers_backoff_count_;
}
// Number of candidates rejected by enqueue_candidate() because their backoff
// deadline had not yet passed.
size_t EthPeerQueue::backoff_drop_count() const noexcept
{
return backoff_drop_count_;
}
// Copies every queue counter into a value snapshot and returns it.
EthPeerQueueStatsSnapshot EthPeerQueue::stats() const noexcept
{
    EthPeerQueueStatsSnapshot out{};

    // Peers accepted into the queue, by source.
    out.cached_peer_count     = cached_peer_count_;
    out.discovered_peer_count = discovered_peer_count_;
    out.requeued_peer_count   = requeued_peer_count_;

    // Candidates rejected or deferred, by cause.
    out.duplicate_peer_drop_count    = duplicate_peer_drop_count_;
    out.capacity_drop_count          = capacity_drop_count_;
    out.flaky_peer_drop_count        = flaky_peer_drop_count_;
    out.backoff_drop_count           = backoff_drop_count_;
    out.too_many_peers_backoff_count = too_many_peers_backoff_count_;

    // Disconnect feedback, overall and by connection phase.
    out.disconnect_feedback_count             = disconnect_feedback_count_;
    out.disconnected_before_connected_count   = disconnected_before_connected_count_;
    out.disconnected_after_connected_count    = disconnected_after_connected_count_;
    out.too_many_peers_before_connected_count = too_many_peers_before_connected_count_;
    out.too_many_peers_after_connected_count  = too_many_peers_after_connected_count_;

    // Disconnect feedback, by reason.
    out.tcp_failure_count       = tcp_failure_count_;
    out.timeout_count           = timeout_count_;
    out.subprotocol_error_count = subprotocol_error_count_;

    return out;
}
// Returns the dial scheduler supplied at construction; it may be null, which
// the rest of this class checks for before use.
std::shared_ptr<discv4::DialScheduler> EthPeerQueue::scheduler() const noexcept
{
return scheduler_;
}
// Core admission path shared by every enqueue entry point.
//
// @param peer             Candidate to hand to the dial scheduler.
// @param allow_known_peer When true, a node id that was already seen is still
//                         admitted (used for requeues after a disconnect).
// @return true when the candidate was admitted (or when there is no scheduler
//         and the node id was merely recorded); false when it was dropped for
//         backoff, duplication, or capacity.
//
// Checks run in this order: active backoff deadline, duplicate node id,
// scheduler presence, pending-queue capacity.
bool EthPeerQueue::enqueue_candidate(discv4::ValidatedPeer peer, bool allow_known_peer) noexcept
{
    const auto key = node_key(peer.peer.node_id);
    const auto now = std::chrono::steady_clock::now();

    if (const auto backoff = backoff_until_.find(key);
        backoff != backoff_until_.end())
    {
        if (now < backoff->second)
        {
            ++backoff_drop_count_;
            return false;
        }
        // Deadline passed: forget the backoff entry and continue.
        backoff_until_.erase(backoff);
    }

    const auto insert_result = seen_node_ids_.insert(key);
    if (!insert_result.second && !allow_known_peer)
    {
        ++duplicate_peer_drop_count_;
        return false;
    }

    if (!scheduler_)
    {
        // No scheduler: the node id is recorded as seen, nothing is dialed.
        return true;
    }

    // The pool may be absent (spread_cached_peers_for_dial_slots() guards the
    // same case). Without a pool no free dial slot can be proven, so the
    // candidate is subject to the pending-queue cap below.
    // NOTE(review): confirm DialScheduler::enqueue tolerates a null pool.
    const auto& pool = scheduler_->pool;
    const bool can_start_immediately =
        pool &&
        scheduler_->active < pool->max_per_chain &&
        pool->active_total.load() < pool->max_total;

    if (!can_start_immediately && scheduler_->queue.size() >= config_.max_pending_peers)
    {
        // Roll back the "seen" mark only if this call created it, so the peer
        // can be offered again later.
        if (insert_result.second)
        {
            seen_node_ids_.erase(key);
        }
        ++capacity_drop_count_;
        return false;
    }

    scheduler_->enqueue(std::move(peer));
    return true;
}
// Returns a copy of `peers` rotated left by
// config_.cache_peer_start_offset (mod peers.size()), so that the element at
// that offset comes first. Lists with fewer than two entries, or an effective
// offset of zero, are returned unchanged.
std::vector<discv4::ValidatedPeer> EthPeerQueue::rotate_cached_peers(
    const std::vector<discv4::ValidatedPeer>& peers) const
{
    const size_t count = peers.size();

    // The modulo is only evaluated when count > 1, so it never divides by zero.
    const size_t shift = (count > 1U) ? (config_.cache_peer_start_offset % count) : 0U;
    if (shift == 0U)
    {
        return peers;
    }

    const auto pivot = peers.begin() + static_cast<std::ptrdiff_t>(shift);

    std::vector<discv4::ValidatedPeer> result;
    result.reserve(count);
    result.insert(result.end(), pivot, peers.end());    // tail first
    result.insert(result.end(), peers.begin(), pivot);  // then the skipped head
    return result;
}
// Reorders `peers` by interleaving contiguous bands of the list.
//
// The list is cut into min(peers.size(), pool->max_per_chain) contiguous
// bands. The output takes the first element of each band, then the second of
// each band, and so on, until every peer has been emitted. The input is
// returned unchanged when there is no scheduler or pool, fewer than two peers,
// or fewer than two bands.
std::vector<discv4::ValidatedPeer> EthPeerQueue::spread_cached_peers_for_dial_slots(
    const std::vector<discv4::ValidatedPeer>& peers) const
{
    const size_t total = peers.size();
    const bool has_pool = scheduler_ && scheduler_->pool;
    if (!has_pool || total <= 1U)
    {
        return peers;
    }

    const auto slots = scheduler_->pool->max_per_chain;
    if (slots <= 1)
    {
        return peers;
    }

    const size_t bands = std::min(total, static_cast<size_t>(slots));
    if (bands <= 1U)
    {
        return peers;
    }

    std::vector<discv4::ValidatedPeer> interleaved;
    interleaved.reserve(total);

    // `round` is the position inside each band that is emitted on this pass.
    size_t round = 0U;
    while (interleaved.size() < total)
    {
        for (size_t b = 0U; b < bands; ++b)
        {
            const size_t lo = (total * b) / bands;
            const size_t hi = (total * (b + 1U)) / bands;
            const size_t pick = lo + round;
            if (pick >= hi)
            {
                // This band is shorter than the current round; skip it.
                continue;
            }
            interleaved.push_back(peers[pick]);
        }
        ++round;
    }
    return interleaved;
}
// Encodes a node id as a lowercase hexadecimal string, two characters per
// element (high nibble first), for use as a map/set key.
std::string EthPeerQueue::node_key(const discv4::NodeId& node_id)
{
    static constexpr std::string_view kHexDigits = "0123456789abcdef";

    std::string encoded;
    encoded.reserve(node_id.size() * 2U);
    for (const auto byte : node_id)
    {
        const auto high_nibble = (byte >> 4U) & 0x0fU;
        const auto low_nibble = byte & 0x0fU;
        encoded += kHexDigits[high_nibble];
        encoded += kHexDigits[low_nibble];
    }
    return encoded;
}
// A disconnect qualifies for a requeue only when the reason is a TCP error or
// a timeout and the peer had been connected.
bool EthPeerQueue::is_requeueable_disconnect(const EthPeerDisconnectFeedback& feedback) noexcept
{
    const bool transport_failure =
        feedback.reason == rlpx::DisconnectReason::kTcpError ||
        feedback.reason == rlpx::DisconnectReason::kTimeout;
    return transport_failure && feedback.was_connected;
}
// Factory: builds an EthPeerQueue on `scheduler`, registers
// chain_config.bootnodes as its discovery bootnodes, and, when
// `preload_cached_peers` is true, preloads chain_config.nodes.
std::shared_ptr<EthPeerQueue> make_eth_peer_queue(
    std::shared_ptr<discv4::DialScheduler> scheduler,
    const discv4::ChainPeerConfig& chain_config,
    EthPeerQueueConfig config,
    bool preload_cached_peers)
{
    auto result = std::make_shared<EthPeerQueue>(std::move(scheduler), config);
    result->set_discovery_bootnodes(chain_config.bootnodes);

    if (!preload_cached_peers)
    {
        return result;
    }

    result->preload_cached_peers(chain_config.nodes);
    return result;
}
} // namespace eth
Updated on 2026-06-05 at 17:22:19 -0700