processing_dapp/processing_dapp.cpp¶
Functions¶
| Name | |
|---|---|
| int | main(int argc, char * argv[]) |
Functions Documentation¶
function main¶
Source code¶
#include "processing/impl/processing_task_queue_impl.hpp"
#include "crdt/globaldb/globaldb.hpp"
#include "crdt/globaldb/keypair_file_storage.hpp"
#include <libp2p/basic/scheduler/scheduler_impl.hpp>
#include <libp2p/multi/multibase_codec/multibase_codec_impl.hpp>
#include <libp2p/log/configurator.hpp>
#include <libp2p/log/logger.hpp>
#include <boost/program_options.hpp>
#include <boost/format.hpp>
#include <iostream>
#include <ipfs_lite/ipfs/graphsync/impl/network/network.hpp>
#include <ipfs_lite/ipfs/graphsync/impl/local_requests.hpp>
#include <libp2p/basic/scheduler/asio_scheduler_backend.hpp>
using namespace sgns::processing;
namespace
{
/**
 * Splits a task into a fixed number of processing subtasks, each carrying a fixed
 * number of chunks, and optionally appends one extra validation subtask.
 */
class TaskSplitter
{
public:
    /**
     * @param nSubTasks            number of processing subtasks generated per task
     * @param nChunks              number of chunks added to every processing subtask
     * @param addValidationSubtask when true, a subtask with id "subtask_validation" is
     *                             appended after the processing subtasks
     */
    TaskSplitter( size_t nSubTasks, size_t nChunks, bool addValidationSubtask ) :
        m_nSubTasks( nSubTasks ), m_nChunks( nChunks ), m_addValidationSubtask( addValidationSubtask )
    {
    }

    /**
     * Appends the subtasks generated for @p task to @p subTasks.
     *
     * Every generated subtask receives the task's IPFS block id. Processing subtasks are
     * named "subtask_<index>". Chunk ids have the form "CHUNK_<subtask index>_<n>", where
     * <n> is a counter that keeps increasing across all subtasks of the task.
     *
     * @param task     task to split; only its IPFS block id is read
     * @param subTasks output list; generated subtasks are appended to it
     */
    void SplitTask( const SGProcessing::Task &task, std::list<SGProcessing::SubTask> &subTasks )
    {
        std::optional<SGProcessing::SubTask> validation;
        if ( m_addValidationSubtask )
        {
            validation.emplace();
        }

        // Counter shared by all subtasks of this task, so it is never reset inside the loop.
        size_t runningChunkId = 0;

        for ( size_t subtaskIdx = 0; subtaskIdx < m_nSubTasks; ++subtaskIdx )
        {
            SGProcessing::SubTask processingSubtask;
            processingSubtask.set_ipfsblock( task.ipfs_block_id() );
            processingSubtask.set_subtaskid( ( boost::format( "subtask_%d" ) % subtaskIdx ).str() );

            for ( size_t chunkIdx = 0; chunkIdx < m_nChunks; ++chunkIdx, ++runningChunkId )
            {
                const SGProcessing::ProcessingChunk chunk = MakeChunk( subtaskIdx, runningChunkId );
                processingSubtask.add_chunkstoprocess()->CopyFrom( chunk );

                // The validation subtask collects the first chunk of every processing subtask.
                if ( validation.has_value() && chunkIdx == 0 )
                {
                    validation->add_chunkstoprocess()->CopyFrom( chunk );
                }
            }

            subTasks.emplace_back( std::move( processingSubtask ) );
        }

        if ( validation.has_value() )
        {
            validation->set_ipfsblock( task.ipfs_block_id() );
            validation->set_subtaskid( "subtask_validation" );
            subTasks.emplace_back( std::move( *validation ) );
        }
    }

private:
    /// Builds a chunk with a single subchunk and the id "CHUNK_<subtaskIdx>_<chunkId>".
    static SGProcessing::ProcessingChunk MakeChunk( size_t subtaskIdx, size_t chunkId )
    {
        SGProcessing::ProcessingChunk chunk;
        chunk.set_chunkid( ( boost::format( "CHUNK_%d_%d" ) % subtaskIdx % chunkId ).str() );
        chunk.set_n_subchunks( 1 );
        // The remaining geometry fields are intentionally left unset:
        //chunk.set_line_stride(1);
        //chunk.set_offset(0);
        //chunk.set_stride(1);
        //chunk.set_subchunk_height(10);
        //chunk.set_subchunk_width(10);
        return chunk;
    }

