Skip to content

rlp/rlp_streaming.hpp

Namespaces

Name
rlp

Classes

Name
class rlp::RlpLargeStringEncoder
class rlp::RlpChunkedListEncoder
class rlp::RlpLargeStringDecoder
class rlp::RlpChunkedListDecoder

Functions

Name
template <typename Func >
StreamingOperationResult
encodeLargeString(RlpEncoder & encoder, Func && generator)
template <typename Func >
StreamingOperationResult
encodeChunkedList(RlpEncoder & encoder, Func && generator, size_t chunk_size =32768)
template <typename Func >
DecodingResult
decodeLargeString(RlpDecoder & decoder, Func && callback, size_t read_chunk_size =32768)
template <typename Func >
DecodingResult
decodeChunkedList(RlpDecoder & decoder, Func && callback)
Result< Bytes > decodeChunkedListFull(RlpDecoder & decoder)

Functions Documentation

function encodeLargeString

template <typename Func >
inline StreamingOperationResult encodeLargeString(
    RlpEncoder & encoder,
    Func && generator
)

function encodeChunkedList

template <typename Func >
inline StreamingOperationResult encodeChunkedList(
    RlpEncoder & encoder,
    Func && generator,
    size_t chunk_size =32768
)

function decodeLargeString

template <typename Func >
inline DecodingResult decodeLargeString(
    RlpDecoder & decoder,
    Func && callback,
    size_t read_chunk_size =32768
)

function decodeChunkedList

template <typename Func >
inline DecodingResult decodeChunkedList(
    RlpDecoder & decoder,
    Func && callback
)

function decodeChunkedListFull

inline Result< Bytes > decodeChunkedListFull(
    RlpDecoder & decoder
)

Source code

#ifndef RLP_STREAMING_HPP
#define RLP_STREAMING_HPP

#include "common.hpp"
#include "rlp_encoder.hpp"
#include "rlp_decoder.hpp"
#include <functional>

namespace rlp {

// ============================================================================
// Approach A: Reserve & Patch Header (Single Large RLP String)
// ============================================================================
// Use this for producing canonical single RLP strings for large payloads.
// Common for: contract bytecode, large calldata, block bodies.
// 
// Benefits:
// - Produces canonical single RLP string (not chunked)
// - Minimal memory overhead (no intermediate buffering)
// - Single output stream
//
// Requirements:
// - Need random-access to output buffer (seeks back to patch header)
// - Must know or calculate total payload size before flushing

class RlpLargeStringEncoder {
public:
    // Factory method: Step 1 - Create and reserve space automatically
    // Returns error if reservation fails
    [[nodiscard]] static StreamingResult<RlpLargeStringEncoder> create(RlpEncoder& encoder);

    // Step 2 - Add chunk of payload data
    // Returns error if already finished
    [[nodiscard]] StreamingOperationResult addChunk(ByteView chunk);

    // Explicitly finish encoding and patch header
    // Can be called manually, or automatically called by destructor
    // Returns error if already finished
    [[nodiscard]] StreamingOperationResult finish();

    // Get current payload size
    [[nodiscard]] size_t payloadSize() const noexcept { return payload_size_; }

    // Check if already finished
    [[nodiscard]] bool isFinished() const noexcept { return finished_; }

    // Destructor automatically calls finish() if not already called
    ~RlpLargeStringEncoder();

    // Disable copying, enable moving with custom implementation
    RlpLargeStringEncoder(const RlpLargeStringEncoder&) = delete;
    RlpLargeStringEncoder& operator=(const RlpLargeStringEncoder&) = delete;
    RlpLargeStringEncoder(RlpLargeStringEncoder&& other) noexcept
        : encoder_(other.encoder_)
        , header_start_(other.header_start_)
        , payload_start_(other.payload_start_)
        , payload_size_(other.payload_size_)
        , finished_(other.finished_) {
        // Mark the moved-from object as finished to prevent double-finish
        other.finished_ = true;
    }
    RlpLargeStringEncoder& operator=(RlpLargeStringEncoder&&) = delete; // Not assignable after construction

private:
    // Private constructor - use create() factory method
    explicit RlpLargeStringEncoder(RlpEncoder& encoder);

