Skip to content

eth/eth_watch_service.hpp

Namespaces

Name
eth

Classes

Name
struct eth::WatchEventContext
Context metadata attached to a filtered watch event.
struct eth::WatchEventNotification
Enriched event payload emitted by EthWatchRunner and EthWatchService.
struct eth::EthWatchConnectionConfig
Connection pool limits for eth watch peer sessions. Defaults keep three active dial/watch slots per chain.
struct eth::EthWatchEventSpec
Event filter registration consumed by the production watch runtime.
struct eth::EthWatchServiceConfig
Production eth-watch orchestration config.
struct eth::EthWatchRuntimeStatsSnapshot
struct eth::WatchStatsSnapshot
Snapshot of live EthWatchService traffic counters.
class eth::EthWatchService
Ties together EventWatcher, ABI decoding, and eth message dispatch.

Types

Name
enum class EthWatchDiscoveryMode { kCacheOnly, kDiscoverIfNeeded, kDiscoverFirst, kHybrid}
Peer source strategy for production eth-watch startup.
using WatchId EventWatchId
Subscription handle returned by EthWatchService::watch_event().
using std::function< void(const WatchEventNotification &)> WatchEventNotificationCallback
Callback invoked for each decoded filtered event with chain/session metadata.
using std::function< void( const MatchedEvent &, const std::vector< abi::AbiValue > &)> DecodedEventCallback
Typed callback for a decoded event log.
using std::function< void(uint8_t eth_msg_id, std::vector< uint8_t > payload)> SendCallback
Callback used by EthWatchService to send an outgoing eth message.

Types Documentation

enum EthWatchDiscoveryMode

Enumerator Value Description
kCacheOnly Use cached nodes only; never start discovery from bootnodes.
kDiscoverIfNeeded Use cached nodes; start discovery only when no cached nodes are available.
kDiscoverFirst Start from discovery bootnodes and do not enqueue cached nodes initially.
kHybrid Enqueue cached nodes and also start discovery from bootnodes.

Peer source strategy for production eth-watch startup.

using EventWatchId

using eth::EventWatchId = WatchId;

Subscription handle returned by EthWatchService::watch_event().

using WatchEventNotificationCallback

using eth::WatchEventNotificationCallback = std::function<void(const WatchEventNotification&)>;

Callback invoked for each decoded filtered event with chain/session metadata.

using DecodedEventCallback

using eth::DecodedEventCallback = std::function<void(
   const MatchedEvent&,
   const std::vector<abi::AbiValue>&)>;

Typed callback for a decoded event log.

using SendCallback

using eth::SendCallback = std::function<void(uint8_t eth_msg_id, std::vector<uint8_t> payload)>;

Callback used by EthWatchService to send an outgoing eth message.

Parameters:

  • eth_msg_id Eth-layer message id (before adding the rlpx offset).
  • payload Encoded message bytes.

Source code

// Copyright 2026 Genius Ventures, Inc.
// SPDX-License-Identifier: MIT

#ifndef EVMRELAY_INCLUDE_ETH_ETH_WATCH_SERVICE_HPP
#define EVMRELAY_INCLUDE_ETH_ETH_WATCH_SERVICE_HPP

#include <eth/abi_decoder.hpp>
#include <eth/chain_tracker.hpp>
#include <eth/eth_peer_queue.hpp>
#include <eth/event_filter.hpp>
#include <eth/messages.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <discv4/discv4_client.hpp>
#include <discv5/discv5_client.hpp>
#include <discv5/enr_tree.hpp>
#include <array>
#include <functional>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>

namespace eth {

class EthWatchRunner;

using EventWatchId = WatchId;

struct WatchEventContext
{
    std::string chain_name;
    uint64_t    network_id = 0;
    std::string peer_client_id;
    std::string peer_address;
};

struct WatchEventNotification
{
    WatchEventContext               context;
    MatchedEvent                    event;
    std::vector<abi::AbiValue>      values;
    std::string                     event_signature;
};

using WatchEventNotificationCallback = std::function<void(const WatchEventNotification&)>;

struct EthWatchConnectionConfig
{
    int max_total_connections = 24;
    int max_connections_per_chain = 3;
};

enum class EthWatchDiscoveryMode
{
    kCacheOnly,

    kDiscoverIfNeeded,

    kDiscoverFirst,

