processing_room/processing_app.cpp¶
Functions¶
| Name | |
|---|---|
| int | main(int argc, char * argv[]) |
Functions Documentation¶
function main¶
Source code¶
#include "processing/processing_service.hpp"
#include "processing/processing_subtask_enqueuer_impl.hpp"
#include <iostream>
#include <thread>
#include <boost/program_options.hpp>
#include <boost/format.hpp>
#include <libp2p/multi/multibase_codec/multibase_codec_impl.hpp>
#include <libp2p/log/configurator.hpp>
#include <libp2p/log/logger.hpp>
using namespace sgns::processing;
namespace
{
class SubTaskResultStorageImpl : public SubTaskResultStorage
{
public:
void AddSubTaskResult( const SGProcessing::SubTaskResult &subTaskResult ) override {}
void RemoveSubTaskResult( const std::string &subTaskId ) override {}
std::vector<SGProcessing::SubTaskResult> GetSubTaskResults( const std::set<std::string> &subTaskIds ) override
{
return {};
}
};
class TaskSplitter
{
public:
TaskSplitter( size_t nSubtasks ) : m_nSubtasks( nSubtasks ) {}
void SplitTask( const SGProcessing::Task &task, std::list<SGProcessing::SubTask> &subTasks )
{
for ( size_t i = 0; i < m_nSubtasks; ++i )
{
SGProcessing::SubTask subtask;
subtask.set_ipfsblock( task.ipfs_block_id() );
subtask.set_subtaskid( ( boost::format( "%s_subtask_%d" ) % task.results_channel() % i ).str() );
subTasks.push_back( std::move( subtask ) );
}
}
private:
size_t m_nSubtasks;
};
class ProcessingCoreImpl : public ProcessingCore
{
public:
ProcessingCoreImpl( size_t subTaskProcessingTime ) : m_subTaskProcessingTime( subTaskProcessingTime ) {}
outcome::result<SGProcessing::SubTaskResult> ProcessSubTask( const SGProcessing::SubTask &subTask,
uint32_t initialHashCode ) override
{
SGProcessing::SubTaskResult result;
std::cout << "SubTask processing started. " << subTask.subtaskid() << std::endl;
std::this_thread::sleep_for( std::chrono::milliseconds( m_subTaskProcessingTime ) );
std::cout << "SubTask processed. " << subTask.subtaskid() << std::endl;
result.set_ipfs_results_data_id( ( boost::format( "%s_%s" ) % "RESULT_IPFS" % subTask.subtaskid() ).str() );
return result;
}
private:
size_t m_subTaskProcessingTime;
};
class ProcessingTaskQueueImpl : public ProcessingTaskQueue
{
public:
ProcessingTaskQueueImpl() {}
outcome::result<void> EnqueueTask( const SGProcessing::Task &task,
const std::list<SGProcessing::SubTask> &subTasks ) override
{
m_tasks.push_back( task );
m_subTasks.emplace( task.ipfs_block_id(), subTasks );
return outcome::success();
}
bool GetSubTasks( const std::string &taskId, std::list<SGProcessing::SubTask> &subTasks ) override
{
auto it = m_subTasks.find( taskId );
if ( it != m_subTasks.end() )
{
subTasks = it->second;
return true;
}
return false;
}
outcome::result<std::pair<std::string, SGProcessing::Task>> GrabTask() override
{
if ( m_tasks.empty() )
{
return outcome::failure( boost::system::error_code{} );
}
SGProcessing::Task task;
task = std::move( m_tasks.back() );
m_tasks.pop_back();
std::string taskKey = ( boost::format( "TASK_%d" ) % m_tasks.size() ).str();
return std::make_pair( taskKey, task );
}
bool IsTaskCompleted( const std::string &taskId ) override
{
return true;
}
outcome::result<std::shared_ptr<crdt::AtomicTransaction>> CompleteTask( const std::string &taskKey, const SGProcessing::TaskResult &task ) override
{
return outcome::success();
}
void MarkTaskBad(const std::string& taskKey) override
{
return;
}
private:
std::list<SGProcessing::Task> m_tasks;
std::map<std::string, std::list<SGProcessing::SubTask>> m_subTasks;
};
// cmd line options
struct Options
{
size_t serviceIndex = 0;
size_t subTaskProcessingTime = 0; // ms
size_t disconnect = 0;
size_t nSubTasks = 5;
size_t channelListRequestTimeout = 5000;
// optional remote peer to connect to
std::optional<std::string> remote;
};
boost::optional<Options> parseCommandLine( int argc, char **argv )
{
namespace po = boost::program_options;
try
{
Options o;
std::string remote;
po::options_description desc( "processing service options" );
desc.add_options()( "help,h", "print usage message" )( "remote,r",
po::value( &remote ),
"remote service multiaddress to connect to" )(
"processingtime,p",
po::value( &o.subTaskProcessingTime ),
"subtask processing time (ms)" )( "disconnect,d", po::value( &o.disconnect ), "disconnect after (ms)" )(
"nsubtasks,n",
po::value( &o.nSubTasks ),
"number of subtasks that task is split to" )( "channellisttimeout,t",
po::value( &o.channelListRequestTimeout ),
"chnnel list request timeout (ms)" )(
"serviceindex,i",
po::value( &o.serviceIndex ),
"index of the service in computational grid (has to be a unique value)" );
po::variables_map vm;
po::store( parse_command_line( argc, argv, desc ), vm );
po::notify( vm );
if ( vm.count( "help" ) != 0 || argc == 1 )
{
std::cerr << desc << "\n";
return boost::none;
}
if ( o.serviceIndex == 0 )
{
std::cerr << "Service index should be > 0\n";
return boost::none;
}
if ( o.subTaskProcessingTime == 0 )
{
std::cerr << "SubTask processing time should be > 0\n";
return boost::none;
}
if ( !remote.empty() )
{
o.remote = remote;
}
return o;
}
catch ( const std::exception &e )
{
std::cerr << e.what() << std::endl;
}
return boost::none;
}
}
int main( int argc, char *argv[] )
{
auto options = parseCommandLine( argc, argv );
if ( !options )
{
return 1;
}
auto loggerPubSub = sgns::base::createLogger( "GossipPubSub" );
//loggerPubSub->set_level(spdlog::level::trace);
auto loggerProcessingEngine = sgns::base::createLogger( "ProcessingEngine" );
loggerProcessingEngine->set_level( spdlog::level::trace );
auto loggerProcessingService = sgns::base::createLogger( "ProcessingService" );
loggerProcessingService->set_level( spdlog::level::trace );
auto loggerProcessingQueueManager = sgns::base::createLogger( "ProcessingSubTaskQueueManager" );
loggerProcessingQueueManager->set_level( spdlog::level::debug );
const std::string processingGridChannel = "GRID_CHANNEL_ID";
const std::string logger_config( R"(
# ----------------
sinks:
- name: console
type: console
color: true
groups:
- name: processing_app
sink: console
level: info
children:
- name: libp2p
- name: Gossip
# ----------------
)" );
// prepare log system
auto logging_system = std::make_shared<soralog::LoggingSystem>( std::make_shared<soralog::ConfiguratorFromYAML>(
// Original LibP2P logging config
std::make_shared<libp2p::log::Configurator>(),
// Additional logging config for application
logger_config ) );
logging_system->configure();
libp2p::log::setLoggingSystem( logging_system );
auto pubs = std::make_shared<sgns::ipfs_pubsub::GossipPubSub>();
if ( options->serviceIndex == 1 )
{
std::string publicKey = "z5b3BTS9wEgJxi9E8NHH6DT8Pj9xTmxBRgTaRUpBVox9a";
std::string privateKey = "zGRXH26ag4k9jxTGXp2cg8n31CEkR2HN1SbHaKjaHnFTu";
libp2p::crypto::KeyPair keyPair;
auto codec = libp2p::multi::MultibaseCodecImpl();
keyPair.publicKey = { libp2p::crypto::PublicKey::Type::Ed25519, codec.decode( publicKey ).value() };
keyPair.privateKey = { libp2p::crypto::PublicKey::Type::Ed25519, codec.decode( privateKey ).value() };
pubs = std::make_shared<sgns::ipfs_pubsub::GossipPubSub>( keyPair );
}
if ( options->remote )
{
pubs->Start( 40001, { *options->remote } );
}
else
{
pubs->Start( 40001, {} );
}
const size_t maximalNodesCount = 1;
std::list<SGProcessing::Task> tasks;
// Only a service with index equal to 1 has a locked task (job)
if ( options->serviceIndex == 1 )
{
SGProcessing::Task task;
task.set_ipfs_block_id( "IPFS_BLOCK_ID_1" );
//task.set_block_len(1000);
//task.set_block_line_stride(2);
//task.set_block_stride(4);
task.set_random_seed( 0 );
task.set_results_channel( "RESULT_CHANNEL_ID_1" );
tasks.push_back( std::move( task ) );
}
boost::asio::deadline_timer timerToDisconnect( *pubs->GetAsioContext() );
if ( options->disconnect > 0 )
{
timerToDisconnect.expires_from_now( boost::posix_time::milliseconds( options->disconnect ) );
timerToDisconnect.async_wait(
[pubs, &timerToDisconnect]( const boost::system::error_code &error )
{
timerToDisconnect.expires_at( boost::posix_time::pos_infin );
pubs->Stop();
} );
}
auto taskQueue = std::make_shared<ProcessingTaskQueueImpl>();
auto taskSplitter = std::make_shared<TaskSplitter>( options->nSubTasks );
for ( auto &task : tasks )
{
std::list<SGProcessing::SubTask> subTasks;
taskSplitter->SplitTask( task, subTasks );
taskQueue->EnqueueTask( task, subTasks );
}
auto processingCore = std::make_shared<ProcessingCoreImpl>( options->subTaskProcessingTime );
auto enqueuer = std::make_shared<SubTaskEnqueuerImpl>( taskQueue );
ProcessingServiceImpl processingService( pubs,
maximalNodesCount,
enqueuer,
std::make_shared<SubTaskResultStorageImpl>(),
processingCore );
processingService.SetChannelListRequestTimeout(
boost::posix_time::milliseconds( options->channelListRequestTimeout ) );
processingService.StartProcessing( processingGridChannel );
// Gracefully shutdown on signal
boost::asio::signal_set signals( *pubs->GetAsioContext(), SIGINT, SIGTERM );
signals.async_wait( [&pubs]( const boost::system::error_code &, int ) { pubs->Stop(); } );
pubs->Wait();
return 0;
}
Updated on 2026-04-13 at 23:22:46 -0700