eth/eth_watch_service.cpp¶
Namespaces¶
| Name |
|---|
| eth |
Source code¶
// Copyright 2025 GeniusVentures
// SPDX-License-Identifier: Apache-2.0
#include <eth/eth_watch_service.hpp>
#include <eth/messages.hpp>
namespace eth {
// ---------------------------------------------------------------------------
// set_send_callback
// ---------------------------------------------------------------------------
void EthWatchService::set_send_callback(SendCallback cb) noexcept
{
send_cb_ = std::move(cb);
}
// ---------------------------------------------------------------------------
// watch_event
// ---------------------------------------------------------------------------
EventWatchId EthWatchService::watch_event(
const codec::Address& contract_address,
const std::string& event_signature,
const std::vector<abi::AbiParam>& params,
DecodedEventCallback callback,
std::optional<uint64_t> from_block,
std::optional<uint64_t> to_block) noexcept
{
const EventWatchId id = next_id_++;
EventFilter filter;
const codec::Address zero_addr{};
if (contract_address != zero_addr)
{
filter.addresses.push_back(contract_address);
}
filter.topics.push_back(abi::event_signature_hash(event_signature));
filter.from_block = from_block;
filter.to_block = to_block;
subscriptions_.push_back({id, event_signature, params, std::move(callback)});
watcher_.watch(filter, [this, id](const MatchedEvent& ev)
{
auto it = std::find_if(subscriptions_.begin(), subscriptions_.end(),
[id](const Subscription& s) { return s.id == id; });
if (it == subscriptions_.end())
{
return;
}
auto decoded = abi::decode_log(ev.log, it->event_signature, it->params);
if (!decoded)
{
return;
}
it->callback(ev, decoded.value());
});
return id;
}
// ---------------------------------------------------------------------------
// unwatch
// ---------------------------------------------------------------------------
void EthWatchService::unwatch(EventWatchId id) noexcept
{
subscriptions_.erase(
std::remove_if(subscriptions_.begin(), subscriptions_.end(),
[id](const Subscription& s) { return s.id == id; }),
subscriptions_.end());
watcher_.unwatch(id);
}
// ---------------------------------------------------------------------------
// request_receipts
// ---------------------------------------------------------------------------
void EthWatchService::request_receipts(const Hash256& block_hash,
uint64_t block_number) noexcept
{
if (!send_cb_)
{
return;
}
// Deduplicate — skip if we have already requested receipts for this block
if (!chain_tracker_.mark_seen(block_hash, block_number))
{
return;
}
const uint64_t req_id = next_req_id_++;
GetReceiptsMessage req;
req.request_id = req_id;
req.block_hashes.push_back(block_hash);
auto encoded = protocol::encode_get_receipts(req);
if (!encoded)
{
return;
}
pending_requests_[req_id] = {block_hash, block_number};
send_cb_(protocol::kGetReceiptsMessageId, std::move(encoded.value()));
}
// ---------------------------------------------------------------------------
// process_message
// ---------------------------------------------------------------------------
void EthWatchService::process_message(uint8_t eth_msg_id, rlp::ByteView payload) noexcept
{
if (eth_msg_id == protocol::kNewBlockHashesMessageId)
{
auto decoded = protocol::decode_new_block_hashes(payload);
if (!decoded)
{
return;
}
for (const auto& entry : decoded.value().entries)
{
request_receipts(entry.hash, entry.number);
}
return;
}
if (eth_msg_id == protocol::kNewBlockMessageId)
{
auto decoded = protocol::decode_new_block(payload);
if (!decoded)
{
return;
}
// NewBlock does not include a block hash on the wire — use zeroed sentinel.
// Still trigger request_receipts so callers with a send_cb get receipts.
const Hash256 block_hash{};
process_new_block(decoded.value(), block_hash);
return;
}
if (eth_msg_id == protocol::kReceiptsMessageId)
{
auto decoded = protocol::decode_receipts(payload);
if (!decoded)
{
return;
}
const auto& msg = decoded.value();
size_t block_idx = 0;
for (const auto& block_receipts : msg.receipts)
{
uint64_t block_number = 0;
Hash256 block_hash{};
// Correlate to a pending request if request_id is present
if (msg.request_id.has_value())
{
// Each block in the response corresponds to one hash in the request.
// We issued one hash per request, so request_id maps 1:1.
if (block_idx == 0)
{
auto it = pending_requests_.find(msg.request_id.value());
if (it != pending_requests_.end())
{
block_hash = it->second.block_hash;
block_number = it->second.block_number;
pending_requests_.erase(it);
}
}
}
std::vector<Hash256> tx_hashes(block_receipts.size());
process_receipts(block_receipts, tx_hashes, block_number, block_hash);
++block_idx;
}
return;
}
}
// ---------------------------------------------------------------------------
// process_receipts
// ---------------------------------------------------------------------------
void EthWatchService::process_receipts(
const std::vector<codec::Receipt>& receipts,
const std::vector<Hash256>& tx_hashes,
uint64_t block_number,
const Hash256& block_hash) noexcept
{
const size_t count = std::min(receipts.size(), tx_hashes.size());
for (size_t i = 0; i < count; ++i)
{
watcher_.process_receipt(receipts[i], tx_hashes[i], block_number, block_hash);
}
}
// ---------------------------------------------------------------------------
// process_new_block
// ---------------------------------------------------------------------------
void EthWatchService::process_new_block(const NewBlockMessage& msg,
const Hash256& block_hash) noexcept
{
// Request receipts for each transaction in the block so logs can be watched.
// The block number comes from the embedded header.
request_receipts(block_hash, msg.header.number);
}
} // namespace eth
Updated on 2026-04-13 at 23:22:46 -0700