Skip to content

eth/eth_watch_service.cpp

Namespaces

Name
eth

Source code

// Copyright 2025 GeniusVentures
// SPDX-License-Identifier: Apache-2.0

#include <eth/eth_watch_service.hpp>
#include <eth/messages.hpp>

namespace eth {

// ---------------------------------------------------------------------------
// set_send_callback
// ---------------------------------------------------------------------------

void EthWatchService::set_send_callback(SendCallback cb) noexcept
{
    send_cb_ = std::move(cb);
}

// ---------------------------------------------------------------------------
// watch_event
// ---------------------------------------------------------------------------

EventWatchId EthWatchService::watch_event(
    const codec::Address&             contract_address,
    const std::string&                event_signature,
    const std::vector<abi::AbiParam>& params,
    DecodedEventCallback              callback,
    std::optional<uint64_t>           from_block,
    std::optional<uint64_t>           to_block) noexcept
{
    const EventWatchId id = next_id_++;

    EventFilter filter;

    const codec::Address zero_addr{};
    if (contract_address != zero_addr)
    {
        filter.addresses.push_back(contract_address);
    }

    filter.topics.push_back(abi::event_signature_hash(event_signature));
    filter.from_block = from_block;
    filter.to_block   = to_block;

    subscriptions_.push_back({id, event_signature, params, std::move(callback)});

    watcher_.watch(filter, [this, id](const MatchedEvent& ev)
    {
        auto it = std::find_if(subscriptions_.begin(), subscriptions_.end(),
            [id](const Subscription& s) { return s.id == id; });
        if (it == subscriptions_.end())
        {
            return;
        }
        auto decoded = abi::decode_log(ev.log, it->event_signature, it->params);
        if (!decoded)
        {
            return;
        }
        it->callback(ev, decoded.value());
    });

    return id;
}

// ---------------------------------------------------------------------------
// unwatch
// ---------------------------------------------------------------------------

void EthWatchService::unwatch(EventWatchId id) noexcept
{
    subscriptions_.erase(
        std::remove_if(subscriptions_.begin(), subscriptions_.end(),
            [id](const Subscription& s) { return s.id == id; }),
        subscriptions_.end());

    watcher_.unwatch(id);
}

// ---------------------------------------------------------------------------
// request_receipts
// ---------------------------------------------------------------------------

void EthWatchService::request_receipts(const Hash256& block_hash,
                                       uint64_t       block_number) noexcept
{
    if (!send_cb_)
    {
        return;
    }

    // Deduplicate — skip if we have already requested receipts for this block
    if (!chain_tracker_.mark_seen(block_hash, block_number))
    {
        return;
    }

    const uint64_t req_id = next_req_id_++;

    GetReceiptsMessage req;
    req.request_id = req_id;
    req.block_hashes.push_back(block_hash);

    auto encoded = protocol::encode_get_receipts(req);
    if (!encoded)
    {
        return;
    }

    pending_requests_[req_id] = {block_hash, block_number};
    send_cb_(protocol::kGetReceiptsMessageId, std::move(encoded.value()));
}

// ---------------------------------------------------------------------------
// process_message
// ---------------------------------------------------------------------------

void EthWatchService::process_message(uint8_t eth_msg_id, rlp::ByteView payload) noexcept
{
    if (eth_msg_id == protocol::kNewBlockHashesMessageId)
    {
        auto decoded = protocol::decode_new_block_hashes(payload);
        if (!decoded)
        {
            return;
        }
        for (const auto& entry : decoded.value().entries)
        {
            request_receipts(entry.hash, entry.number);
        }
        return;
    }

    if (eth_msg_id == protocol::kNewBlockMessageId)
    {
        auto decoded = protocol::decode_new_block(payload);
        if (!decoded)
        {
            return;
        }
        // NewBlock does not include a block hash on the wire — use zeroed sentinel.
        // Still trigger request_receipts so callers with a send_cb get receipts.
        const Hash256 block_hash{};
        process_new_block(decoded.value(), block_hash);
        return;
    }

    if (eth_msg_id == protocol::kReceiptsMessageId)
    {
        auto decoded = protocol::decode_receipts(payload);
        if (!decoded)
        {
            return;
        }

        const auto& msg = decoded.value();
        size_t block_idx = 0;
        for (const auto& block_receipts : msg.receipts)
        {
            uint64_t block_number = 0;
            Hash256  block_hash{};

            // Correlate to a pending request if request_id is present
            if (msg.request_id.has_value())
            {
                // Each block in the response corresponds to one hash in the request.
                // We issued one hash per request, so request_id maps 1:1.
                if (block_idx == 0)
                {
                    auto it = pending_requests_.find(msg.request_id.value());
                    if (it != pending_requests_.end())
                    {
                        block_hash   = it->second.block_hash;
                        block_number = it->second.block_number;
                        pending_requests_.erase(it);
                    }
                }
            }

            std::vector<Hash256> tx_hashes(block_receipts.size());
            process_receipts(block_receipts, tx_hashes, block_number, block_hash);
            ++block_idx;
        }
        return;
    }
}

// ---------------------------------------------------------------------------
// process_receipts
// ---------------------------------------------------------------------------

void EthWatchService::process_receipts(
    const std::vector<codec::Receipt>& receipts,
    const std::vector<Hash256>&        tx_hashes,
    uint64_t                           block_number,
    const Hash256&                     block_hash) noexcept
{
    const size_t count = std::min(receipts.size(), tx_hashes.size());
    for (size_t i = 0; i < count; ++i)
    {
        watcher_.process_receipt(receipts[i], tx_hashes[i], block_number, block_hash);
    }
}

// ---------------------------------------------------------------------------
// process_new_block
// ---------------------------------------------------------------------------

void EthWatchService::process_new_block(const NewBlockMessage& msg,
                                        const Hash256&         block_hash) noexcept
{
    // Request receipts for each transaction in the block so logs can be watched.
    // The block number comes from the embedded header.
    request_receipts(block_hash, msg.header.number);
}

} // namespace eth

Updated on 2026-04-13 at 23:22:46 -0700