    size_t m_nSubTasks;
    size_t m_nChunks;
    bool   m_addValidationSubtask;
};
/// Settings collected from the command line.
struct Options
{
    /// Remote peer to connect to; stays empty when no remote was given.
    std::optional<std::string> remote;
    /// Number of tasks to process.
    size_t nTasks{ 1 };
    /// Number of subtasks each task is split into.
    size_t nSubTasks{ 5 };
    /// Number of chunks in each processing subtask.
    size_t nChunks{ 1 };
    /// Whether an additional validation subtask is generated.
    bool addValidationSubtask{ false };
};
/**
 * Parses the command line into an Options value.
 *
 * The usage text is printed to stderr and boost::none is returned when "--help" is given
 * or when the program is started without any argument (argc == 1). Exceptions thrown
 * while parsing are reported on stderr and also result in boost::none.
 *
 * @param argc argument count as passed to main
 * @param argv argument vector as passed to main
 * @return the parsed options, or boost::none if the caller should not continue
 */
boost::optional<Options> parseCommandLine( int argc, char **argv )
{
    namespace po = boost::program_options;

    try
    {
        Options     o;
        std::string remote;

        po::options_description desc( "processing service options" );
        // clang-format off
        desc.add_options()
            ( "help,h", "print usage message" )
            ( "remote,r", po::value( &remote ), "remote service multiaddress to connect to" )
            ( "ntasks,t", po::value( &o.nTasks ), "number of tasks to process" )
            ( "nsubtasks,n", po::value( &o.nSubTasks ), "number of subtasks that task is split to" )
            // Declared as a value option, so it expects an explicit argument (e.g. "-v true").
            ( "addvalidationsubtask,v",
              po::value( &o.addValidationSubtask ),
              "add a subtask that contains a random (actually first) chunk of each of processing subtasks" )
            ( "nchunks,c", po::value( &o.nChunks ), "number of chunks in each processing subtask" );
        // clang-format on

        po::variables_map vm;
        po::store( po::parse_command_line( argc, argv, desc ), vm );
        po::notify( vm );

        const bool helpRequested = vm.count( "help" ) != 0;
        const bool noArguments   = ( argc == 1 );
        if ( helpRequested || noArguments )
        {
            std::cerr << desc << "\n";
            return boost::none;
        }

        // The remote is parsed into a plain string first; an empty string means "not given".
        if ( !remote.empty() )
        {
            o.remote = remote;
        }

        return o;
    }
    catch ( const std::exception &e )
    {
        std::cerr << e.what() << '\n';
    }

    return boost::none;
}
}
int main( int argc, char *argv[] )
{
auto options = parseCommandLine( argc, argv );
if ( !options )
{
return 1;
}
const std::string logger_config( R"(
# ----------------
sinks:
- name: console
type: console
color: true
groups:
- name: processing_dapp
sink: console
level: info
children:
- name: libp2p
- name: Gossip
# ----------------
)" );
// prepare log system
auto logging_system = std::make_shared<soralog::LoggingSystem>( std::make_shared<soralog::ConfiguratorFromYAML>(
// Original LibP2P logging config
std::make_shared<libp2p::log::Configurator>(),
// Additional logging config for application
logger_config ) );
logging_system->configure();
libp2p::log::setLoggingSystem( logging_system );
auto loggerPubSub = sgns::base::createLogger( "GossipPubSub" );
//loggerPubSub->set_level(spdlog::level::trace);
auto loggerProcessingEngine = sgns::base::createLogger( "ProcessingEngine" );
loggerProcessingEngine->set_level( spdlog::level::trace );
auto loggerProcessingService = sgns::base::createLogger( "ProcessingService" );
loggerProcessingService->set_level( spdlog::level::trace );
auto loggerProcessingQueueManager = sgns::base::createLogger( "ProcessingSubTaskQueueManager" );
loggerProcessingQueueManager->set_level( spdlog::level::debug );
auto loggerGlobalDB = sgns::base::createLogger( "GlobalDB" );
loggerGlobalDB->set_level( spdlog::level::debug );
auto loggerDAGSyncer = sgns::base::createLogger( "GraphsyncDAGSyncer" );
loggerDAGSyncer->set_level( spdlog::level::trace );
auto loggerBroadcaster = sgns::base::createLogger( "PubSubBroadcasterExt" );
loggerBroadcaster->set_level( spdlog::level::debug );
const std::string processingGridChannel = "GRID_CHANNEL_ID";
auto pubs = std::make_shared<sgns::ipfs_pubsub::GossipPubSub>(
sgns::crdt::KeyPairFileStorage( "CRDT.Datastore.TEST/pubs_dapp" ).GetKeyPair().value() );
std::vector<std::string> pubsubBootstrapPeers;
if ( options->remote )
{
pubsubBootstrapPeers = std::vector( { *options->remote } );
}
pubs->Start( 40001, pubsubBootstrapPeers );
const size_t maximalNodesCount = 1;
std::list<SGProcessing::Task> tasks;
// Put tasks to Global DB
for ( size_t taskIdx = 0; taskIdx < options->nTasks; ++taskIdx )
{
// And wait for its processing
SGProcessing::Task task;
task.set_ipfs_block_id( ( boost::format( "IPFS_BLOCK_ID_%1%" ) % ( taskIdx + 1 ) ).str() );
//task.set_block_len(1000);
//task.set_block_line_stride(2);
//task.set_block_stride(4);
task.set_random_seed( 0 );
task.set_results_channel( ( boost::format( "RESULT_CHANNEL_ID_%1%" ) % ( taskIdx + 1 ) ).str() );
tasks.push_back( std::move( task ) );
}
auto io = std::make_shared<boost::asio::io_context>();
auto crdtOptions = sgns::crdt::CrdtOptions::DefaultOptions();
auto scheduler = std::make_shared<libp2p::basic::SchedulerImpl>( std::make_shared<libp2p::basic::AsioSchedulerBackend>( io ), libp2p::basic::Scheduler::Config{ std::chrono::milliseconds( 100 ) } );
auto graphsyncnetwork = std::make_shared<sgns::ipfs_lite::ipfs::graphsync::Network>( pubs->GetHost(), scheduler );
auto generator = std::make_shared<sgns::ipfs_lite::ipfs::graphsync::RequestIdGenerator>();
auto globaldb_ret = sgns::crdt::GlobalDB::New( io,
"CRDT.Datastore.TEST",
pubs,
crdtOptions,
graphsyncnetwork,
scheduler,
generator );
if (globaldb_ret.has_error())
{
return -1;
}
auto globalDB = std::move( globaldb_ret.value() );
globalDB->AddListenTopic( "CRDT.Datastore.TEST.Channel" );
globalDB->AddBroadcastTopic( "CRDT.Datastore.TEST.Channel" );
globalDB->Start();
std::thread iothread( [io]() { io->run(); } );
auto taskQueue = std::make_shared<ProcessingTaskQueueImpl>( globalDB, "test" );
TaskSplitter taskSplitter( options->nSubTasks, options->nChunks, options->addValidationSubtask );
for ( auto &task : tasks )
{
std::list<SGProcessing::SubTask> subTasks;
taskSplitter.SplitTask( task, subTasks );
taskQueue->EnqueueTask( task, subTasks );
}
// Gracefully shutdown on signal
boost::asio::signal_set signals( *pubs->GetAsioContext(), SIGINT, SIGTERM );
signals.async_wait(
[&pubs, &io]( const boost::system::error_code &, int )
{
pubs->Stop();
io->stop();
} );
pubs->Wait();
iothread.join();
return 0;
}
Updated on 2026-04-13 at 23:22:46 -0700