src/processing/processing_validation_core.cpp¶
Source file for the core implementation of processing task result validation.
Namespaces¶
| Name |
|---|
| sgns |
| sgns::processing |
Functions¶
| Name | |
|---|---|
| OUTCOME_CPP_DEFINE_CATEGORY_3(sgns::processing , ProcessingValidationCore::Error , e ) |
Detailed Description¶
Source file for the core implementation of processing task result validation.
Date: 2022-05-08 creativeid00
Note: This was mostly rewritten by Henrique A. Klein ([email protected]) and Justin Church ([email protected])
Functions Documentation¶
function OUTCOME_CPP_DEFINE_CATEGORY_3¶
Source code¶
#include <optional>
#include <unordered_set>
#include "processing_validation_core.hpp"
#include "processing/processing_subtask_queue.hpp"
#include "processing/processing_subtask_queue_channel.hpp"
OUTCOME_CPP_DEFINE_CATEGORY_3( sgns::processing, ProcessingValidationCore::Error, e )
{
    // Maps each ProcessingValidationCore::Error value to a human-readable description.
    // Values not listed below (e.g. an out-of-range cast) keep the fallback text.
    using ValidationErr = sgns::processing::ProcessingValidationCore::Error;

    const char *description = "Unknown error";
    switch ( e )
    {
        case ValidationErr::NO_RESULTS_FOR_SUBTASK:
            description = "Subtask was finalized with no results";
            break;
        case ValidationErr::WRONG_RESULT_HASHES_LENGTH:
            description = "The hashes length doesn't match the chunks to process length";
            break;
        case ValidationErr::DUPLICATE_CHUNK_RESULT_HASH:
            description = "A duplicate chunk result hash was found";
            break;
        case ValidationErr::EMPTY_CHUNK_RESULT_HASH:
            description = "Empty chunk result hash was found";
            break;
        case ValidationErr::MISSING_CHUNK_RESULT:
            description = "Missing chunk result found";
            break;
        case ValidationErr::INVALID_CHUNK_RESULT_HASH:
            description = "The chunk result hash is invalid";
            break;
        case ValidationErr::SUBTASK_ID_MISMATCH:
            description = "The subtask id doesn't match the result id";
            break;
        case ValidationErr::INVALID_RESULTS_BATCH:
            description = "The results batch is invalid";
            break;
    }
    return description;
}
namespace sgns::processing
{
    ProcessingValidationCore::ProcessingValidationCore() {}

