Skip to content

graphsync_app/graphsync_app.cpp

Classes

Name
class Node
struct NodeParams

Functions

Name
void RunIpfsServer()
void RunIpfsClient(std::string remote)
void testManyNodesExchange(size_t N, size_t n_data)
int main(int argc, char * argv[])

Attributes

Name
std::shared_ptr< spdlog::logger > logger
size_t run_time_msec

Functions Documentation

function RunIpfsServer

void RunIpfsServer()

function RunIpfsClient

void RunIpfsClient(
    std::string remote
)

function testManyNodesExchange

void testManyNodesExchange(
    size_t N,
    size_t n_data
)

function main

int main(
    int argc,
    char * argv[]
)

Attributes Documentation

variable logger

static std::shared_ptr< spdlog::logger > logger;

variable run_time_msec

static size_t run_time_msec = 0;

Source code

#include <common/logger.hpp>
#include "graphsync_acceptance_common.hpp"
#include <boost/program_options.hpp>
#include <libp2p/multi/multibase_codec/multibase_codec_impl.hpp>
#include <libp2p/log/configurator.hpp>
#include <libp2p/log/logger.hpp>

#include <iostream>

// logger used by these tests
static std::shared_ptr<spdlog::logger> logger;

// max test case run time, always limited by various params
static size_t run_time_msec = 0;

// Test node aggregate
class Node {
public:
    // total requests sent by all nodes in a test case
    static size_t requests_sent;

    // total responses received by all nodes in a test case
    static size_t responses_received;

    // n_responses_expected: count of responses received by the node after which
    // io->stop() is called
    Node(std::shared_ptr<boost::asio::io_context> io,
        std::shared_ptr<sgns::ipfs_lite::ipfs::graphsync::MerkleDagBridge> data_service,
        sgns::ipfs_lite::ipfs::graphsync::Graphsync::BlockCallback cb,
        size_t n_responses_expected,
        std::optional<libp2p::crypto::KeyPair> keyPair)
        : io_(std::move(io)),
        data_service_(std::move(data_service)),
        block_cb_(std::move(cb)),
        n_responses_expected_(n_responses_expected) 
    {
        if (keyPair)
            std::tie(graphsync_, host_) = createNodeObjects(io_, keyPair.value());
        else
            std::tie(graphsync_, host_) = createNodeObjects(io_);
    }

    // stops graphsync and host, otherwise they can interact with further tests!
    void stop() {
        graphsync_->stop();
        host_->stop();
    }

    // returns peer ID, so they can connect to each other
    auto getId() const {
        return host_->getId();
    }

    // listens to network and starts nodes if not yet started
    void listen(const libp2p::multi::Multiaddress& listen_to) {
        auto listen_res = host_->listen(listen_to);
        if (!listen_res) {
            logger->trace("Cannot listen to multiaddress {}, {}",
                listen_to.getStringAddress(),
                listen_res.error().message());
            return;
        }
        start();
    }

    // calls Graphsync's makeRequest
    void makeRequest(const libp2p::peer::PeerId& peer,
        boost::optional<libp2p::multi::Multiaddress> address,
        const sgns::CID& root_cid) {
        start();

        std::vector<sgns::ipfs_lite::ipfs::graphsync::Extension> extensions;
        sgns::ipfs_lite::ipfs::graphsync::ResponseMetadata response_metadata{};
        sgns::ipfs_lite::ipfs::graphsync::Extension response_metadata_extension =
            sgns::ipfs_lite::ipfs::graphsync::encodeResponseMetadata(response_metadata);
        extensions.push_back(response_metadata_extension);
        std::vector<sgns::CID> cids;
        sgns::ipfs_lite::ipfs::graphsync::Extension do_not_send_cids_extension = 
            sgns::ipfs_lite::ipfs::graphsync::encodeDontSendCids(cids);
        extensions.push_back(do_not_send_cids_extension);
        // unused code , request_ is deleted because Subscription have deleted copy-constructor and operator
              // requests_.push_back(graphsync_->makeRequest(peer,
              //                                             std::move(address),
              //                                             root_cid,
              //                                             {},
              //                                             extensions,
              //                                             requestProgressCallback()));
              // Subscription subscription = graphsync_->makeRequest(peer,
              //                                             std::move(address),
              //                                             root_cid,
              //                                             {},
              //                                             extensions,
              //                                             requestProgressCallback());

        boost::optional<std::vector<libp2p::multi::Multiaddress>> addresses;
        if (address) {
            addresses = std::vector<libp2p::multi::Multiaddress>{ *address };
        }
        requests_.push_back(std::shared_ptr<sgns::ipfs_lite::ipfs::graphsync::Subscription>(
            new sgns::ipfs_lite::ipfs::graphsync::Subscription(std::move(graphsync_->makeRequest(peer,
            std::move(addresses),
            root_cid,
            {},
            extensions,
            requestProgressCallback())))));

        //-------------------------------------------------------------------------------------

        ++requests_sent;
    }

private:
    void start() {
        if (!started_) {
            graphsync_->start(data_service_, block_cb_);
            host_->start();
            started_ = true;
        }
    }

