Skip to content

processing_dapp/processing_dapp.cpp

Functions

Name
int main(int argc, char * argv[])

Functions Documentation

function main

int main(
    int argc,
    char * argv[]
)

Source code

#include "processing/impl/processing_task_queue_impl.hpp"

#include "crdt/globaldb/globaldb.hpp"
#include "crdt/globaldb/keypair_file_storage.hpp"

#include <libp2p/basic/scheduler/scheduler_impl.hpp>
#include <libp2p/multi/multibase_codec/multibase_codec_impl.hpp>
#include <libp2p/log/configurator.hpp>
#include <libp2p/log/logger.hpp>

#include <boost/program_options.hpp>
#include <boost/format.hpp>

#include <iostream>
#include <ipfs_lite/ipfs/graphsync/impl/network/network.hpp>
#include <ipfs_lite/ipfs/graphsync/impl/local_requests.hpp>
#include <libp2p/basic/scheduler/asio_scheduler_backend.hpp>

using namespace sgns::processing;

namespace
{
    class TaskSplitter
    {
    public:
        TaskSplitter( size_t nSubTasks, size_t nChunks, bool addValidationSubtask ) :
            m_nSubTasks( nSubTasks ), m_nChunks( nChunks ), m_addValidationSubtask( addValidationSubtask )
        {
        }

        void SplitTask( const SGProcessing::Task &task, std::list<SGProcessing::SubTask> &subTasks )
        {
            std::optional<SGProcessing::SubTask> validationSubtask;
            if ( m_addValidationSubtask )
            {
                validationSubtask = SGProcessing::SubTask();
            }

            size_t chunkId = 0;
            for ( size_t i = 0; i < m_nSubTasks; ++i )
            {
                auto                  subtaskId = ( boost::format( "subtask_%d" ) % i ).str();
                SGProcessing::SubTask subtask;
                subtask.set_ipfsblock( task.ipfs_block_id() );
                subtask.set_subtaskid( subtaskId );

                for ( size_t chunkIdx = 0; chunkIdx < m_nChunks; ++chunkIdx )
                {
                    SGProcessing::ProcessingChunk chunk;
                    chunk.set_chunkid( ( boost::format( "CHUNK_%d_%d" ) % i % chunkId ).str() );
                    chunk.set_n_subchunks( 1 );
                    //chunk.set_line_stride(1);
                    //chunk.set_offset(0);
                    //chunk.set_stride(1);
                    //chunk.set_subchunk_height(10);
                    //chunk.set_subchunk_width(10);

                    auto chunkToProcess = subtask.add_chunkstoprocess();
                    chunkToProcess->CopyFrom( chunk );

                    if ( validationSubtask )
                    {
                        if ( chunkIdx == 0 )
                        {
                            // Add the first chunk of a processing subtask into the validation subtask
                            auto chunkToValidate = validationSubtask->add_chunkstoprocess();
                            chunkToValidate->CopyFrom( chunk );
                        }
                    }

                    ++chunkId;
                }
                subTasks.push_back( std::move( subtask ) );
            }

            if ( validationSubtask )
            {
                auto subtaskId = ( boost::format( "subtask_validation" ) ).str();
                validationSubtask->set_ipfsblock( task.ipfs_block_id() );
                validationSubtask->set_subtaskid( subtaskId );
                subTasks.push_back( std::move( *validationSubtask ) );
            }
        }

    private:
        size_t m_nSubTasks;
        size_t m_nChunks;
        bool   m_addValidationSubtask;
    };

    // cmd line options
    struct Options
    {
        // optional remote peer to connect to
        std::optional<std::string> remote;
        size_t                     nTasks               = 1;
        size_t                     nSubTasks            = 5;
        size_t                     nChunks              = 1;
        bool                       addValidationSubtask = false;
    };

    boost::optional<Options> parseCommandLine( int argc, char **argv )
    {
        namespace po = boost::program_options;
        try
        {
            Options     o;
            std::string remote;

            po::options_description desc( "processing service options" );
            desc.add_options()( "help,h", "print usage message" )( "remote,r",
                                                                   po::value( &remote ),
                                                                   "remote service multiaddress to connect to" )(
                "ntasks,t",
                po::value( &o.nTasks ),
                "number of tasks to process" )( "nsubtasks,n",
                                                po::value( &o.nSubTasks ),
                                                "number of subtasks that task is split to" )(
                "addvalidationsubtask,v",
                po::value( &o.addValidationSubtask ),
                "add a subtask that contains a randon (actually first) chunk of each of processing subtasks" )(
                "nchunks,c",
                po::value( &o.nChunks ),
                "number of chunks in each processing subtask" );

            po::variables_map vm;
            po::store( parse_command_line( argc, argv, desc ), vm );
            po::notify( vm );

            if ( vm.count( "help" ) != 0 || argc == 1 )
            {
                std::cerr << desc << "\n";
                return boost::none;
            }

            if ( !remote.empty() )
            {
                o.remote = remote;
            }

            return o;
        }
        catch ( const std::exception &e )
        {
            std::cerr << e.what() << std::endl;
        }
        return boost::none;
    }

}

