Skip to content

discv5/discv5_crawler.cpp

Namespaces

Name
discv5

Source code

// Copyright 2025 GeniusVentures
// SPDX-License-Identifier: Apache-2.0

#include "discv5/discv5_crawler.hpp"
#include "discv5/discv5_enr.hpp"

#include <algorithm>
#include <sstream>
#include <iomanip>

namespace discv5
{

// ---------------------------------------------------------------------------
// Constructor
// ---------------------------------------------------------------------------

discv5_crawler::discv5_crawler(const discv5Config& config) noexcept
    : config_(config)
{
}

// ---------------------------------------------------------------------------
// add_bootstrap
// ---------------------------------------------------------------------------

void discv5_crawler::add_bootstrap(const EnrRecord& record) noexcept
{
    auto peer_result = EnrParser::to_validated_peer(record);
    if (!peer_result)
    {
        ++stat_invalid_enr_;
        return;
    }
    process_found_peers({ peer_result.value() });
}

// ---------------------------------------------------------------------------
// set_peer_discovered_callback / set_error_callback
// ---------------------------------------------------------------------------

// Installs the callback that emit_peer() invokes for each newly discovered
// peer. The previously installed callback (if any) is released when this
// function returns. No lock is taken here.
void discv5_crawler::set_peer_discovered_callback(PeerDiscoveredCallback callback) noexcept
{
    // `callback` is a by-value sink: trade it with the stored one so the old
    // callable is destroyed together with the parameter.
    std::swap(peer_callback_, callback);
}

// Installs the callback used to report errors (in this file: an ENR URI that
// fails to parse). The previously installed callback (if any) is released
// when this function returns. No lock is taken here.
void discv5_crawler::set_error_callback(ErrorCallback callback) noexcept
{
    // `callback` is a by-value sink: trade it with the stored one so the old
    // callable is destroyed together with the parameter.
    std::swap(error_callback_, callback);
}

// ---------------------------------------------------------------------------
// start
// ---------------------------------------------------------------------------

// Marks the crawler as running and seeds the queue from
// config_.bootstrap_enrs.
//
// Returns discv5Error::kCrawlerAlreadyRunning when the running flag was
// already set; otherwise returns success after every bootstrap URI has been
// dispatched. URIs starting with "enr:" go to enqueue_enr_uri(), URIs
// starting with "enode://" go to enqueue_enode_uri(), and anything else is
// skipped without being counted.
VoidResult discv5_crawler::start() noexcept
{
    static constexpr std::string_view kEnrScheme   = "enr:";
    static constexpr std::string_view kEnodeScheme = "enode://";

    // exchange() both tests and sets the flag, so only one caller can win.
    const bool was_running = running_.exchange(true);
    if (was_running)
    {
        return discv5Error::kCrawlerAlreadyRunning;
    }

    for (const auto& bootstrap_uri : config_.bootstrap_enrs)
    {
        if (bootstrap_uri.compare(0U, kEnrScheme.size(), kEnrScheme) == 0)
        {
            enqueue_enr_uri(bootstrap_uri);
            continue;
        }

        if (bootstrap_uri.compare(0U, kEnodeScheme.size(), kEnodeScheme) == 0)
        {
            enqueue_enode_uri(bootstrap_uri);
        }
    }

    return outcome::success();
}

// ---------------------------------------------------------------------------
// stop
// ---------------------------------------------------------------------------

// Clears the running flag.
// Returns success when the flag was set before the call, and
// discv5Error::kCrawlerNotRunning when it was already clear. Queued and
// discovered state is left untouched.
VoidResult discv5_crawler::stop() noexcept
{
    // exchange() both reads and clears the flag in one step.
    const bool was_running = running_.exchange(false);
    if (was_running)
    {
        return outcome::success();
    }

    return discv5Error::kCrawlerNotRunning;
}

// ---------------------------------------------------------------------------
// process_found_peers
// ---------------------------------------------------------------------------

void discv5_crawler::process_found_peers(const std::vector<ValidatedPeer>& peers) noexcept
{
    std::lock_guard<std::mutex> lock(state_mutex_);

    for (const auto& peer : peers)
    {
        const std::string key = node_key(peer.node_id);

        // Dedup: skip if already discovered or already queued.
        if (discovered_ids_.count(key) != 0U)
        {
            ++stat_duplicates_;
            continue;
        }

        // Check if already in the queued list (linear scan — queue is bounded).
        const bool already_queued = std::any_of(
            queued_peers_.begin(), queued_peers_.end(),
            [&key](const ValidatedPeer& qp)
            {
                return node_key(qp.node_id) == key;
            });

        if (already_queued)
        {
            ++stat_duplicates_;
            continue;
        }

        queued_peers_.push_back(peer);
    }
}

// ---------------------------------------------------------------------------
// ingest_discovered_peers
// ---------------------------------------------------------------------------

void discv5_crawler::ingest_discovered_peers(const std::vector<ValidatedPeer>& peers) noexcept
{
    for (const auto& peer : peers)
    {
        bool already_discovered = false;
        bool already_queued = false;

        {
            std::lock_guard<std::mutex> lock(state_mutex_);
            const std::string key = node_key(peer.node_id);
            already_discovered = (discovered_ids_.count(key) != 0U);

            already_queued = std::any_of(
                queued_peers_.begin(), queued_peers_.end(),
                [&key](const ValidatedPeer& qp)
                {
                    return node_key(qp.node_id) == key;
                });

            if (!already_discovered && !already_queued)
            {
                queued_peers_.push_back(peer);
            }
        }

        if (already_discovered)
        {
            ++stat_duplicates_;
            continue;
        }

        emit_peer(peer);
    }
}

// ---------------------------------------------------------------------------
// stats
// ---------------------------------------------------------------------------

// Returns a snapshot of the crawler counters.
// Container sizes (queued / measured / failed) and the atomic counters are
// all read while state_mutex_ is held.
CrawlerStats discv5_crawler::stats() const noexcept
{
    const std::lock_guard<std::mutex> guard(state_mutex_);

    CrawlerStats snapshot{};

    // Sizes of the containers guarded by state_mutex_.
    snapshot.queued   = queued_peers_.size();
    snapshot.measured = measured_ids_.size();
    snapshot.failed   = failed_ids_.size();

    // Running counters.
    snapshot.discovered   = stat_discovered_.load();
    snapshot.duplicates   = stat_duplicates_.load();
    snapshot.invalid_enr  = stat_invalid_enr_.load();
    snapshot.wrong_chain  = stat_wrong_chain_.load();
    snapshot.no_eth_entry = stat_no_eth_entry_.load();

    return snapshot;
}

// ---------------------------------------------------------------------------
// is_running
// ---------------------------------------------------------------------------

// Reports the current value of the running flag, which start() sets and
// stop() clears.
bool discv5_crawler::is_running() const noexcept
{
    // seq_cst is the default ordering of load(); spelled out for clarity.
    const bool active = running_.load(std::memory_order_seq_cst);
    return active;
}

// ---------------------------------------------------------------------------
// mark_measured / mark_failed
// ---------------------------------------------------------------------------

void discv5_crawler::mark_measured(const NodeId& node_id) noexcept
{
    std::lock_guard<std::mutex> lock(state_mutex_);
    measured_ids_.insert(node_key(node_id));
}

// Records `node_id` in failed_ids_. Inserting an id that is already present
// leaves the set unchanged.
void discv5_crawler::mark_failed(const NodeId& node_id) noexcept
{
    // Build the key first so the lock only covers the insertion.
    std::string failed_key = node_key(node_id);

    const std::lock_guard<std::mutex> guard(state_mutex_);
    failed_ids_.insert(std::move(failed_key));
}

// ---------------------------------------------------------------------------
// dequeue_next
// ---------------------------------------------------------------------------

// Removes and returns the peer at the front of queued_peers_ (FIFO order), or
// std::nullopt when the queue is empty. state_mutex_ is held for the whole
// operation.
std::optional<ValidatedPeer> discv5_crawler::dequeue_next() noexcept
{
    const std::lock_guard<std::mutex> guard(state_mutex_);

    if (queued_peers_.empty())
    {
        return std::nullopt;
    }

    // The front element is erased on the next line, so move it out instead of
    // copying it; the erase only needs the slot to be in a valid state.
    std::optional<ValidatedPeer> next(std::move(queued_peers_.front()));
    queued_peers_.erase(queued_peers_.begin());
    return next;
}

// ---------------------------------------------------------------------------
// is_discovered
// ---------------------------------------------------------------------------

// Returns true when `node_id` is present in discovered_ids_, i.e. emit_peer()
// has already accepted a peer with this id.
bool discv5_crawler::is_discovered(const NodeId& node_id) const noexcept
{
    // Build the key first so the lock only covers the lookup.
    const std::string lookup_key = node_key(node_id);

    const std::lock_guard<std::mutex> guard(state_mutex_);
    return discovered_ids_.find(lookup_key) != discovered_ids_.end();
}

// ---------------------------------------------------------------------------
// Private: enqueue_enr_uri
// ---------------------------------------------------------------------------

void discv5_crawler::enqueue_enr_uri(const std::string& uri) noexcept
{
    auto record_result = EnrParser::parse(uri);
    if (!record_result)
    {
        ++stat_invalid_enr_;
        if (error_callback_)
        {
            error_callback_("Invalid ENR URI: " + uri.substr(0U, 60U));
        }
        return;
    }

    auto peer_result = EnrParser::to_validated_peer(record_result.value());
    if (!peer_result)
    {
        ++stat_invalid_enr_;
        return;
    }

    process_found_peers({ peer_result.value() });
}

// ---------------------------------------------------------------------------
// Private: enqueue_enode_uri
// ---------------------------------------------------------------------------

void discv5_crawler::enqueue_enode_uri(const std::string& uri) noexcept
{
    // Expected format: enode://<128-hex-pubkey>@<ip>:<port>
    static constexpr std::string_view kEnodePrefix = "enode://";
    static constexpr size_t           kPubkeyHexLen = 128U;

    if (uri.size() < kEnodePrefix.size() + kPubkeyHexLen + 2U)  // +2 for '@' and ':'
    {
        ++stat_invalid_enr_;
        return;
    }

    const std::string_view body(uri.data() + kEnodePrefix.size(),
                                uri.size() - kEnodePrefix.size());

    // Locate '@' separator between pubkey and host:port.
    const size_t at_pos = body.find('@');
    if (at_pos == std::string_view::npos || at_pos != kPubkeyHexLen)
    {
        ++stat_invalid_enr_;
        return;
    }

    // Decode 128-hex pubkey → 64 bytes.
    NodeId node_id{};
    const std::string_view hex_key = body.substr(0U, kPubkeyHexLen);

    static constexpr size_t kHexCharsPerByte = 2U;
    for (size_t i = 0U; i < kNodeIdBytes; ++i)
    {
        const size_t hex_offset = i * kHexCharsPerByte;
        const auto hi_char = hex_key[hex_offset];
        const auto lo_char = hex_key[hex_offset + 1U];

        auto hex_to_nibble = [](char c) -> uint8_t
        {
            if (c >= '0' && c <= '9') { return static_cast<uint8_t>(c - '0'); }
            if (c >= 'a' && c <= 'f') { return static_cast<uint8_t>(10U + (c - 'a')); }
            if (c >= 'A' && c <= 'F') { return static_cast<uint8_t>(10U + (c - 'A')); }
            return 0xFFU;
        };

        const uint8_t hi = hex_to_nibble(hi_char);
        const uint8_t lo = hex_to_nibble(lo_char);

        if (hi == 0xFFU || lo == 0xFFU)
        {
            ++stat_invalid_enr_;
            return;
        }

        // M012 — kHexNibbleBits could be named, but the 4/8 bit shifts here
        // are derived from hex encoding semantics (4 bits per nibble).
        static constexpr uint8_t kNibbleBits = 4U;
        node_id[i] = static_cast<uint8_t>((hi << kNibbleBits) | lo);
    }

    // Parse host:port from the part after '@'.
    const std::string_view host_port = body.substr(at_pos + 1U);
    const size_t colon_pos = host_port.rfind(':');
    if (colon_pos == std::string_view::npos)
    {
        ++stat_invalid_enr_;
        return;
    }

    const std::string host(host_port.substr(0U, colon_pos));
    const std::string port_str(host_port.substr(colon_pos + 1U));

    uint16_t port = 0U;
    for (const char c : port_str)
    {
        if (c < '0' || c > '9')
        {
            ++stat_invalid_enr_;
            return;
        }
        // Safe: port strings are always short; overflow cannot occur before the check.
        port = static_cast<uint16_t>(port * 10U + static_cast<uint16_t>(c - '0'));
    }

    if (port == 0U)
    {
        ++stat_invalid_enr_;
        return;
    }

    ValidatedPeer peer;
    peer.node_id  = node_id;
    peer.ip       = host;
    peer.udp_port = port;
    peer.tcp_port = port;
    peer.last_seen = std::chrono::steady_clock::now();

    process_found_peers({ peer });
}

// ---------------------------------------------------------------------------
// Private: emit_peer
// ---------------------------------------------------------------------------

void discv5_crawler::emit_peer(const ValidatedPeer& peer) noexcept
{
    // Apply fork-id filter when configured.
    if (config_.required_fork_id.has_value())
    {
        if (!peer.eth_fork_id.has_value())
        {
            ++stat_no_eth_entry_;
            return;
        }
        const ForkId& required = config_.required_fork_id.value();
        const ForkId& actual   = peer.eth_fork_id.value();
        if (required.hash != actual.hash || required.next != actual.next)
        {
            ++stat_wrong_chain_;
            return;
        }
    }

    // Dedup before emission (also checked in process_found_peers, but re-check here
    // for thread safety if emit_peer is ever called from outside process_found_peers).
    {
        std::lock_guard<std::mutex> lock(state_mutex_);
        const std::string key = node_key(peer.node_id);
        if (!discovered_ids_.insert(key).second)
        {
            ++stat_duplicates_;
            return;
        }
    }

    ++stat_discovered_;

    if (peer_callback_)
    {
        peer_callback_(peer);
    }
}

// ---------------------------------------------------------------------------
// Private: node_key
// ---------------------------------------------------------------------------

// Builds the lookup key for a node id: a std::string holding the id's raw
// bytes (id.size() of them, embedded zero bytes included). No hex encoding
// is applied, so the result is not meant for display.
std::string discv5_crawler::node_key(const NodeId& id) noexcept
{
    const char* const first = reinterpret_cast<const char*>(id.data());
    const char* const last  = first + id.size();
    return std::string(first, last);
}

} // namespace discv5

Updated on 2026-04-13 at 23:22:46 -0700