    // helper, returns requesu callback fn
    sgns::ipfs_lite::ipfs::graphsync::Graphsync::RequestProgressCallback requestProgressCallback() {
        static auto formatExtensions =
            [](const std::vector<sgns::ipfs_lite::ipfs::graphsync::Extension>& extensions) -> std::string {
            std::string s;
            for (const auto& item : extensions) {
                s += fmt::format(
                    "({}: 0x{}) ", item.name, sgns::common::Buffer(item.data).toHex());
            }
            return s;
        };
        return [this](sgns::ipfs_lite::ipfs::graphsync::ResponseStatusCode code,
            const std::vector<sgns::ipfs_lite::ipfs::graphsync::Extension>& extensions) {
                ++responses_received;
                logger->trace("request progress: code={}, extensions={}",
                    sgns::ipfs_lite::ipfs::graphsync::statusCodeToString(code),
                    formatExtensions(extensions));
                if (++n_responses == n_responses_expected_) {
                    io_->stop();
                }
        };
    }

    // asion context to be stopped when needed
    std::shared_ptr<boost::asio::io_context> io_;

    std::shared_ptr<sgns::ipfs_lite::ipfs::graphsync::Graphsync> graphsync_;

    std::shared_ptr<libp2p::Host> host_;

    std::shared_ptr<sgns::ipfs_lite::ipfs::graphsync::MerkleDagBridge> data_service_;

    sgns::ipfs_lite::ipfs::graphsync::Graphsync::BlockCallback block_cb_;

    // keeping subscriptions alive, otherwise they cancel themselves
    // class Subscription have non-copyable constructor and operator, so it can not be used in std::vector
    // std::vector<Subscription> requests_;

    std::vector<std::shared_ptr<sgns::ipfs_lite::ipfs::graphsync::Subscription >> requests_;

    size_t n_responses_expected_;
    size_t n_responses = 0;
    bool started_ = false;
};

size_t Node::requests_sent = 0;
size_t Node::responses_received = 0;

namespace
{
    boost::optional<libp2p::peer::PeerInfo> PeerInfoFromString(const std::string& str)
    {
        auto server_ma_res = libp2p::multi::Multiaddress::create(str);
        if (!server_ma_res)
        {
            return boost::none;
        }
        auto server_ma = std::move(server_ma_res.value());

        auto server_peer_id_str = server_ma.getPeerId();
        if (!server_peer_id_str)
        {
            return boost::none;
        }

        auto server_peer_id_res = libp2p::peer::PeerId::fromBase58(*server_peer_id_str);
        if (!server_peer_id_res)
        {
            return boost::none;
        }

        return libp2p::peer::PeerInfo{ server_peer_id_res.value(), {server_ma} };
    }
}

// Two nodes interact, one connection is utilized
void RunIpfsServer()
{
    auto io_server = std::make_shared<boost::asio::io_context>();
    // strings from which we create blocks and CIDs
    std::vector<std::string> strings({ "xxx", "yyy", "zzz" });

    size_t unexpected = 0;
    // creating instances
    auto server_data = std::make_shared<TestDataService>();

    // server block callback expects no blocks
    auto server_cb = [&unexpected](sgns::CID, sgns::common::Buffer) { ++unexpected; };

    for (const auto& s : strings) {
        // client expects what server has

        server_data->addData(s);
    }

    std::string publicKey = "z5b3BTS9wEgJxi9E8NHH6DT8Pj9xTmxBRgTaRUpBVox9a";
    std::string privateKey = "zGRXH26ag4k9jxTGXp2cg8n31CEkR2HN1SbHaKjaHnFTu";

    libp2p::crypto::KeyPair keyPair;
    auto codec = libp2p::multi::MultibaseCodecImpl();
    keyPair.publicKey = { libp2p::crypto::PublicKey::Type::Ed25519, codec.decode(publicKey).value() };
    keyPair.privateKey = { libp2p::crypto::PublicKey::Type::Ed25519, codec.decode(privateKey).value() };

    Node server(io_server, server_data, server_cb, 0, keyPair);

    auto listen_to =
        libp2p::multi::Multiaddress::create("/ip4/127.0.0.1/tcp/40000/ipfs/" + server.getId().toBase58()).value();
    logger->debug(listen_to.getStringAddress());

    // starting all the stuff asynchronously
    server.listen(listen_to);

    std::thread t_server([&]() { io_server->run(); });
    //runEventLoop(io_server, run_time_msec);

    t_server.join();

    server.stop();
}

