Skip to content

src/processing/processing_validation_core.cpp

Source file of core implementation of processing task results validation. More...

Namespaces

Name
sgns
sgns::processing

Functions

Name
OUTCOME_CPP_DEFINE_CATEGORY_3(sgns::processing , ProcessingValidationCore::Error , e )

Detailed Description

Source file of core implementation of processing task results validation.

Date: 2022-05-08 creativeid00

Note: This was mostly rewritten by Henrique A. Klein ([email protected]) and Justin Church ([email protected])

Functions Documentation

function OUTCOME_CPP_DEFINE_CATEGORY_3

OUTCOME_CPP_DEFINE_CATEGORY_3(
    sgns::processing ,
    ProcessingValidationCore::Error ,
    e 
)

Source code

#include <optional>
#include <unordered_set>
#include "processing_validation_core.hpp"
#include "processing/processing_subtask_queue.hpp"
#include "processing/processing_subtask_queue_channel.hpp"

OUTCOME_CPP_DEFINE_CATEGORY_3( sgns::processing, ProcessingValidationCore::Error, e )
{
    using ValidationError = sgns::processing::ProcessingValidationCore::Error;
    switch ( e )
    {
        case ValidationError::NO_RESULTS_FOR_SUBTASK:
            return "Subtask was finalized with no results";
        case ValidationError::WRONG_RESULT_HASHES_LENGTH:
            return "The hashes length doesn't match the chunks to process length";
        case ValidationError::DUPLICATE_CHUNK_RESULT_HASH:
            return "A duplicate chunk result hash was found";
        case ValidationError::EMPTY_CHUNK_RESULT_HASH:
            return "Empty chunk result hash was found";
        case ValidationError::MISSING_CHUNK_RESULT:
            return "Missing chunk result found";
        case ValidationError::INVALID_CHUNK_RESULT_HASH:
            return "The chunk result hash is invalid";
        case ValidationError::SUBTASK_ID_MISMATCH:
            return "The subtask id doesn't match the result id";
        case ValidationError::INVALID_RESULTS_BATCH:
            return "The results batch is invalid";
    }
    return "Unknown error";
}

namespace sgns::processing
{

    ProcessingValidationCore::ProcessingValidationCore() {}

    outcome::result<void> ProcessingValidationCore::ValidateResults(
        const SGProcessing::SubTaskCollection                    &subTasks,
        const std::map<std::string, SGProcessing::SubTaskResult> &results,
        std::set<std::string>                                    &invalidSubTaskIds )
    {

        std::optional<std::error_code> error;

        // Compare result hashes for each chunk
        // If a chunk hashes didn't match each other add the all subtasks with invalid hashes to VALID ITEMS LIST
        std::map<std::string, std::vector<uint8_t>> chunks;

        for ( int itemIdx = 0; itemIdx < subTasks.items_size(); ++itemIdx )
        {
            const auto &subTask  = subTasks.items( itemIdx );
            auto        itResult = results.find( subTask.subtaskid() );
            if ( itResult != results.end() )
            {
                if ( itResult->second.chunk_hashes_size() != subTask.chunkstoprocess_size() )
                {
                    m_logger->error( "WRONG_RESULT_HASHES_LENGTH {}: {} {}",
                                     subTask.subtaskid(),
                                     itResult->second.chunk_hashes_size(),
                                     subTask.chunkstoprocess_size() );
                    invalidSubTaskIds.insert( subTask.subtaskid() );
                    if ( !error )
                    {
                        error = make_error_code(Error::WRONG_RESULT_HASHES_LENGTH);
                    }
                }
                else
                {
                    for ( int chunkIdx = 0; chunkIdx < subTask.chunkstoprocess_size(); ++chunkIdx )
                    {
                        auto it = chunks.insert(
                            std::make_pair( subTask.chunkstoprocess( chunkIdx ).SerializeAsString(),
                                            std::vector<uint8_t>() ) );
                        const std::string &chunkHashBytes = itResult->second.chunk_hashes( chunkIdx );
                        //it.first->second.push_back(itResult->second.chunk_hashes(chunkIdx));
                        it.first->second.insert( it.first->second.end(), chunkHashBytes.begin(), chunkHashBytes.end() );
                    }
                }
            }
            else
            {
                // Since all subtasks are processed a result should be found for all of them
                m_logger->error( "NO_RESULTS_FOUND {} on ", subTask.subtaskid() );
                invalidSubTaskIds.insert( subTask.subtaskid() );
                if ( !error )
                {
                    error = make_error_code(Error::NO_RESULTS_FOR_SUBTASK);
                }
            }
        }

        for ( int itemIdx = 0; itemIdx < subTasks.items_size(); ++itemIdx )
        {
            const auto &subTask = subTasks.items( itemIdx );
            if ( invalidSubTaskIds.find( subTask.subtaskid() ) != invalidSubTaskIds.end() )
            {
                m_logger->trace( "Subtask already invalid {}, no need to check chunk hashes ", subTask.subtaskid() );
                continue;
            }

            auto subtaskCheck = CheckSubTaskResultHashes( subTask, chunks );
            if ( subtaskCheck.has_failure() )
            {
                invalidSubTaskIds.insert( subTask.subtaskid() );
                if ( !error )
                {
                    error = subtaskCheck.error();
                }
            }
        }

        if ( error )
        {
            return outcome::failure( *error );
        }
        return outcome::success();
    }