    RlpEncoder& encoder_;
    size_t header_start_;      // Position where header starts
    size_t payload_start_;     // Position where payload starts (after reserved header)
    size_t payload_size_{0};   // Accumulated payload size
    bool finished_{false};     // Renamed from flushed_ for clarity
};

// Convenience function: encode large string with callback providing chunks
// Returns error if creation or write operation fails. Automatic finish() via RAII.
template <typename Func>
inline StreamingOperationResult encodeLargeString(RlpEncoder& encoder, Func&& generator) {
    BOOST_OUTCOME_TRY(auto stream, RlpLargeStringEncoder::create(encoder));

    StreamingOperationResult result = outcome::success();
    generator([&stream, &result](ByteView chunk) {
        if (!result) return; // Skip if already failed
        result = stream.addChunk(chunk);
    });

    return result; // finish() automatically called by destructor
}

// ============================================================================
// Approach B: Chunked List Encoding (Multiple RLP Strings)
// ============================================================================
// Use when both encoder/decoder agree on chunked format.
// Common for: streaming protocols, append-only logs, progressive data transfer.
//
// Benefits:
// - No header patching required (append-only)
// - Can start transmitting before knowing total size
// - Natural for streaming/progressive scenarios
//
// Trade-offs:
// - Not canonical (produces list-of-strings, not single string)
// - Requires decoder to reassemble chunks
// - Both sides must agree on chunked encoding protocol

class RlpChunkedListEncoder {
public:
    // Constructor validates chunk_size > 0
    [[nodiscard]] static StreamingResult<RlpChunkedListEncoder> create(RlpEncoder& encoder, size_t chunk_size = 32768);

    // Destructor automatically calls finish() if not already done
    ~RlpChunkedListEncoder();

    // Add data - automatically chunks into RLP strings within a list
    // Returns error if already finished
    [[nodiscard]] StreamingOperationResult addChunk(ByteView data);

    // Finalize encoding - flush any remaining buffered data as a final chunk
    // Returns error if already finished. Automatically called by destructor.
    [[nodiscard]] StreamingOperationResult finish();

    // Get number of chunks encoded
    [[nodiscard]] size_t chunkCount() const noexcept { return chunk_count_; }

    // Get total bytes encoded
    [[nodiscard]] size_t totalBytes() const noexcept { return total_bytes_; }

    // Check if already finished
    [[nodiscard]] bool isFinished() const noexcept { return finished_; }

    // Disable copying, enable moving with custom implementation
    RlpChunkedListEncoder(const RlpChunkedListEncoder&) = delete;
    RlpChunkedListEncoder& operator=(const RlpChunkedListEncoder&) = delete;
    RlpChunkedListEncoder(RlpChunkedListEncoder&& other) noexcept
        : encoder_(other.encoder_)
        , chunk_size_(other.chunk_size_)
        , buffer_(std::move(other.buffer_))
        , chunk_count_(other.chunk_count_)
        , total_bytes_(other.total_bytes_)
        , finished_(other.finished_)
        , list_started_(other.list_started_) {
        // Mark the moved-from object as finished to prevent double-finish
        other.finished_ = true;
    }
    RlpChunkedListEncoder& operator=(RlpChunkedListEncoder&&) = delete; // Not assignable after construction

private:
    // Private constructor - use create() factory method
    explicit RlpChunkedListEncoder(RlpEncoder& encoder, size_t chunk_size);

    void flushBuffer();

    RlpEncoder& encoder_;
    size_t chunk_size_;
    Bytes buffer_;
    size_t chunk_count_{0};
    size_t total_bytes_{0};
    bool finished_{false};
    bool list_started_{false};
};

// Convenience function: encode data as chunked list
// Returns error if creation or write fails. Automatic finish() via RAII.
template <typename Func>
inline StreamingOperationResult encodeChunkedList(RlpEncoder& encoder, Func&& generator, size_t chunk_size = 32768) {
    BOOST_OUTCOME_TRY(auto chunked, RlpChunkedListEncoder::create(encoder, chunk_size));

    StreamingOperationResult result = outcome::success();
    generator([&chunked, &result](ByteView chunk) {
        if (!result) return; // Skip if already failed
        result = chunked.addChunk(chunk);
    });

    return result; // finish() automatically called by destructor
}

// ============================================================================
// Streaming Decoder Classes (Two-Phase Approach)
// ============================================================================

// ============================================================================
// Approach A: Large String Decoder (Two-Phase)
// ============================================================================
// Two-phase streaming decoder for large RLP strings:
// Phase 1: Peek size → allows caller to allocate buffer
// Phase 2: Read chunks → decode into pre-allocated buffer
//
// Use this for memory-efficient decoding of large single RLP strings
//
// This decoder is self-contained and manages its own state without
// modifying the original RlpDecoder.

class RlpLargeStringDecoder {
public:
    // Create from RlpDecoder's remaining view
    // Creates an independent decoder by copying the remaining ByteView from the provided decoder.
    // The original decoder is not modified or affected.
    explicit RlpLargeStringDecoder(const RlpDecoder& decoder);

    // Create directly from ByteView
    explicit RlpLargeStringDecoder(ByteView data);

    // Phase 1: Get total payload size (allows pre-allocation)
    [[nodiscard]] Result<size_t> peekPayloadSize() const noexcept;

    // Phase 2: Read next chunk of payload data
    // Returns ByteView of the chunk read (size <= max_chunk_size)
    // Returns empty view when no more data
    // Returns error if not initialized or already finished
    [[nodiscard]] Result<ByteView> readChunk(size_t max_chunk_size = 32768);

    // Get current position in payload
    [[nodiscard]] size_t currentPosition() const noexcept { return bytes_read_; }