void RunIpfsClient(std::string remote) {
    Node::requests_sent = 0;
    Node::responses_received = 0;

    auto io_client = std::make_shared<boost::asio::io_context>();

    auto client_data = std::make_shared<TestDataService>();

    size_t unexpected = 0;
    // clienc block callback expect 3 blocks from the string above
    auto client_cb = [&client_data, &unexpected](sgns::CID cid, sgns::common::Buffer data) {
        if (!client_data->onDataBlock(std::move(cid), std::move(data))) {
            ++unexpected;
        }
    };

    // strings from which we create blocks and CIDs
    std::vector<std::string> strings({ "xxx", "yyy", "zzz" });
    for (const auto& s : strings) {
        // client expects what server has
        client_data->addExpected(s);
    }

    Node client(io_client, client_data, client_cb, 3, std::nullopt);

    auto listen_to =
        libp2p::multi::Multiaddress::create("/ip4/127.0.0.1/tcp/40000/ipfs/" + client.getId().toBase58()).value();
    client.listen(listen_to);

    io_client->post([&, remote]() {
        // server listens
        auto pi = PeerInfoFromString(remote);
        auto peer = pi.value().id;
        bool use_address = true;

        // client makes 3 requests

        for (const auto& [cid, _] : client_data->getExpected()) {
            boost::optional<libp2p::multi::Multiaddress> address(pi.value().addresses[0]);

            // don't need to pass the address more than once
            client.makeRequest(peer, use_address ? address : boost::none, cid);
            use_address = false;
        }
        });

    std::thread t_client([&]() { io_client->run(); });
    //runEventLoop(io_client, run_time_msec);

    t_client.join();

    client.stop();

    logger->info("total requests sent {}, responses received {}",
        Node::requests_sent,
        Node::responses_received);
}

// Context for more complex cases
struct NodeParams {
    // listen address
    boost::optional<libp2p::multi::Multiaddress> listen_to;

    // MerkleDAG stub for node
    std::shared_ptr<TestDataService> data_service;

    // Strings to make blocks and CIDs from them
    std::vector<std::string> strings;

    // peer ID
    boost::optional<libp2p::peer::PeerId> peer;
};

