Skip to content

rlp/rlp_streaming.cpp

Namespaces

Name
rlp

Source code

#include <rlp/rlp_streaming.hpp>
#include <rlp/endian.hpp>
#include <cstring>

namespace rlp {

namespace { // Anonymous namespace for helpers

// Encode RLP string header for given payload length
// Returns empty bytes if length exceeds 64-bit limit (practically impossible)
Bytes encode_string_header(size_t payload_len) {
    Bytes header;

    if (payload_len <= kMaxShortStringLen) { // <= 55 bytes
        header.push_back(static_cast<uint8_t>(kShortStringOffset + payload_len));
    } else {
        // Long form: 0xb8 + length_of_length + length_bytes
        Bytes len_be = endian::to_big_compact(static_cast<uint64_t>(payload_len));
        if (len_be.length() > 8) {
            // Practically impossible on 64-bit systems
            return Bytes{}; // Return empty to signal error
        }
        header.reserve(1 + len_be.length());
        header.push_back(static_cast<uint8_t>(kLongStringOffset + len_be.length()));
        header.append(len_be.data(), len_be.length());
    }

    return header;
}

} // namespace

// ============================================================================
// Approach A: Reserve & Patch Header Implementation
// ============================================================================

// Factory method implementation
StreamingResult<RlpLargeStringEncoder> RlpLargeStringEncoder::create(RlpEncoder& encoder) {
    // Validate that encoder is in a valid state and reserve header space
    auto buffer_result = encoder.GetBytes();
    if (!buffer_result) {
        return StreamingError::kNotFinalized; // Encoder has unclosed lists
    }

    // Reserve maximum possible header space (1 byte prefix + 8 bytes length)
    encoder.reserve(encoder.size() + 9);

    // Temporarily fill with zeros (will be patched in finish())
    Bytes* buffer = buffer_result.value();
    for (size_t i = 0; i < 9; ++i) {
        buffer->push_back(0);
    }

    // Use private constructor - header space is already initialized
    return RlpLargeStringEncoder(encoder);
}

RlpLargeStringEncoder::RlpLargeStringEncoder(RlpEncoder& encoder)
    : encoder_(encoder)
    , header_start_(encoder.size() - 9) // Header was just added by create()
    , payload_start_(encoder.size())    // Payload starts after reserved header
    , payload_size_(0)
    , finished_(false) {
    // Header space already reserved and initialized by create()
}

// Destructor - automatically finish if not already done
RlpLargeStringEncoder::~RlpLargeStringEncoder() {
    if (!finished_) {
        (void)finish(); // Ignore result in destructor
    }
}

StreamingOperationResult RlpLargeStringEncoder::addChunk(ByteView chunk) {
    if (finished_) {
        return StreamingError::kAlreadyFinalized;
    }

    // Append chunk directly to encoder buffer
    auto buffer_result = encoder_.GetBytes();
    if (!buffer_result) {
        return StreamingError::kNotFinalized; // Encoder has unclosed lists
    }
    Bytes* buffer = buffer_result.value();
    buffer->append(chunk.data(), chunk.size());
    payload_size_ += chunk.size();

    return outcome::success();
}

StreamingOperationResult RlpLargeStringEncoder::finish() {
    if (finished_) {
        return StreamingError::kAlreadyFinalized;
    }
    finished_ = true;

    // Encode the actual header based on final payload size
    Bytes actual_header = encode_string_header(payload_size_);
    if (actual_header.empty() && payload_size_ > kMaxShortStringLen) {
        // Header encoding failed (payload too large - practically impossible)
        return StreamingError::kHeaderSizeExceeded;
    }

    size_t actual_header_size = actual_header.size();
    size_t reserved_size = 9;

    // Get mutable access to encoder buffer
    auto buffer_result = encoder_.GetBytes();
    if (!buffer_result) {
        return StreamingError::kNotFinalized; // Encoder has unclosed lists
    }
    Bytes* buffer = buffer_result.value();

    if (actual_header_size <= reserved_size) {
        // Calculate how much to shift
        size_t shift = reserved_size - actual_header_size;

        // Copy actual header to the start position
        std::memcpy(buffer->data() + header_start_, actual_header.data(), actual_header_size);

        if (shift > 0) {
            // Shift payload left to eliminate gap
            std::memmove(
                buffer->data() + header_start_ + actual_header_size,
                buffer->data() + payload_start_,
                payload_size_
            );
            // Resize to remove the gap
            buffer->resize(buffer->size() - shift);
        }
    } else {
        // Should never happen with 9-byte reservation
        return StreamingError::kHeaderSizeExceeded;
    }

    return outcome::success();
}

// ============================================================================
// Approach B: Chunked List Encoding Implementation
// ============================================================================

RlpChunkedListEncoder::RlpChunkedListEncoder(RlpEncoder& encoder, size_t chunk_size)
    : encoder_(encoder)
    , chunk_size_(chunk_size)
    , chunk_count_(0)
    , total_bytes_(0)
    , finished_(false)
    , list_started_(false) {
    buffer_.reserve(chunk_size);
}

StreamingResult<RlpChunkedListEncoder> RlpChunkedListEncoder::create(RlpEncoder& encoder, size_t chunk_size) {
    if (chunk_size == 0) {
        return StreamingError::kInvalidChunkSize;
    }
    return RlpChunkedListEncoder(encoder, chunk_size);
}

// Destructor - automatically finish if not already done
RlpChunkedListEncoder::~RlpChunkedListEncoder() {
    if (!finished_) {
        (void)finish(); // Ignore result in destructor
    }
}

StreamingOperationResult RlpChunkedListEncoder::addChunk(ByteView data) {
    if (finished_) {
        return StreamingError::kAlreadyFinalized;
    }

    // Start list on first data
    if (!list_started_) {
        auto result = encoder_.BeginList();
        if (!result) {
            return StreamingError::kNotFinalized; // Failed to begin list
        }
        list_started_ = true;
    }

    size_t offset = 0;
    while (offset < data.size()) {
        size_t space_left = chunk_size_ - buffer_.size();
        size_t to_copy = std::min(space_left, data.size() - offset);

        buffer_.append(data.data() + offset, to_copy);
        offset += to_copy;
        total_bytes_ += to_copy;

        // Flush if buffer is full
        if (buffer_.size() >= chunk_size_) {
            flushBuffer();
        }
    }

    return outcome::success();
}

void RlpChunkedListEncoder::flushBuffer() {
    if (buffer_.empty()) {
        return;
    }

    // Add buffer as RLP string to the list
    auto result = encoder_.add(ByteView(buffer_.data(), buffer_.size()));
    // Ignore errors in internal helper - errors are caught at flush()
    (void)result;
    chunk_count_++;
    buffer_.clear();
}

StreamingOperationResult RlpChunkedListEncoder::finish() {
    if (finished_) {
        return StreamingError::kAlreadyFinalized;
    }
    finished_ = true;

    // Flush any remaining data
    if (!buffer_.empty()) {
        flushBuffer();
    }

    // End the list if it was started
    if (list_started_) {
        auto result = encoder_.EndList();
        if (!result) {
            return StreamingError::kNotFinalized; // Failed to end list
        }
    } else {
        // Empty data - create empty list
        auto begin_result = encoder_.BeginList();
        if (!begin_result) {
            return StreamingError::kNotFinalized;
        }
        auto end_result = encoder_.EndList();
        if (!end_result) {
            return StreamingError::kNotFinalized;
        }
    }

    return outcome::success();
}

// ============================================================================
// Streaming Decoder Implementations
// ============================================================================

// ============================================================================
// Approach A: Large String Decoder Implementation
// ============================================================================

RlpLargeStringDecoder::RlpLargeStringDecoder(const RlpDecoder& decoder)
    : view_(decoder.Remaining())
    , payload_size_(0)
    , bytes_read_(0)
    , initialized_(false) {
}

RlpLargeStringDecoder::RlpLargeStringDecoder(ByteView data)
    : view_(data)
    , payload_size_(0)
    , bytes_read_(0)
    , initialized_(false) {
}

Result<size_t> RlpLargeStringDecoder::peekPayloadSize() const noexcept {
    // Peek header from our view
    RlpDecoder temp_decoder(view_);
    BOOST_OUTCOME_TRY(auto h, temp_decoder.PeekHeader());

    if (h.list) {
        return DecodingError::kUnexpectedList;
    }

    if (view_.length() < h.header_size_bytes + h.payload_size_bytes) {
        return DecodingError::kInputTooShort;
    }

    return h.payload_size_bytes;
}

Result<ByteView> RlpLargeStringDecoder::readChunk(size_t max_chunk_size) {
    // Initialize on first read
    if (!initialized_) {
        RlpDecoder temp_decoder(view_);
        BOOST_OUTCOME_TRY(auto h, temp_decoder.PeekHeader());

        if (h.list) {
            return DecodingError::kUnexpectedList;
        }

        if (view_.length() < h.header_size_bytes + h.payload_size_bytes) {
            return DecodingError::kInputTooShort;
        }

        payload_size_ = h.payload_size_bytes;

        // Skip header in our view
        view_.remove_prefix(h.header_size_bytes);
        initialized_ = true;
    }

    // Check if already finished
    if (bytes_read_ >= payload_size_) {
        return ByteView{}; // Empty view signals completion
    }

    // Calculate chunk size
    size_t remaining = payload_size_ - bytes_read_;
    size_t chunk_size = std::min(remaining, max_chunk_size);

    // Get chunk view from our view by creating a new string_view with limited size
    ByteView chunk(view_.data(), chunk_size);

    // Advance our position
    view_.remove_prefix(chunk_size);
    bytes_read_ += chunk_size;

    return chunk;
}

// ============================================================================
// Approach B: Chunked List Decoder Implementation
// ============================================================================

RlpChunkedListDecoder::RlpChunkedListDecoder(const RlpDecoder& decoder)
    : view_(decoder.Remaining())
    , list_payload_()
    , total_size_(0)
    , total_chunks_(0)
    , chunk_index_(0)
    , initialized_(false) {
}

RlpChunkedListDecoder::RlpChunkedListDecoder(ByteView data)
    : view_(data)
    , list_payload_()
    , total_size_(0)
    , total_chunks_(0)
    , chunk_index_(0)
    , initialized_(false) {
}

Result<size_t> RlpChunkedListDecoder::peekTotalSize() {
    if (initialized_ && total_size_ > 0) {
        return total_size_; // Return cached value
    }

    // Peek list header from our view
    RlpDecoder temp_decoder(view_);
    BOOST_OUTCOME_TRY(auto h, temp_decoder.PeekHeader());

    if (!h.list) {
        return DecodingError::kUnexpectedString;
    }

    if (view_.length() < h.header_size_bytes + h.payload_size_bytes) {
        return DecodingError::kInputTooShort;
    }

    // Scan through list to calculate total size
    ByteView list_view(view_.data() + h.header_size_bytes, h.payload_size_bytes);
    size_t total = 0;
    size_t chunks = 0;

    while (!list_view.empty()) {
        RlpDecoder chunk_decoder(list_view);

        BOOST_OUTCOME_TRY(auto chunk_h, chunk_decoder.PeekHeader());

        if (chunk_h.list) {
            return DecodingError::kUnexpectedList;
        }

        total += chunk_h.payload_size_bytes;
        chunks++;

        // Skip this chunk
        size_t chunk_total_size = chunk_h.header_size_bytes + chunk_h.payload_size_bytes;
        if (list_view.length() < chunk_total_size) {
            return DecodingError::kInputTooShort;
        }
        list_view.remove_prefix(chunk_total_size);
    }

    total_size_ = total;
    total_chunks_ = chunks;

    return total_size_;
}

Result<size_t> RlpChunkedListDecoder::peekChunkCount() {
    if (initialized_ && total_chunks_ > 0) {
        return total_chunks_; // Return cached value
    }

    // peekTotalSize() calculates both total_size_ and total_chunks_
    BOOST_OUTCOME_TRY(peekTotalSize());

    return total_chunks_;
}

Result<ByteView> RlpChunkedListDecoder::readChunk() {
    // Initialize on first read
    if (!initialized_) {
        // Read list header from our view
        RlpDecoder temp_decoder(view_);
        BOOST_OUTCOME_TRY(auto h, temp_decoder.PeekHeader());

        if (!h.list) {
            return DecodingError::kUnexpectedString;
        }

        if (view_.length() < h.header_size_bytes + h.payload_size_bytes) {
            return DecodingError::kInputTooShort;
        }

        size_t list_payload_len = h.payload_size_bytes;

        // Skip list header in our view
        view_.remove_prefix(h.header_size_bytes);

        // Set list_payload_ to point to the payload
        list_payload_ = ByteView(view_.data(), list_payload_len);

        // If we haven't peeked yet, do it now to get total_chunks_
        if (total_chunks_ == 0) {
            // Count chunks by scanning
            ByteView temp_view = list_payload_;
            size_t chunks = 0;

            while (!temp_view.empty()) {
                RlpDecoder temp_decoder2(temp_view);
                BOOST_OUTCOME_TRY(auto h2, temp_decoder2.PeekHeader());

                if (h2.list) {
                    return DecodingError::kUnexpectedList;
                }

                chunks++;
                size_t total_size = h2.header_size_bytes + h2.payload_size_bytes;
                if (temp_view.length() < total_size) {
                    return DecodingError::kInputTooShort;
                }
                temp_view.remove_prefix(total_size);
            }

            total_chunks_ = chunks;
        }

        initialized_ = true;
    }

    // Check if already finished
    if (chunk_index_ >= total_chunks_ || list_payload_.empty()) {
        // Consume list payload from our view if not already done
        if (!list_payload_.empty()) {
            view_.remove_prefix(list_payload_.length());
            list_payload_ = ByteView{};
        }
        return ByteView{}; // Empty view signals completion
    }

    // Decode next chunk
    RlpDecoder chunk_decoder(list_payload_);

    BOOST_OUTCOME_TRY(auto h, chunk_decoder.PeekHeader());

    if (h.list) {
        return DecodingError::kUnexpectedList;
    }

    // Get chunk payload view
    if (list_payload_.length() < h.header_size_bytes + h.payload_size_bytes) {
        return DecodingError::kInputTooShort;
    }

    ByteView chunk_payload(list_payload_.data() + h.header_size_bytes, h.payload_size_bytes);

    // Advance list_payload_ past this chunk
    size_t chunk_total_size = h.header_size_bytes + h.payload_size_bytes;
    list_payload_.remove_prefix(chunk_total_size);
    chunk_index_++;

    // If this was the last chunk, update our view
    if (list_payload_.empty() && chunk_index_ >= total_chunks_) {
        view_.remove_prefix(0); // Already consumed
    }

    return chunk_payload;
}

} // namespace rlp

Updated on 2026-04-13 at 23:22:46 -0700