eth/rpc_receipt_source.cpp
Namespaces
| Name |
|---|
| eth |
| eth::rpc |
Source code
// Copyright 2026 Genius Ventures, Inc.
// SPDX-License-Identifier: MIT
#include <eth/rpc_receipt_source.hpp>
#include <algorithm>
namespace eth::rpc {
RpcReceiptSource::RpcReceiptSource(
JsonRpcTransport& transport,
FinalityPolicy finality_policy,
uint64_t last_processed_block,
uint64_t max_log_range)
: transport_(transport)
, finality_policy_(finality_policy)
, last_processed_block_(last_processed_block)
, max_log_range_(std::max<uint64_t>(1, max_log_range))
{
}
// Registers `filter` and returns the watch id assigned to it.
// Ids are taken from `next_watch_id_`, which is post-incremented on every call.
WatchId RpcReceiptSource::add_filter(EventFilter filter)
{
    const WatchId assigned_id = next_watch_id_++;
    RegisteredFilter registration{assigned_id, std::move(filter)};
    filters_.push_back(std::move(registration));
    return assigned_id;
}
// Drops every registered filter whose id equals `id`.
// Does nothing when no registered filter carries that id.
void RpcReceiptSource::remove_filter(WatchId id)
{
    const auto has_target_id = [id](const RegisteredFilter& candidate)
    {
        return candidate.id == id;
    };

    // remove_if compacts the survivors to the front; erase trims the tail.
    const auto first_removed =
        std::remove_if(filters_.begin(), filters_.end(), has_target_id);
    filters_.erase(first_removed, filters_.end());
}
// Installs the callback that emit_receipt() invokes for each fetched receipt.
// Any previously installed handler is replaced.
void RpcReceiptSource::set_receipt_batch_handler(ReceiptBatchHandler handler)
{
    // The parameter is taken by value, so it can be moved into the member.
    handler_ = std::move(handler);
}
// Requests the receipt for `tx_hash` over the transport.
//
// Returns std::nullopt when the transport call yields no response; otherwise
// returns whatever parse_transaction_receipt_response() produces for it.
std::optional<ReceiptResult> RpcReceiptSource::get_receipt(const Hash256& tx_hash)
{
    const auto reply = transport_.call(
        make_get_transaction_receipt_request(tx_hash, next_request_id()));

    if (reply.has_value())
    {
        return parse_transaction_receipt_response(*reply);
    }
    return std::nullopt;
}
std::optional<FinalityDecision> RpcReceiptSource::finality_head()
{
ChainHeadSnapshot heads;
if (finality_policy_.prefer_finalized)
{
heads.finalized_number = get_block_number(RpcBlockTag::kFinalized);
}
if (finality_policy_.prefer_safe)
{
heads.safe_number = get_block_number(RpcBlockTag::kSafe);
}
heads.latest_number = get_block_number(RpcBlockTag::kLatest);
const auto decision = choose_finality_head(finality_policy_, heads);
if (!decision)
{
return std::nullopt;
}
return decision;
}
// Runs a single polling step: resolves the finality head and backfills every
// block between the cursor and that head.
//
// Returns false when no finality head is available or when backfill() fails.
// Returns true when there is nothing new to process or the backfill succeeds.
bool RpcReceiptSource::poll_once()
{
    const auto target = finality_head();
    if (!target.has_value())
    {
        return false;
    }

    const auto head_block = target->block_number;
    if (last_processed_block_ >= head_block)
    {
        // Already caught up with the chosen head.
        return true;
    }

    // The guard above ensures the cursor is below head_block, so +1 cannot wrap.
    return backfill(last_processed_block_ + 1, head_block);
}
// Processes the inclusive block range [from_block, to_block] in chunks of at
// most `max_log_range_` blocks, running every registered filter on each chunk.
//
// Returns true when the range is empty (from_block > to_block) or when every
// chunk was processed; returns false as soon as any filter fails, leaving
// `last_processed_block_` at the end of the last fully processed chunk.
//
// NOTE(review): `last_processed_block_` is overwritten unconditionally after
// each chunk, so calling this with a range below the current cursor moves the
// cursor backwards — confirm that is intended for direct callers.
bool RpcReceiptSource::backfill(uint64_t from_block, uint64_t to_block)
{
    if (to_block < from_block)
    {
        return true;
    }

    uint64_t chunk_first = from_block;
    while (true)
    {
        // Bounded by the distance to to_block, so the addition cannot overflow.
        const uint64_t blocks_left = to_block - chunk_first;
        const uint64_t chunk_last =
            chunk_first + std::min(max_log_range_ - 1, blocks_left);

        // One dedup set per chunk, shared by all filters, so a transaction
        // matched by several filters is emitted once per chunk.
        std::set<Hash256> emitted_in_chunk;
        for (const auto& registered : filters_)
        {
            const bool filter_ok = backfill_filter(
                registered.filter, chunk_first, chunk_last, emitted_in_chunk);
            if (!filter_ok)
            {
                return false;
            }
        }

        last_processed_block_ = chunk_last;

        // Finish before advancing so chunk_last + 1 is never computed for the
        // final chunk (it would wrap if to_block were the maximum value).
        if (chunk_last == to_block)
        {
            return true;
        }
        chunk_first = chunk_last + 1;
    }
}
// Returns the current JSON-RPC request id and advances the counter by one.
uint64_t RpcReceiptSource::next_request_id() noexcept
{
    const uint64_t issued_id = next_rpc_id_++;
    return issued_id;
}
// Queries the block identified by `tag` and returns its number.
//
// Returns std::nullopt when the transport call yields no response; otherwise
// returns whatever parse_block_number_response() produces for it.
std::optional<uint64_t> RpcReceiptSource::get_block_number(RpcBlockTag tag)
{
    const auto reply = transport_.call(
        make_get_block_by_number_request(tag, next_request_id()));

    if (reply.has_value())
    {
        return parse_block_number_response(*reply);
    }
    return std::nullopt;
}
// Fetches the logs matching `filter` in [from_block, to_block] and emits one
// receipt per transaction hash not yet present in `seen_transactions`.
//
// `seen_transactions` is updated in place: each hash is recorded before its
// receipt is emitted. Returns false when the transport call yields no
// response, when the response cannot be parsed, or when emit_receipt() fails;
// returns true otherwise.
bool RpcReceiptSource::backfill_filter(
    const EventFilter& filter,
    uint64_t from_block,
    uint64_t to_block,
    std::set<Hash256>& seen_transactions)
{
    const auto reply = transport_.call(
        make_get_logs_request(filter, from_block, to_block, next_request_id()));
    if (!reply.has_value())
    {
        return false;
    }

    const auto parsed_logs = parse_get_logs_response(*reply);
    if (!parsed_logs.has_value())
    {
        return false;
    }

    for (const auto& log_entry : *parsed_logs)
    {
        // insert().second is false when the hash was already recorded.
        const bool first_sighting =
            seen_transactions.insert(log_entry.tx_hash).second;
        if (first_sighting && !emit_receipt(log_entry.tx_hash))
        {
            return false;
        }
    }
    return true;
}
bool RpcReceiptSource::emit_receipt(const Hash256& tx_hash)
{
const auto receipt = get_receipt(tx_hash);
if (!receipt.has_value())
{
return false;
}
if (handler_)
{
ReceiptBatch batch;
batch.receipts.push_back(receipt->receipt);
batch.tx_hashes.push_back(receipt->tx_hash);
batch.log_indices.push_back(receipt->log_indices);
batch.block_number = receipt->block_number;
batch.block_hash = receipt->block_hash;
handler_(batch);
}
return true;
}
} // namespace eth::rpc
Updated on 2026-06-05 at 17:22:19 -0700