int main( int argc, char *argv[] )
{
    auto options = parseCommandLine( argc, argv );
    if ( !options )
    {
        return 1;
    }

    const std::string logger_config( R"(
    # ----------------
    sinks:
      - name: console
        type: console
        color: true
    groups:
      - name: processing_dapp
        sink: console
        level: info
        children:
          - name: libp2p
          - name: Gossip
    # ----------------
    )" );

    // prepare log system
    auto logging_system = std::make_shared<soralog::LoggingSystem>( std::make_shared<soralog::ConfiguratorFromYAML>(
        // Original LibP2P logging config
        std::make_shared<libp2p::log::Configurator>(),
        // Additional logging config for application
        logger_config ) );
    logging_system->configure();

    libp2p::log::setLoggingSystem( logging_system );

    auto loggerPubSub = sgns::base::createLogger( "GossipPubSub" );
    //loggerPubSub->set_level(spdlog::level::trace);

    auto loggerProcessingEngine = sgns::base::createLogger( "ProcessingEngine" );
    loggerProcessingEngine->set_level( spdlog::level::trace );

    auto loggerProcessingService = sgns::base::createLogger( "ProcessingService" );
    loggerProcessingService->set_level( spdlog::level::trace );

    auto loggerProcessingQueueManager = sgns::base::createLogger( "ProcessingSubTaskQueueManager" );
    loggerProcessingQueueManager->set_level( spdlog::level::debug );

    auto loggerGlobalDB = sgns::base::createLogger( "GlobalDB" );
    loggerGlobalDB->set_level( spdlog::level::debug );

    auto loggerDAGSyncer = sgns::base::createLogger( "GraphsyncDAGSyncer" );
    loggerDAGSyncer->set_level( spdlog::level::trace );

    auto loggerBroadcaster = sgns::base::createLogger( "PubSubBroadcasterExt" );
    loggerBroadcaster->set_level( spdlog::level::debug );

    const std::string processingGridChannel = "GRID_CHANNEL_ID";

    auto pubs = std::make_shared<sgns::ipfs_pubsub::GossipPubSub>(
        sgns::crdt::KeyPairFileStorage( "CRDT.Datastore.TEST/pubs_dapp" ).GetKeyPair().value() );

    std::vector<std::string> pubsubBootstrapPeers;
    if ( options->remote )
    {
        pubsubBootstrapPeers = std::vector( { *options->remote } );
    }
    pubs->Start( 40001, pubsubBootstrapPeers );

    const size_t maximalNodesCount = 1;

    std::list<SGProcessing::Task> tasks;

    // Put tasks to Global DB
    for ( size_t taskIdx = 0; taskIdx < options->nTasks; ++taskIdx )
    {
        // And wait for its processing
        SGProcessing::Task task;
        task.set_ipfs_block_id( ( boost::format( "IPFS_BLOCK_ID_%1%" ) % ( taskIdx + 1 ) ).str() );
        //task.set_block_len(1000);
        //task.set_block_line_stride(2);
        //task.set_block_stride(4);
        task.set_random_seed( 0 );
        task.set_results_channel( ( boost::format( "RESULT_CHANNEL_ID_%1%" ) % ( taskIdx + 1 ) ).str() );
        tasks.push_back( std::move( task ) );
    }

    auto io          = std::make_shared<boost::asio::io_context>();
    auto crdtOptions = sgns::crdt::CrdtOptions::DefaultOptions();
    auto scheduler   = std::make_shared<libp2p::basic::SchedulerImpl>( std::make_shared<libp2p::basic::AsioSchedulerBackend>( io ), libp2p::basic::Scheduler::Config{ std::chrono::milliseconds( 100 ) } );
    auto graphsyncnetwork = std::make_shared<sgns::ipfs_lite::ipfs::graphsync::Network>( pubs->GetHost(), scheduler );
    auto generator        = std::make_shared<sgns::ipfs_lite::ipfs::graphsync::RequestIdGenerator>();

    auto globaldb_ret = sgns::crdt::GlobalDB::New( io,
                                                   "CRDT.Datastore.TEST",
                                                   pubs,
                                                   crdtOptions,
                                                   graphsyncnetwork,
                                                   scheduler,
                                                   generator );

    if (globaldb_ret.has_error())
    {
        return -1;
    }
    auto globalDB     = std::move( globaldb_ret.value() );

    globalDB->AddListenTopic( "CRDT.Datastore.TEST.Channel" );
    globalDB->AddBroadcastTopic( "CRDT.Datastore.TEST.Channel" );
    globalDB->Start();

    std::thread iothread( [io]() { io->run(); } );

    auto taskQueue = std::make_shared<ProcessingTaskQueueImpl>( globalDB, "test" );

    TaskSplitter taskSplitter( options->nSubTasks, options->nChunks, options->addValidationSubtask );

    for ( auto &task : tasks )
    {
        std::list<SGProcessing::SubTask> subTasks;
        taskSplitter.SplitTask( task, subTasks );
        taskQueue->EnqueueTask( task, subTasks );
    }

    // Gracefully shutdown on signal
    boost::asio::signal_set signals( *pubs->GetAsioContext(), SIGINT, SIGTERM );
    signals.async_wait(
        [&pubs, &io]( const boost::system::error_code &, int )
        {
            pubs->Stop();
            io->stop();
        } );

    pubs->Wait();
    iothread.join();

    return 0;
}

Updated on 2026-04-13 at 23:22:46 -0700