    kHybrid
};

using DecodedEventCallback = std::function<void(
    const MatchedEvent&,
    const std::vector<abi::AbiValue>&)>;

struct EthWatchEventSpec
{
    codec::Address             contract_address{};
    std::string                event_signature;
    std::vector<abi::AbiParam> params;
    std::optional<uint64_t>    from_block;
    std::optional<uint64_t>    to_block;
};

struct EthWatchServiceConfig
{
    EthWatchConnectionConfig               connection{};
    EthPeerQueueConfig                     peer_queue{};
    std::vector<discv4::ChainPeerConfig>   chains;
    std::vector<EthWatchEventSpec>         watches;
    EthWatchDiscoveryMode                  discovery_mode = EthWatchDiscoveryMode::kDiscoverIfNeeded;
    bool                                   enable_discv4_fallback = true;
    bool                                   enable_enr_tree_discovery = true;
    bool                                   attach_peer_dialer = true;
    discv4::discv4Config                  discovery{};
    discv5::discv5Config                  discv5_discovery{};

    std::function<discv4::DialFn(const discv4::ChainPeerConfig&)> dial_fn_factory{};

    std::function<std::shared_ptr<discv4::discv4_client>(
        boost::asio::io_context&,
        const discv4::discv4Config&)> discovery_client_factory{};

    std::function<std::shared_ptr<discv5::discv5_client>(
        boost::asio::io_context&,
        const discv5::discv5Config&)> discv5_client_factory{};

    std::function<std::vector<std::string>(
        const discv4::ChainPeerConfig&,
        const std::vector<std::string>&)> enr_tree_resolver{};

    std::function<bool(
        boost::asio::io_context&,
        const discv4::ChainPeerConfig&,
        std::shared_ptr<EthPeerQueue>)> discv4_fallback_starter{};

    std::function<bool(
        boost::asio::io_context&,
        const discv4::ChainPeerConfig&,
        std::shared_ptr<EthPeerQueue>,
        const std::vector<std::string>&)> discv5_enr_tree_starter{};
};

using SendCallback = std::function<void(uint8_t eth_msg_id, std::vector<uint8_t> payload)>;

struct EthWatchRuntimeStatsSnapshot
{
    uint64_t tcp_connect_failures = 0;
    uint64_t tcp_connected = 0;
    uint64_t auth_success = 0;
    uint64_t local_hello_sent = 0;
    uint64_t peer_disconnect_before_hello = 0;
    uint64_t peer_hello_accepted = 0;
    uint64_t eth_status_sent = 0;
    uint64_t remote_status_accepted = 0;
    uint64_t remote_status_rejected = 0;
    std::array<uint64_t, 256> remote_status_rejected_disconnect_reasons{};
    std::array<uint64_t, 4> remote_status_rejected_validation_errors{};
    EthPeerQueueStatsSnapshot peer_queue{};
};

struct WatchStatsSnapshot
{
    uint64_t eth_messages_seen = 0;
    uint64_t new_block_hashes_messages = 0;
    uint64_t new_block_messages = 0;
    uint64_t receipts_messages = 0;
    uint64_t decode_failures = 0;
    uint64_t receipts_requested = 0;
    uint64_t receipts_processed = 0;
    uint64_t logs_seen = 0;
    uint64_t matched_logs = 0;
    uint64_t discarded_logs = 0;
};

class EthWatchService
{
public:
    EthWatchService() = default;
    ~EthWatchService();

    EthWatchService(const EthWatchService&) = delete;
    EthWatchService& operator=(const EthWatchService&) = delete;
    EthWatchService(EthWatchService&&) = default;
    EthWatchService& operator=(EthWatchService&&) = default;

    [[nodiscard]] bool initialize(
        EthWatchServiceConfig          config,
        WatchEventNotificationCallback callback) noexcept;

    void run(boost::asio::io_context& io) noexcept;

    void stop() noexcept;

    [[nodiscard]] bool initialized() const noexcept;
    [[nodiscard]] size_t runtime_chain_count() const noexcept;
    [[nodiscard]] size_t scheduler_count() const noexcept;
    [[nodiscard]] size_t peer_queue_count() const noexcept;
    [[nodiscard]] size_t discovery_client_count() const noexcept;
    [[nodiscard]] size_t discv4_fallback_count() const noexcept;
    [[nodiscard]] size_t active_runner_count() const noexcept;
    [[nodiscard]] WatchStatsSnapshot aggregate_runtime_stats() const noexcept;
    [[nodiscard]] EthWatchRuntimeStatsSnapshot aggregate_connection_stats() const noexcept;
    [[nodiscard]] std::shared_ptr<EthPeerQueue> peer_queue(const std::string& chain_name) const noexcept;