    /**
     * @brief Validates the results of every subtask in a collection.
     *
     * Runs two passes:
     *  1. Each subtask must have an entry in @p results, and that entry must hold exactly one
     *     chunk hash per chunk to process. Subtasks failing either test are inserted into
     *     @p invalidSubTaskIds. For every other subtask, each chunk hash is appended to a
     *     per-chunk byte buffer keyed by the chunk's serialized form. A chunk processed by
     *     several subtasks therefore accumulates all of their hashes, in collection order.
     *  2. Every subtask not already present in @p invalidSubTaskIds is checked with
     *     CheckSubTaskResultHashes() against those accumulated buffers. Failures are inserted
     *     into @p invalidSubTaskIds.
     *
     * @param subTasks          Subtasks whose results are validated.
     * @param results           Results keyed by subtask id.
     * @param invalidSubTaskIds Output set that receives the ids of invalid subtasks. It is not
     *                          cleared first, and ids already present skip the pass-2 check.
     * @return Success if nothing was wrong. Otherwise the FIRST error encountered; every
     *         invalid subtask is still collected and logged.
     */
    outcome::result<void> ProcessingValidationCore::ValidateResults(
        const SGProcessing::SubTaskCollection                    &subTasks,
        const std::map<std::string, SGProcessing::SubTaskResult> &results,
        std::set<std::string>                                    &invalidSubTaskIds )
    {
        // Only the first error is returned; later problems are still logged and recorded.
        std::optional<std::error_code> error;

        // Serialized chunk -> concatenation of the hashes reported for that chunk by every
        // subtask that passed the length check.
        // NOTE(review): hashes are concatenated without separators or length prefixes, so
        // variable-length hashes could alias (e.g. "ab"+"c" vs "a"+"bc"). This presumably
        // relies on fixed-length hashes; confirm.
        std::map<std::string, std::vector<uint8_t>> chunks;

        // Pass 1: presence / length checks and per-chunk hash accumulation.
        for ( int itemIdx = 0; itemIdx < subTasks.items_size(); ++itemIdx )
        {
            const auto &subTask  = subTasks.items( itemIdx );
            const auto  itResult = results.find( subTask.subtaskid() );
            if ( itResult == results.end() )
            {
                // Since all subtasks are processed a result should be found for all of them
                m_logger->error( "NO_RESULTS_FOUND {}", subTask.subtaskid() );
                invalidSubTaskIds.insert( subTask.subtaskid() );
                if ( !error )
                {
                    error = make_error_code( Error::NO_RESULTS_FOR_SUBTASK );
                }
                continue;
            }

            const auto &result = itResult->second;
            if ( result.chunk_hashes_size() != subTask.chunkstoprocess_size() )
            {
                m_logger->error( "WRONG_RESULT_HASHES_LENGTH {}: {} {}",
                                 subTask.subtaskid(),
                                 result.chunk_hashes_size(),
                                 subTask.chunkstoprocess_size() );
                invalidSubTaskIds.insert( subTask.subtaskid() );
                if ( !error )
                {
                    error = make_error_code( Error::WRONG_RESULT_HASHES_LENGTH );
                }
                continue;
            }

            for ( int chunkIdx = 0; chunkIdx < subTask.chunkstoprocess_size(); ++chunkIdx )
            {
                // try_emplace only constructs the (empty) buffer the first time a chunk is seen.
                auto &accumulated =
                    chunks.try_emplace( subTask.chunkstoprocess( chunkIdx ).SerializeAsString() ).first->second;
                const std::string &chunkHashBytes = result.chunk_hashes( chunkIdx );
                accumulated.insert( accumulated.end(), chunkHashBytes.begin(), chunkHashBytes.end() );
            }
        }

        // Pass 2: per-subtask check against the accumulated chunk hashes.
        // NOTE(review): CheckSubTaskResultHashes flags a subtask whose different chunks share an
        // identical accumulated hash buffer. It does not compare the hashes that different
        // subtasks reported for the same chunk against each other; confirm that is intended.
        for ( int itemIdx = 0; itemIdx < subTasks.items_size(); ++itemIdx )
        {
            const auto &subTask = subTasks.items( itemIdx );
            if ( invalidSubTaskIds.count( subTask.subtaskid() ) != 0 )
            {
                m_logger->trace( "Subtask already invalid {}, no need to check chunk hashes ", subTask.subtaskid() );
                continue;
            }

            auto subtaskCheck = CheckSubTaskResultHashes( subTask, chunks );
            if ( subtaskCheck.has_failure() )
            {
                invalidSubTaskIds.insert( subTask.subtaskid() );
                if ( !error )
                {
                    error = subtaskCheck.error();
                }
            }
        }

        if ( error )
        {
            return outcome::failure( *error );
        }
        return outcome::success();
    }