    outcome::result<void> ProcessingValidationCore::ValidateIndividualResult(
        const SGProcessing::SubTask       &subTask,
        const SGProcessing::SubTaskResult &result ) const
    {
        // Check 1: Verify subtask IDs match
        if ( subTask.subtaskid() != result.subtaskid() )
        {
            m_logger->error( "SUBTASK_ID_MISMATCH: expected {}, got {}", subTask.subtaskid(), result.subtaskid() );
            return outcome::failure( Error::SUBTASK_ID_MISMATCH );
        }

        // Check 2: Verify hash count matches chunk count
        if ( result.chunk_hashes_size() != subTask.chunkstoprocess_size() )
        {
            m_logger->error( "WRONG_RESULT_HASHES_LENGTH {}: {} {}",
                             subTask.subtaskid(),
                             result.chunk_hashes_size(),
                             subTask.chunkstoprocess_size() );
            return outcome::failure( Error::WRONG_RESULT_HASHES_LENGTH );
        }

        // Check 3: Verify no duplicate hashes
        std::unordered_set<std::string> encounteredHashes;
        for ( int chunkIdx = 0; chunkIdx < result.chunk_hashes_size(); ++chunkIdx )
        {
            const std::string &chunkHash = result.chunk_hashes( chunkIdx );

            if ( !encounteredHashes.insert( chunkHash ).second )
            {
                const auto &chunk = subTask.chunkstoprocess( chunkIdx );
                m_logger->error( "DUPLICATE_CHUNK_RESULT_HASH [{}, {}]", subTask.subtaskid(), chunk.chunkid() );
                return outcome::failure( Error::DUPLICATE_CHUNK_RESULT_HASH );
            }

            // Check 4: Verify hash is not empty
            if ( chunkHash.empty() )
            {
                const auto &chunk = subTask.chunkstoprocess( chunkIdx );
                m_logger->error( "EMPTY_CHUNK_RESULT_HASH [{}, {}]", subTask.subtaskid(), chunk.chunkid() );
                return outcome::failure( Error::EMPTY_CHUNK_RESULT_HASH );
            }
        }

        return outcome::success();
    }

    outcome::result<void> ProcessingValidationCore::CheckSubTaskResultHashes(
        const SGProcessing::SubTask                       &subTask,
        const std::map<std::string, std::vector<uint8_t>> &chunks ) const
    {
        std::unordered_set<std::string> encounteredHashes;
        for ( int chunkIdx = 0; chunkIdx < subTask.chunkstoprocess_size(); ++chunkIdx )
        {
            const auto &chunk = subTask.chunkstoprocess( chunkIdx );
            auto        it    = chunks.find( chunk.SerializeAsString() );
            if ( it != chunks.end() )
            {
                std::string chunkHash( it->second.begin(), it->second.end() );
                if ( !encounteredHashes.insert( chunkHash ).second )
                {
                    m_logger->error( "INVALID_CHUNK_RESULT_HASH [{}, {}]", subTask.subtaskid(), chunk.chunkid() );
                    return outcome::failure( Error::INVALID_CHUNK_RESULT_HASH );
                }
            }
            else
            {
                m_logger->error( "NO_CHUNK_RESULT_FOUND [{}, {}]", subTask.subtaskid(), chunk.chunkid() );
                return outcome::failure( Error::MISSING_CHUNK_RESULT );
            }
        }
        return outcome::success();
    }

}

Updated on 2026-03-04 at 13:10:44 -0800