// N nodes communicate P2P with each other  and collect many blocks.
// Each node has n_data data blocks
void testManyNodesExchange(size_t N, size_t n_data) {
    Node::requests_sent = 0;
    Node::responses_received = 0;

    size_t unexpected_responses = 0;
    size_t total_responses = 0;
    size_t expected = 0;

    // creating parameters for N nodes

    std::vector<NodeParams> params;
    params.reserve(N);
    for (size_t i = 0; i < N; ++i) {
        auto& p = params.emplace_back();

        // The node #i will listen to 40000+i pore
        p.listen_to = libp2p::multi::Multiaddress::create(
            fmt::format("/ip4/127.0.0.1/tcp/{}", 40000 + i))
            .value();

        p.data_service = std::make_shared<TestDataService>();

        // the i-th node has data represented by strings data_i_j, j in[0, n_data)
        p.strings.reserve(n_data);
        for (size_t j = 0; j < n_data; ++j) {
            auto& s = p.strings.emplace_back(fmt::format("data_{}_{}", i, j));
            p.data_service->addData(s);
        }
    }

    auto io = std::make_shared<boost::asio::io_context>();

    // creating N nodes

    std::vector<Node> nodes;
    nodes.reserve(N);

    for (size_t i = 0; i < N; ++i) {
        auto& p = params[i];

        auto cb = [ds = p.data_service,
            &expected,
            &unexpected_responses,
            &total_responses,
            &io](sgns::CID cid, sgns::common::Buffer data) {
            logger->trace("data block received, {}:{}, {}/{}",
                cid.toString().value(),
                std::string((const char*)data.data(), data.size()),
                total_responses + 1,
                expected);
            if (!ds->onDataBlock(std::move(cid), std::move(data))) {
                ++unexpected_responses;
            }
            else if (++total_responses == expected) {
                io->stop();
            }
        };

        auto& n = nodes.emplace_back(io, p.data_service, cb, 0, std::nullopt);

        // peer IDs are known only at this point
        p.peer = n.getId();

        for (size_t j = 0; j < N; ++j) {
            if (j != i) {
                for (const auto& s : params[j].strings)

                    // each node expects data other hodes have
                    p.data_service->addExpected(s);
            }
        }
    }

    // starting N nodes asynchronously

    io->post([&]() {
        for (size_t i = 0; i < N; ++i) {
            auto& p = params[i];
            auto& n = nodes[i];

            // each node listens
            n.listen(p.listen_to.value());
        }

        // will make connections in the next cycle
        io->post([&]() {
            for (size_t i = 0; i < N; ++i) {
                auto& p = params[i];
                auto& n = nodes[i];

                for (const auto& [cid, d] : p.data_service->getExpected()) {
                    ++expected;
                    for (const auto& p0 : params) {
                        if (&p0 != &p) {
                            logger->trace("request from {} to {} for {}:{}",
                                p.peer->toBase58().substr(46),
                                p0.peer->toBase58().substr(46),
                                cid.toString().value(),
                                std::string((const char*)d.data(), d.size()));

                            // each node request every piece of expected data from
                            // all other nodes. And gets RS_FULL_CONTENT 1 time per each
                            // data block,
                            // and respectively RS_NOT_FOUND will come N-2 times per block

                            n.makeRequest(p0.peer.value(), p0.listen_to, cid);
                        }
                    }
                }
            }
            });
        });

    runEventLoop(io, run_time_msec);

    for (auto& n : nodes) {
        n.stop();
    }

    logger->info("total requests sent {}, responses received {}",
        Node::requests_sent,
        Node::responses_received);

}  // namespace sgns::ipfs_lite::ipfs::graphsync::test

namespace
{
    // cmd line options
    struct Options
    {
        // optional remote peer to connect to
        std::optional<std::string> remote;
        std::string mode;
    };

    boost::optional<Options> parseCommandLine(int argc, char** argv) {
        namespace po = boost::program_options;
        try
        {
            Options o;
            std::string remote;

            po::options_description desc("processing service options");
            desc.add_options()("help,h", "print usage message")
                ("mode,m", po::value(&o.mode), "application mode (client/server)")
                ("remote,r", po::value(&remote), "remote service multiaddress to connect to")
                ;

            po::variables_map vm;
            po::store(parse_command_line(argc, argv, desc), vm);
            po::notify(vm);

            if (vm.count("help") != 0 || argc == 1)
            {
                std::cerr << desc << "\n";
                return boost::none;
            }

            if (!remote.empty())
            {
                o.remote = remote;
            }

            if (o.mode != "client" && o.mode != "server")
            {
                std::cerr << desc << "\n";
                return boost::none;
            }

            return o;
        }
        catch (const std::exception& e)
        {
            std::cerr << e.what() << std::endl;
        }
        return boost::none;
    }
}

int main(int argc, char* argv[]) {
    auto options = parseCommandLine(argc, argv);
    if (!options)
    {
        return 1;
    }

    const std::string logger_config(R"(
    # ----------------
    sinks:
      - name: console
        type: console
        color: true
    groups:
      - name: graphsync_app
        sink: console
        level: info
        children:
          - name: libp2p
          - name: Gossip
    # ----------------
    )");

    // prepare log system
    auto logging_system = std::make_shared<soralog::LoggingSystem>(
        std::make_shared<soralog::ConfiguratorFromYAML>(
            // Original LibP2P logging config
            std::make_shared<libp2p::log::Configurator>(),
            // Additional logging config for application
            logger_config));
    logging_system->configure();

    libp2p::log::setLoggingSystem(logging_system);

    logger = sgns::common::createLogger("graphsync_app");
    logger->set_level(spdlog::level::trace);
    sgns::common::createLogger("graphsync")->set_level(spdlog::level::trace);

    if (options->mode == "client")
    {
        if (!options->remote)
        {
            std::cerr << "--remote parameter must be specified for the 'client' mode";
            return 1;
        }
        RunIpfsClient(options->remote.value());
    }
    else
        RunIpfsServer();
    return 0;
}

Updated on 2026-04-13 at 23:22:46 -0700