    /**
     * @brief Validates a single subtask result in isolation.
     *
     * Checks, in order:
     *  1. The result's subtask id equals the subtask's id (SUBTASK_ID_MISMATCH).
     *  2. There is exactly one hash per chunk to process (WRONG_RESULT_HASHES_LENGTH).
     *  3. No hash is empty (EMPTY_CHUNK_RESULT_HASH).
     *  4. No hash appears twice within this result (DUPLICATE_CHUNK_RESULT_HASH).
     *
     * @param subTask The subtask the result claims to belong to.
     * @param result  The result to validate.
     * @return Success, or the error for the first failed check (which is also logged).
     */
    outcome::result<void> ProcessingValidationCore::ValidateIndividualResult(
        const SGProcessing::SubTask       &subTask,
        const SGProcessing::SubTaskResult &result ) const
    {
        // Check 1: Verify subtask IDs match
        if ( subTask.subtaskid() != result.subtaskid() )
        {
            m_logger->error( "SUBTASK_ID_MISMATCH: expected {}, got {}", subTask.subtaskid(), result.subtaskid() );
            return outcome::failure( Error::SUBTASK_ID_MISMATCH );
        }

        // Check 2: Verify hash count matches chunk count. This also guarantees the
        // chunkstoprocess( chunkIdx ) lookups below stay in range.
        if ( result.chunk_hashes_size() != subTask.chunkstoprocess_size() )
        {
            m_logger->error( "WRONG_RESULT_HASHES_LENGTH {}: {} {}",
                             subTask.subtaskid(),
                             result.chunk_hashes_size(),
                             subTask.chunkstoprocess_size() );
            return outcome::failure( Error::WRONG_RESULT_HASHES_LENGTH );
        }

        std::unordered_set<std::string> encounteredHashes;
        for ( int chunkIdx = 0; chunkIdx < result.chunk_hashes_size(); ++chunkIdx )
        {
            const std::string &chunkHash = result.chunk_hashes( chunkIdx );

            // Check 3: Verify hash is not empty. This runs before the uniqueness check so an empty
            // hash is never recorded; the outcome is unchanged because the first empty hash
            // always trips this check.
            if ( chunkHash.empty() )
            {
                const auto &chunk = subTask.chunkstoprocess( chunkIdx );
                m_logger->error( "EMPTY_CHUNK_RESULT_HASH [{}, {}]", subTask.subtaskid(), chunk.chunkid() );
                return outcome::failure( Error::EMPTY_CHUNK_RESULT_HASH );
            }

            // Check 4: Verify no duplicate hashes within this result
            if ( !encounteredHashes.insert( chunkHash ).second )
            {
                const auto &chunk = subTask.chunkstoprocess( chunkIdx );
                m_logger->error( "DUPLICATE_CHUNK_RESULT_HASH [{}, {}]", subTask.subtaskid(), chunk.chunkid() );
                return outcome::failure( Error::DUPLICATE_CHUNK_RESULT_HASH );
            }
        }
        return outcome::success();
    }

    /**
     * @brief Checks one subtask's chunks against accumulated per-chunk hash buffers.
     *
     * For each chunk of @p subTask, the buffer stored under the chunk's serialized form is
     * looked up in @p chunks.
     *  - If any chunk has no buffer, MISSING_CHUNK_RESULT is returned.
     *  - If two chunks of this subtask map to byte-identical buffers, INVALID_CHUNK_RESULT_HASH
     *    is returned.
     *
     * @param subTask Subtask whose chunks are checked.
     * @param chunks  Serialized chunk -> accumulated hash bytes (as built by ValidateResults).
     * @return Success, or the first error found (which is also logged).
     */
    outcome::result<void> ProcessingValidationCore::CheckSubTaskResultHashes(
        const SGProcessing::SubTask                        &subTask,
        const std::map<std::string, std::vector<uint8_t>> &chunks ) const
    {
        std::unordered_set<std::string> encounteredHashes;
        for ( int chunkIdx = 0; chunkIdx < subTask.chunkstoprocess_size(); ++chunkIdx )
        {
            const auto &chunk = subTask.chunkstoprocess( chunkIdx );
            const auto  it    = chunks.find( chunk.SerializeAsString() );
            if ( it == chunks.end() )
            {
                m_logger->error( "NO_CHUNK_RESULT_FOUND [{}, {}]", subTask.subtaskid(), chunk.chunkid() );
                return outcome::failure( Error::MISSING_CHUNK_RESULT );
            }

            // Build the hash string directly inside the set; emplace reports whether it was new.
            if ( !encounteredHashes.emplace( it->second.begin(), it->second.end() ).second )
            {
                m_logger->error( "INVALID_CHUNK_RESULT_HASH [{}, {}]", subTask.subtaskid(), chunk.chunkid() );
                return outcome::failure( Error::INVALID_CHUNK_RESULT_HASH );
            }
        }
        return outcome::success();
    }
}
Updated on 2026-03-04 at 13:10:44 -0800