eth/eth_watch_service.hpp¶
Namespaces¶
| Name |
|---|
| eth |
Classes¶
| Name | |
|---|---|
| struct | eth::WatchEventContext Context metadata attached to a filtered watch event. |
| struct | eth::WatchEventNotification Enriched event payload emitted by EthWatchRunner and EthWatchService. |
| struct | eth::EthWatchConnectionConfig Connection pool limits for eth watch peer sessions. Defaults keep three active dial/watch slots per chain. |
| struct | eth::EthWatchEventSpec Event filter registration consumed by the production watch runtime. |
| struct | eth::EthWatchServiceConfig Production eth-watch orchestration config. |
| struct | eth::EthWatchRuntimeStatsSnapshot |
| struct | eth::WatchStatsSnapshot Snapshot of live EthWatchService traffic counters. |
| class | eth::EthWatchService Ties together EventWatcher, ABI decoding, and eth message dispatch. |
Types¶
| Name | |
|---|---|
| enum class | EthWatchDiscoveryMode { kCacheOnly, kDiscoverIfNeeded, kDiscoverFirst, kHybrid} Peer source strategy for production eth-watch startup. |
| using WatchId | EventWatchId Subscription handle returned by EthWatchService::watch_event(). |
| using std::function< void(const WatchEventNotification &)> | WatchEventNotificationCallback Callback invoked for each decoded filtered event with chain/session metadata. |
| using std::function< void( const MatchedEvent &, const std::vector< abi::AbiValue > &)> | DecodedEventCallback Typed callback for a decoded event log. |
| using std::function< void(uint8_t eth_msg_id, std::vector< uint8_t > payload)> | SendCallback Callback used by EthWatchService to send an outgoing eth message. |
Types Documentation¶
enum EthWatchDiscoveryMode¶
| Enumerator | Value | Description |
|---|---|---|
| kCacheOnly | Use cached nodes only; never start discovery from bootnodes. |
|
| kDiscoverIfNeeded | Use cached nodes; start discovery only when no cached nodes are available. |
|
| kDiscoverFirst | Start from discovery bootnodes and do not enqueue cached nodes initially. |
|
| kHybrid | Enqueue cached nodes and also start discovery from bootnodes. |
Peer source strategy for production eth-watch startup.
using EventWatchId¶
Subscription handle returned by EthWatchService::watch_event().
using WatchEventNotificationCallback¶
Callback invoked for each decoded filtered event with chain/session metadata.
using DecodedEventCallback¶
using eth::DecodedEventCallback = std::function<void(
const MatchedEvent&,
const std::vector<abi::AbiValue>&)>;
Typed callback for a decoded event log.
using SendCallback¶
Callback used by EthWatchService to send an outgoing eth message.
Parameters:
- eth_msg_id Eth-layer message id (before adding the rlpx offset).
- payload Encoded message bytes.
Source code¶
// Copyright 2026 Genius Ventures, Inc.
// SPDX-License-Identifier: MIT
#ifndef EVMRELAY_INCLUDE_ETH_ETH_WATCH_SERVICE_HPP
#define EVMRELAY_INCLUDE_ETH_ETH_WATCH_SERVICE_HPP
#include <eth/abi_decoder.hpp>
#include <eth/chain_tracker.hpp>
#include <eth/eth_peer_queue.hpp>
#include <eth/event_filter.hpp>
#include <eth/messages.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <discv4/discv4_client.hpp>
#include <discv5/discv5_client.hpp>
#include <discv5/enr_tree.hpp>
#include <array>
#include <functional>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <unordered_map>
#include <vector>
namespace eth {
class EthWatchRunner;
using EventWatchId = WatchId;
struct WatchEventContext
{
std::string chain_name;
uint64_t network_id = 0;
std::string peer_client_id;
std::string peer_address;
};
struct WatchEventNotification
{
WatchEventContext context;
MatchedEvent event;
std::vector<abi::AbiValue> values;
std::string event_signature;
};
using WatchEventNotificationCallback = std::function<void(const WatchEventNotification&)>;
struct EthWatchConnectionConfig
{
int max_total_connections = 24;
int max_connections_per_chain = 3;
};
enum class EthWatchDiscoveryMode
{
kCacheOnly,
kDiscoverIfNeeded,
kDiscoverFirst,
kHybrid
};
using DecodedEventCallback = std::function<void(
const MatchedEvent&,
const std::vector<abi::AbiValue>&)>;
struct EthWatchEventSpec
{
codec::Address contract_address{};
std::string event_signature;
std::vector<abi::AbiParam> params;
std::optional<uint64_t> from_block;
std::optional<uint64_t> to_block;
};
struct EthWatchServiceConfig
{
EthWatchConnectionConfig connection{};
EthPeerQueueConfig peer_queue{};
std::vector<discv4::ChainPeerConfig> chains;
std::vector<EthWatchEventSpec> watches;
EthWatchDiscoveryMode discovery_mode = EthWatchDiscoveryMode::kDiscoverIfNeeded;
bool enable_discv4_fallback = true;
bool enable_enr_tree_discovery = true;
bool attach_peer_dialer = true;
discv4::discv4Config discovery{};
discv5::discv5Config discv5_discovery{};
std::function<discv4::DialFn(const discv4::ChainPeerConfig&)> dial_fn_factory{};
std::function<std::shared_ptr<discv4::discv4_client>(
boost::asio::io_context&,
const discv4::discv4Config&)> discovery_client_factory{};
std::function<std::shared_ptr<discv5::discv5_client>(
boost::asio::io_context&,
const discv5::discv5Config&)> discv5_client_factory{};
std::function<std::vector<std::string>(
const discv4::ChainPeerConfig&,
const std::vector<std::string>&)> enr_tree_resolver{};
std::function<bool(
boost::asio::io_context&,
const discv4::ChainPeerConfig&,
std::shared_ptr<EthPeerQueue>)> discv4_fallback_starter{};
std::function<bool(
boost::asio::io_context&,
const discv4::ChainPeerConfig&,
std::shared_ptr<EthPeerQueue>,
const std::vector<std::string>&)> discv5_enr_tree_starter{};
};
using SendCallback = std::function<void(uint8_t eth_msg_id, std::vector<uint8_t> payload)>;
struct EthWatchRuntimeStatsSnapshot
{
uint64_t tcp_connect_failures = 0;
uint64_t tcp_connected = 0;
uint64_t auth_success = 0;
uint64_t local_hello_sent = 0;
uint64_t peer_disconnect_before_hello = 0;
uint64_t peer_hello_accepted = 0;
uint64_t eth_status_sent = 0;
uint64_t remote_status_accepted = 0;
uint64_t remote_status_rejected = 0;
std::array<uint64_t, 256> remote_status_rejected_disconnect_reasons{};
std::array<uint64_t, 4> remote_status_rejected_validation_errors{};
EthPeerQueueStatsSnapshot peer_queue{};
};
struct WatchStatsSnapshot
{
uint64_t eth_messages_seen = 0;
uint64_t new_block_hashes_messages = 0;
uint64_t new_block_messages = 0;
uint64_t receipts_messages = 0;
uint64_t decode_failures = 0;
uint64_t receipts_requested = 0;
uint64_t receipts_processed = 0;
uint64_t logs_seen = 0;
uint64_t matched_logs = 0;
uint64_t discarded_logs = 0;
};
class EthWatchService
{
public:
EthWatchService() = default;
~EthWatchService();
EthWatchService(const EthWatchService&) = delete;
EthWatchService& operator=(const EthWatchService&) = delete;
EthWatchService(EthWatchService&&) = default;
EthWatchService& operator=(EthWatchService&&) = default;
[[nodiscard]] bool initialize(
EthWatchServiceConfig config,
WatchEventNotificationCallback callback) noexcept;
void run(boost::asio::io_context& io) noexcept;
void stop() noexcept;
[[nodiscard]] bool initialized() const noexcept;
[[nodiscard]] size_t runtime_chain_count() const noexcept;
[[nodiscard]] size_t scheduler_count() const noexcept;
[[nodiscard]] size_t peer_queue_count() const noexcept;
[[nodiscard]] size_t discovery_client_count() const noexcept;
[[nodiscard]] size_t discv4_fallback_count() const noexcept;
[[nodiscard]] size_t active_runner_count() const noexcept;
[[nodiscard]] WatchStatsSnapshot aggregate_runtime_stats() const noexcept;
[[nodiscard]] EthWatchRuntimeStatsSnapshot aggregate_connection_stats() const noexcept;
[[nodiscard]] std::shared_ptr<EthPeerQueue> peer_queue(const std::string& chain_name) const noexcept;
void set_send_callback(SendCallback cb) noexcept;
void set_eth_message_schemas(std::vector<EthMessageSchema> schemas) noexcept;
EventWatchId watch_event(
const codec::Address& contract_address,
const std::string& event_signature,
const std::vector<abi::AbiParam>& params,
DecodedEventCallback callback,
std::optional<uint64_t> from_block = std::nullopt,
std::optional<uint64_t> to_block = std::nullopt) noexcept;
[[nodiscard]] uint64_t tip() const noexcept
{
return chain_tracker_.tip();
}
[[nodiscard]] std::optional<Hash256> tip_hash() const noexcept
{
return chain_tracker_.tip_hash();
}
void unwatch(EventWatchId id) noexcept;
[[nodiscard]] size_t subscription_count() const noexcept
{
return watcher_.subscription_count();
}
[[nodiscard]] WatchStatsSnapshot stats() const noexcept
{
return stats_;
}
void process_message(uint8_t eth_msg_id, rlp::ByteView payload) noexcept;
void process_receipts(
const std::vector<codec::Receipt>& receipts,
const std::vector<Hash256>& tx_hashes,
uint64_t block_number,
const Hash256& block_hash,
const std::vector<std::vector<uint32_t>>& log_indices = {}) noexcept;
void process_new_block(const NewBlockMessage& msg,
const Hash256& block_hash) noexcept;
void request_receipts(const Hash256& block_hash, uint64_t block_number) noexcept;
private:
struct PendingRequest
{
Hash256 block_hash{};
uint64_t block_number = 0;
};
struct Subscription
{
EventWatchId id;
std::string event_signature;
std::vector<abi::AbiParam> params;
DecodedEventCallback callback;
};
SendCallback send_cb_;
ChainTracker chain_tracker_;
EventWatcher watcher_;
std::vector<Subscription> subscriptions_;
EventWatchId next_id_ = 1;
uint64_t next_req_id_ = 1;
WatchStatsSnapshot stats_{};
std::vector<EthMessageSchema> eth_message_schemas_;
std::map<uint64_t, PendingRequest> pending_requests_;
struct RuntimeChain
{
discv4::ChainPeerConfig config;
std::shared_ptr<discv4::DialScheduler> scheduler;
std::shared_ptr<EthPeerQueue> peer_queue;
std::shared_ptr<discv4::discv4_client> discovery_client;
std::shared_ptr<discv5::discv5_client> discv5_client;
bool discv4_fallback_started = false;
bool discv5_enr_tree_started = false;
bool discv5_cache_enr_started = false;
std::shared_ptr<EthWatchRuntimeStatsSnapshot> stats;
};
[[nodiscard]] discv4::DialFn make_default_dial_fn(
const discv4::ChainPeerConfig& chain_config) noexcept;
void start_discv4_fallback(
boost::asio::io_context& io,
RuntimeChain& runtime) noexcept;
bool start_enr_tree_discovery(
boost::asio::io_context& io,
RuntimeChain& runtime) noexcept;
bool start_discv5_discovery(
boost::asio::io_context& io,
RuntimeChain& runtime,
const std::vector<std::string>& bootstrap_enrs) noexcept;
bool orchestration_initialized_ = false;
bool orchestration_running_ = false;
EthWatchServiceConfig orchestration_config_{};
WatchEventNotificationCallback orchestration_callback_{};
std::vector<RuntimeChain> runtime_chains_{};
std::vector<std::shared_ptr<EthWatchRunner>> active_runners_{};
std::unordered_map<std::string, std::shared_ptr<EthWatchRuntimeStatsSnapshot>> runtime_stats_by_chain_;
};
} // namespace eth
#endif // EVMRELAY_INCLUDE_ETH_ETH_WATCH_SERVICE_HPP
Updated on 2026-06-05 at 17:22:19 -0700