Skip to content

framing/message_stream.cpp

Namespaces

Name
rlpx
rlpx::framing

Source code

// Copyright 2025 GeniusVentures
// SPDX-License-Identifier: Apache-2.0

#include <rlpx/framing/message_stream.hpp>
#include <rlp/rlp_encoder.hpp>
#include <rlp/rlp_decoder.hpp>
#include <snappy.h>
#include <cstring>

namespace rlpx::framing {

namespace asio = boost::asio;

MessageStream::MessageStream(
    std::unique_ptr<FrameCipher> cipher,
    socket::SocketTransport transport
) noexcept
    : cipher_(std::move(cipher))
    , transport_(std::move(transport)) {
    // Pre-allocate buffers to reduce allocations
    send_buffer_.reserve(4096);
    recv_buffer_.reserve(4096);
}

void MessageStream::close() noexcept
{
    (void)transport_.close();
}

VoidResult MessageStream::send_message(const MessageSendParams& params, asio::yield_context yield) noexcept {
    // go-ethereum wire format: RLP-encoded uint(message_id) || payload
    // When Snappy is enabled, ONLY the payload is compressed; the message ID stays uncompressed.
    // Mirrors go-ethereum p2p/rlpx/rlpx.go: Write() = rlp.AppendUint64(code) + snappy.Encode(data)

    // Encode message ID as RLP varint (uncompressed in all cases).
    rlp::RlpEncoder id_encoder;
    if (auto res = id_encoder.add(params.message_id); !res) {
        return SessionError::kInvalidMessage;
    }
    auto id_result = id_encoder.GetBytes();
    if (!id_result) return SessionError::kInvalidMessage;
    ByteBuffer id_bytes = detail::from_rlp_bytes(*id_result.value());

    // Build payload part (optionally Snappy-compressed).
    ByteBuffer payload_bytes;
    if (!params.payload.empty()) {
        if (compression_enabled_) {
            std::string compressed;
            snappy::Compress(
                reinterpret_cast<const char*>(params.payload.data()),
                params.payload.size(),
                &compressed
            );
            payload_bytes.assign(compressed.begin(), compressed.end());
        } else {
            payload_bytes.assign(params.payload.begin(), params.payload.end());
        }
    }

    // Combine: [RLP(msg_id)] || [payload (maybe compressed)]
    ByteBuffer message_data;
    message_data.reserve(id_bytes.size() + payload_bytes.size());
    message_data.insert(message_data.end(), id_bytes.begin(), id_bytes.end());
    message_data.insert(message_data.end(), payload_bytes.begin(), payload_bytes.end());
        // Frame the message (may need to split into multiple frames)
        // For now, send as single frame (max 16MB)
        if ( message_data.size() > kMaxFrameSize ) {
            return SessionError::kInvalidMessage;
        }

        FrameEncryptParams frame_params{};
        frame_params.frame_data = message_data;
        frame_params.is_first_frame = true;

        auto encrypted_frame_result = cipher_->encrypt_frame(frame_params);
        if ( !encrypted_frame_result ) {
            return SessionError::kEncryptionError;
        }

        // Send encrypted frame over socket
        auto write_result = transport_.write_all(encrypted_frame_result.value(), yield);
        if ( !write_result ) {
            return write_result.error();
        }

        return outcome::success();
}

Result<Message> MessageStream::receive_message(asio::yield_context yield) noexcept {
    // Receive encrypted frame from socket
    auto frame_result = receive_frame(yield);
    if ( !frame_result || frame_result.value().empty() ) {
        return SessionError::kInvalidMessage;
    }

        const auto& frame_data = frame_result.value();

        // go-ethereum wire format: RLP-encoded uint(code) || payload (maybe Snappy-compressed)
        // Mirrors go-ethereum p2p/rlpx/rlpx.go: Read() = rlp.SplitUint64 + snappy.Decode(rest)
        rlp::RlpDecoder id_decoder(detail::to_rlp_view(frame_data));

        uint64_t msg_id = 0;
        if (!id_decoder.read(msg_id)) {
            return SessionError::kInvalidMessage;
        }

        // Remaining bytes are the payload (Snappy-compressed if enabled, raw otherwise).
        ByteView remaining_view = id_decoder.Remaining();
        ByteBuffer payload;

        if (compression_enabled_ && !remaining_view.empty()) {
            size_t uncompressed_len = 0;
            if (!snappy::GetUncompressedLength(
                    reinterpret_cast<const char*>(remaining_view.data()),
                    remaining_view.size(),
                    &uncompressed_len)) {
                return SessionError::kInvalidMessage;
            }
            payload.resize(uncompressed_len);
            if (!snappy::RawUncompress(
                    reinterpret_cast<const char*>(remaining_view.data()),
                    remaining_view.size(),
                    reinterpret_cast<char*>(payload.data()))) {
                return SessionError::kInvalidMessage;
            }
        } else if (!remaining_view.empty()) {
            payload.assign(remaining_view.begin(), remaining_view.end());
        }

        Message msg{static_cast<uint8_t>(msg_id), std::move(payload)};
        return msg;
}

FramingResult<void> MessageStream::send_frame(ByteView frame_data, asio::yield_context yield) noexcept {
    // Encrypt and send frame
    FrameEncryptParams params{};
    params.frame_data = frame_data;
    params.is_first_frame = true;

        auto encrypted_result = cipher_->encrypt_frame(params);
        if ( !encrypted_result ) {
            return encrypted_result.error();
        }

        // Send over socket
        auto write_result = transport_.write_all(encrypted_result.value(), yield);
        if ( !write_result ) {
            return FramingError::kEncryptionFailed;
        }

        return outcome::success();
}

FramingResult<ByteBuffer> MessageStream::receive_frame(asio::yield_context yield) noexcept {
    auto header_with_mac_result = transport_.read_exact(kFrameHeaderWithMacSize, yield);
    if ( !header_with_mac_result ) {
        ByteBuffer empty;
        return empty;
    }

        const auto& header_data = header_with_mac_result.value();

        // Split into header and MAC
        gsl::span<const uint8_t, kFrameHeaderSize> header_span(
            header_data.data(), 
            kFrameHeaderSize
        );
        gsl::span<const uint8_t, kMacSize> header_mac_span(
            header_data.data() + kFrameHeaderSize, 
            kMacSize
        );

        // Decrypt header to get frame size
        auto frame_size_result = cipher_->decrypt_header(header_span, header_mac_span);
        if ( !frame_size_result ) {
            ByteBuffer empty;
            return empty;
        }

        size_t frame_size = frame_size_result.value();

        // Frame body is padded to 16-byte boundary on the wire; followed by 16-byte MAC.
        const size_t padding    = (frame_size % kFramePaddingAlignment != 0)
                                ? (kFramePaddingAlignment - (frame_size % kFramePaddingAlignment))
                                : 0;
        const size_t padded_size = frame_size + padding;
        size_t total_frame_bytes = padded_size + kMacSize;
        auto frame_with_mac_result = transport_.read_exact(total_frame_bytes, yield);
        if ( !frame_with_mac_result ) {
            ByteBuffer empty;
            return empty;
        }

        const auto& frame_data = frame_with_mac_result.value();

        // Split into frame and MAC
        ByteView frame_ciphertext(frame_data.data(), padded_size);
        gsl::span<const uint8_t, kMacSize> frame_mac_span(
            frame_data.data() + padded_size, 
            kMacSize
        );

        // Decrypt frame body — header MAC already advanced in decrypt_header above
        auto decrypted_result = cipher_->decrypt_frame_body(
            frame_size,
            frame_ciphertext,
            ByteView(frame_mac_span.data(), kMacSize)
        );
        if ( !decrypted_result ) {
            ByteBuffer empty;
            return empty;
        }

        return decrypted_result.value();
}

} // namespace rlpx::framing

Updated on 2026-04-13 at 23:22:46 -0700