    void set_send_callback(SendCallback cb) noexcept;

    void set_eth_message_schemas(std::vector<EthMessageSchema> schemas) noexcept;

    EventWatchId watch_event(
        const codec::Address&             contract_address,
        const std::string&                event_signature,
        const std::vector<abi::AbiParam>& params,
        DecodedEventCallback              callback,
        std::optional<uint64_t>           from_block = std::nullopt,
        std::optional<uint64_t>           to_block   = std::nullopt) noexcept;

    [[nodiscard]] uint64_t tip() const noexcept
    {
        return chain_tracker_.tip();
    }

    [[nodiscard]] std::optional<Hash256> tip_hash() const noexcept
    {
        return chain_tracker_.tip_hash();
    }

    void unwatch(EventWatchId id) noexcept;

    [[nodiscard]] size_t subscription_count() const noexcept
    {
        return watcher_.subscription_count();
    }

    [[nodiscard]] WatchStatsSnapshot stats() const noexcept
    {
        return stats_;
    }

    void process_message(uint8_t eth_msg_id, rlp::ByteView payload) noexcept;

    void process_receipts(
        const std::vector<codec::Receipt>& receipts,
        const std::vector<Hash256>&        tx_hashes,
        uint64_t                           block_number,
        const Hash256&                     block_hash,
        const std::vector<std::vector<uint32_t>>& log_indices = {}) noexcept;

    void process_new_block(const NewBlockMessage& msg,
                           const Hash256&         block_hash) noexcept;

    void request_receipts(const Hash256& block_hash, uint64_t block_number) noexcept;

private:
    struct PendingRequest
    {
        Hash256  block_hash{};
        uint64_t block_number = 0;
    };

    struct Subscription
    {
        EventWatchId               id;
        std::string                event_signature;
        std::vector<abi::AbiParam> params;
        DecodedEventCallback       callback;
    };

    SendCallback              send_cb_;
    ChainTracker              chain_tracker_;
    EventWatcher              watcher_;
    std::vector<Subscription> subscriptions_;
    EventWatchId              next_id_     = 1;
    uint64_t                  next_req_id_ = 1;
    WatchStatsSnapshot        stats_{};
    std::vector<EthMessageSchema> eth_message_schemas_;

    std::map<uint64_t, PendingRequest> pending_requests_;

    struct RuntimeChain
    {
        discv4::ChainPeerConfig                 config;
        std::shared_ptr<discv4::DialScheduler>  scheduler;
        std::shared_ptr<EthPeerQueue>           peer_queue;
        std::shared_ptr<discv4::discv4_client>  discovery_client;
        std::shared_ptr<discv5::discv5_client>  discv5_client;
        bool                                    discv4_fallback_started = false;
        bool                                    discv5_enr_tree_started = false;
        bool                                    discv5_cache_enr_started = false;
        std::shared_ptr<EthWatchRuntimeStatsSnapshot> stats;
    };

    [[nodiscard]] discv4::DialFn make_default_dial_fn(
        const discv4::ChainPeerConfig& chain_config) noexcept;
    void start_discv4_fallback(
        boost::asio::io_context& io,
        RuntimeChain&            runtime) noexcept;
    bool start_enr_tree_discovery(
        boost::asio::io_context& io,
        RuntimeChain&            runtime) noexcept;
    bool start_discv5_discovery(
        boost::asio::io_context&       io,
        RuntimeChain&                  runtime,
        const std::vector<std::string>& bootstrap_enrs) noexcept;

    bool                           orchestration_initialized_ = false;
    bool                           orchestration_running_ = false;
    EthWatchServiceConfig          orchestration_config_{};
    WatchEventNotificationCallback orchestration_callback_{};
    std::vector<RuntimeChain>      runtime_chains_{};
    std::vector<std::shared_ptr<EthWatchRunner>> active_runners_{};
    std::unordered_map<std::string, std::shared_ptr<EthWatchRuntimeStatsSnapshot>> runtime_stats_by_chain_;
};

} // namespace eth

#endif // EVMRELAY_INCLUDE_ETH_ETH_WATCH_SERVICE_HPP

Updated on 2026-06-05 at 17:22:19 -0700