    // Get total payload size (after initialization)
    [[nodiscard]] size_t totalSize() const noexcept { return payload_size_; }

    // Check if all data has been read
    [[nodiscard]] bool isFinished() const noexcept {
        return initialized_ && bytes_read_ >= payload_size_;
    }

    // Check if initialized (header consumed)
    [[nodiscard]] bool isInitialized() const noexcept { return initialized_; }

    // Get remaining view after decoding (for chaining)
    [[nodiscard]] ByteView remaining() const noexcept { return view_; }

private:
    ByteView view_;            // Our own view of the data
    size_t payload_size_{0};
    size_t bytes_read_{0};
    bool initialized_{false};
};

// ============================================================================
// Approach B: Chunked List Decoder (Two-Phase)
// ============================================================================
// Two-phase streaming decoder for chunked RLP lists:
// Phase 1: Peek total reassembled size → allows caller to allocate buffer
// Phase 2: Read list items → decode each chunk
//
// Use this when data is encoded as list of RLP strings
//
// This decoder is self-contained and manages its own state without
// modifying the original RlpDecoder.

class RlpChunkedListDecoder {
public:
    // Create from RlpDecoder's remaining view
    // The decoder is not modified - we take a copy of its view
    explicit RlpChunkedListDecoder(const RlpDecoder& decoder);

    // Create directly from ByteView
    explicit RlpChunkedListDecoder(ByteView data);

    // Phase 1: Get total reassembled size (sum of all chunk sizes)
    // Must scan through list to calculate total
    [[nodiscard]] Result<size_t> peekTotalSize();

    // Get number of chunks in the list
    [[nodiscard]] Result<size_t> peekChunkCount();

    // Phase 2: Read next chunk from the list
    // Returns ByteView of the chunk
    // Returns empty view when no more chunks
    // Returns error if not initialized or decoding fails
    [[nodiscard]] Result<ByteView> readChunk();

    // Get current chunk index
    [[nodiscard]] size_t currentChunkIndex() const noexcept { return chunk_index_; }

    // Get total number of chunks (after initialization)
    [[nodiscard]] size_t totalChunks() const noexcept { return total_chunks_; }

    // Get total reassembled size (after peek)
    [[nodiscard]] size_t totalSize() const noexcept { return total_size_; }

    // Check if all chunks have been read
    [[nodiscard]] bool isFinished() const noexcept {
        return initialized_ && chunk_index_ >= total_chunks_;
    }

    // Check if initialized (list header consumed)
    [[nodiscard]] bool isInitialized() const noexcept { return initialized_; }

    // Get remaining view after decoding (for chaining)
    [[nodiscard]] ByteView remaining() const noexcept { return view_; }

private:
    ByteView view_;            // Our own view of the data
    ByteView list_payload_;    // View of the list payload being decoded
    size_t total_size_{0};
    size_t total_chunks_{0};
    size_t chunk_index_{0};
    bool initialized_{false};
};

// ============================================================================
// Callback-Based Decoder Functions (Legacy/Convenience)
// ============================================================================

// Decode large string with streaming callback (Approach A)
// Use for payloads encoded as single RLP string
// Note: This is a legacy convenience function. Consider using RlpLargeStringDecoder directly.
template <typename Func>
inline DecodingResult decodeLargeString(RlpDecoder& decoder, Func&& callback, size_t read_chunk_size = 32768) {
    RlpLargeStringDecoder stream_decoder(decoder);

    // Initialize and read chunks
    while (true) {
        auto chunk_result = stream_decoder.readChunk(read_chunk_size);
        if (!chunk_result) {
            return chunk_result.error();
        }

        ByteView chunk = chunk_result.value();
        if (chunk.empty()) {
            break; // Finished
        }

        callback(chunk);
    }

    return outcome::success();
}

// Decode chunked list and reassemble (Approach B)
// Use for payloads encoded as list of RLP strings
// Note: This is a legacy convenience function. Consider using RlpChunkedListDecoder directly.
template <typename Func>
inline DecodingResult decodeChunkedList(RlpDecoder& decoder, Func&& callback) {
    RlpChunkedListDecoder stream_decoder(decoder);

    size_t chunk_index = 0;
    while (true) {
        auto chunk_result = stream_decoder.readChunk();
        if (!chunk_result) {
            return chunk_result.error();
        }

        ByteView chunk = chunk_result.value();
        if (chunk.empty()) {
            break; // Finished
        }

        callback(chunk, chunk_index++);
    }

    return outcome::success();
}

// Full reassembly helper for chunked lists
inline Result<Bytes> decodeChunkedListFull(RlpDecoder& decoder) {
    Bytes result;

    auto decode_result = decodeChunkedList(decoder, [&result](ByteView chunk, size_t) {
        result.append(chunk.data(), chunk.size());
    });

    if (!decode_result) {
        return decode_result.error();
    }

    return result;
}

} // namespace rlp

#endif // RLP_STREAMING_HPP

Updated on 2026-04-13 at 23:22:46 -0700