Skip to content

ipfs_client2/testipfs.cpp

Classes

Name
struct Cmp
class Session

Functions

Name
void handleIncomingStream(libp2p::protocol::BaseProtocol::StreamResult stream_res)
void handleOutgoingStream(libp2p::protocol::BaseProtocol::StreamResult stream_res)
int main(int argc, char * argv[])

Attributes

Name
boost::optional< libp2p::peer::PeerId > self_id
std::set< std::shared_ptr< Session >, Cmp > sessions

Functions Documentation

function handleIncomingStream

void handleIncomingStream(
    libp2p::protocol::BaseProtocol::StreamResult stream_res
)

function handleOutgoingStream

void handleOutgoingStream(
    libp2p::protocol::BaseProtocol::StreamResult stream_res
)

function main

int main(
    int argc,
    char * argv[]
)

Attributes Documentation

variable self_id

boost::optional< libp2p::peer::PeerId > self_id;

variable sessions

std::set< std::shared_ptr< Session >, Cmp > sessions;

Source code

#include <iostream>
#include <memory>
#include <set>
#include <vector>

#include <boost/beast.hpp>
#include <bitswap.hpp>

#include <libp2p/multi/multibase_codec/multibase_codec_impl.hpp>
#include <libp2p/injector/host_injector.hpp>
#include <libp2p/injector/kademlia_injector.hpp>
#include <libp2p/host/host.hpp>
#include <libp2p/basic/scheduler/asio_scheduler_backend.hpp>
#include <libp2p/multi/content_identifier_codec.hpp>
#include <libp2p/protocol/identify/identify.hpp>
#include <libp2p/log/configurator.hpp>
#include <libp2p/log/logger.hpp>

#include <libp2p/common/hexutil.hpp>
#include <libp2p/injector/kademlia_injector.hpp>
#include <libp2p/log/configurator.hpp>
#include <libp2p/log/sublogger.hpp>
#include <libp2p/multi/content_identifier_codec.hpp>
#include "base/logger.hpp"

#include <ipfs_lite/ipld/impl/ipld_node_impl.hpp>
#include <libp2p/protocol/kademlia/kademlia.hpp>
#include <libp2p/multi/content_identifier.hpp>
#include <boost/program_options.hpp>
#include <boost/format.hpp>
#include <boost/di/extension/scopes/shared.hpp>
#include <libp2p/basic/scheduler.hpp>
class Session;

struct Cmp {
  bool operator()(const std::shared_ptr<Session> &lhs,
                  const std::shared_ptr<Session> &rhs) const;
};

// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
boost::optional<libp2p::peer::PeerId> self_id;

// NOLINTNEXTLINE(cppcoreguidelines-avoid-non-const-global-variables)
std::set<std::shared_ptr<Session>, Cmp> sessions;

class Session : public std::enable_shared_from_this<Session> {
 public:
  explicit Session(std::shared_ptr<libp2p::connection::Stream> stream)
      : stream_(std::move(stream)),
        incoming_(std::make_shared<std::vector<uint8_t>>(1 << 12)){};

  bool read() {
    if (stream_->isClosedForRead()) {
      close();
      return false;
    }

    stream_->readSome(
        gsl::span(incoming_->data(), static_cast<ssize_t>(incoming_->size())),
        incoming_->size(),
        [self = shared_from_this()](libp2p::outcome::result<size_t> result) {
          if (!result) {
            self->close();
            std::cout << self->stream_->remotePeerId().value().toBase58()
                      << " - closed at reading" << std::endl;
            return;
          }
          std::cout << self->stream_->remotePeerId().value().toBase58() << " > "
                    << std::string(self->incoming_->begin(),
                                   self->incoming_->begin()
                                       + static_cast<ssize_t>(result.value()));
          std::cout.flush();
          self->read();
        });
    return true;
  }

  bool write(const std::shared_ptr<std::vector<uint8_t>> &buffer) {
    if (stream_->isClosedForWrite()) {
      close();
      return false;
    }

    stream_->write(
        gsl::span(buffer->data(), static_cast<ssize_t>(buffer->size())),
        buffer->size(),
        [self = shared_from_this(),
         buffer](libp2p::outcome::result<size_t> result) {
          if (!result) {
            self->close();
            std::cout << self->stream_->remotePeerId().value().toBase58()
                      << " - closed at writting" << std::endl;
            return;
          }
          std::cout << self->stream_->remotePeerId().value().toBase58() << " < "
                    << std::string(buffer->begin(),
                                   buffer->begin()
                                       + static_cast<ssize_t>(result.value()));
          std::cout.flush();
        });
    return true;
  }

  void close() {
    stream_->close([self = shared_from_this()](auto) {});
    sessions.erase(shared_from_this());
  }

  bool operator<(const Session &other) {
    return stream_->remotePeerId().value()
        < other.stream_->remotePeerId().value();
  }

 private:
  std::shared_ptr<libp2p::connection::Stream> stream_;
  std::shared_ptr<std::vector<uint8_t>> incoming_;
};

bool Cmp::operator()(const std::shared_ptr<Session> &lhs,
                     const std::shared_ptr<Session> &rhs) const {
  return *lhs < *rhs;
}

void handleIncomingStream(
    libp2p::protocol::BaseProtocol::StreamResult stream_res) {
  if (!stream_res) {
    std::cerr << " ! incoming connection failed: "
              << stream_res.error().message() << std::endl;
    return;
  }
  auto &stream = stream_res.value();

  // reject incoming stream with themselves
  if (stream->remotePeerId().value() == self_id) {
    stream->reset();
    return;
  }

  std::cout << stream->remotePeerId().value().toBase58()
            << " + incoming stream from "
            << stream->remoteMultiaddr().value().getStringAddress()
            << std::endl;

  auto session = std::make_shared<Session>(stream);
  if (auto [it, ok] = sessions.emplace(std::move(session)); ok) {
    (*it)->read();
  }
}

void handleOutgoingStream(
    libp2p::protocol::BaseProtocol::StreamResult stream_res) {
  if (!stream_res) {
    std::cerr << " ! outgoing connection failed: "
              << stream_res.error().message() << std::endl;
    return;
  }
  auto &stream = stream_res.value();

  // reject outgoing stream to themselves
  if (stream->remotePeerId().value() == self_id) {
    stream->reset();
    return;
  }

  std::cout << stream->remotePeerId().value().toBase58()
            << " + outgoing stream to "
            << stream->remoteMultiaddr().value().getStringAddress()
            << std::endl;

  auto session = std::make_shared<Session>(stream);
  if (auto [it, ok] = sessions.emplace(std::move(session)); ok) {
    (*it)->read();
  }
}

namespace {
  const std::string logger_config(R"(
# ----------------
sinks:
  - name: console
    type: console
    color: true
groups:
  - name: main
    sink: console
    level: debug
    children:
      - name: libp2p
      - name: kademlia
# ----------------
  )");
}  // namespace

int main(int argc, char *argv[]) {
    const std::string logger_config(R"(
    # ----------------
    sinks:
      - name: console
        type: console
        color: false
    groups:
      - name: main
        sink: console
        level: debug
        children:
          - name: libp2p
          - name: kademlia
    # ----------------
    )");
  // prepare log system
  auto logging_system = std::make_shared<soralog::LoggingSystem>(
      std::make_shared<soralog::ConfiguratorFromYAML>(
          // Original LibP2P logging config
          std::make_shared<libp2p::log::Configurator>(),
          // Additional logging config for application
          logger_config));
  auto r = logging_system->configure();
  if (!r.message.empty()) {
    (r.has_error ? std::cerr : std::cout) << r.message << std::endl;
  }
  if (r.has_error) {
    exit(EXIT_FAILURE);
  }

  libp2p::log::setLoggingSystem(logging_system);
  //if (std::getenv("TRACE_DEBUG") != nullptr) {
  //  libp2p::log::setLevelOfGroup("main", soralog::Level::TRACE);
  //} else {
  //  libp2p::log::setLevelOfGroup("main", soralog::Level::ERROR_);
  //}

  // resulting PeerId should be
  // 12D3KooWEgUjBV5FJAuBSoNMRYFRHjV7PjZwRQ7b43EKX9g7D6xV
  libp2p::crypto::KeyPair kp = {
      // clang-format off
      /*.publicKey =*/ {{
        /*.type =*/ libp2p::crypto::Key::Type::Ed25519,
        /*.data =*/ libp2p::common::unhex("48453469c62f4885373099421a7365520b5ffb0d93726c124166be4b81d852e6").value()
      }},
      /*.privateKey =*/ {{
        /*.type =*/ libp2p::crypto::Key::Type::Ed25519,
        /*.data =*/ libp2p::common::unhex("4a9361c525840f7086b893d584ebbe475b4ec7069951d2e897e8bceb0a3f35ce").value()
      }},
      // clang-format on
  };

  libp2p::protocol::kademlia::Config kademlia_config;
  kademlia_config.randomWalk.enabled = true;
  kademlia_config.randomWalk.interval = std::chrono::seconds(300);
  kademlia_config.requestConcurency = 20;

  auto injector = libp2p::injector::makeHostInjector(
      // libp2p::injector::useKeyPair(kp), // Use predefined keypair
      libp2p::injector::makeKademliaInjector(
          libp2p::injector::useKademliaConfig(kademlia_config)));

  try {
    //if (argc < 2) {
    //  std::cerr << "Needs one argument - address" << std::endl;
    //  exit(EXIT_FAILURE);
    //}

    auto bootstrap_nodes = [] {
      std::vector<std::string> addresses = {
          // clang-format off
          "/dnsaddr/bootstrap.libp2p.io/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
          "/dnsaddr/bootstrap.libp2p.io/ipfs/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
          "/dnsaddr/bootstrap.libp2p.io/ipfs/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
          "/dnsaddr/bootstrap.libp2p.io/ipfs/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
          "/ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ",            // mars.i.ipfs.io
          "/ip4/104.236.179.241/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM",           // pluto.i.ipfs.io
          "/ip4/128.199.219.111/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu",           // saturn.i.ipfs.io
          "/ip4/104.236.76.40/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64",             // venus.i.ipfs.io
          "/ip4/178.62.158.247/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd",            // earth.i.ipfs.io
          "/ip6/2604:a880:1:20::203:d001/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM",  // pluto.i.ipfs.io
          "/ip6/2400:6180:0:d0::151:6001/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu",  // saturn.i.ipfs.io
          "/ip6/2604:a880:800:10::4a:5001/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64", // venus.i.ipfs.io
          "/ip6/2a03:b0c0:0:1010::23:1001/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd", // earth.i.ipfs.io
          //"/ip4/127.0.0.1/tcp/4001/p2p/12D3KooWFMdNiBFk5ojGNzWjqSTL1HGLu8rXns5kwqUPTrbFNtEN",
          // clang-format on
      };

      std::unordered_map<libp2p::peer::PeerId,
                         std::vector<libp2p::multi::Multiaddress>>
          addresses_by_peer_id;
      std::cout << "what" << std::endl;
      for (auto &address : addresses) {
        auto ma = libp2p::multi::Multiaddress::create(address).value();
        auto peer_id_base58 = ma.getPeerId().value();
        auto peer_id = libp2p::peer::PeerId::fromBase58(peer_id_base58).value();

        addresses_by_peer_id[std::move(peer_id)].emplace_back(std::move(ma));
      }
      std::cout << "what2" << std::endl;
      std::vector<libp2p::peer::PeerInfo> v;
      v.reserve(addresses_by_peer_id.size());
      for (auto &i : addresses_by_peer_id) {
        v.emplace_back(libp2p::peer::PeerInfo{
        /*.id =*/ i.first, /*.addresses =*/ {std::move(i.second)}});
      }

      return v;
    }();

    //auto ma = libp2p::multi::Multiaddress::create(argv[1]).value();  // NOLINT
    auto ma = libp2p::multi::Multiaddress::create("/ip4/127.0.0.1/tcp/40000").value();
    auto io = injector.create<std::shared_ptr<boost::asio::io_context>>();

    auto host = injector.create<std::shared_ptr<libp2p::Host>>();

    self_id = host->getId();

    std::cerr << self_id->toBase58() << " * started" << std::endl;

    auto kademlia =
        injector
            .create<std::shared_ptr<libp2p::protocol::kademlia::Kademlia>>();

    auto identityManager = injector.create<std::shared_ptr<libp2p::peer::IdentityManager>>();
    auto keyMarshaller = injector.create<std::shared_ptr<libp2p::crypto::marshaller::KeyMarshaller>>();

    auto identifyMessageProcessor = std::make_shared<libp2p::protocol::IdentifyMessageProcessor>(
        *host, host->getNetwork().getConnectionManager(), *identityManager, keyMarshaller);
    //auto identify = std::make_shared<libp2p::protocol::Identify>(*host, identifyMessageProcessor, host->getBus());
    // Handle streams for observed protocol
    //host->setProtocolHandler("/chat/1.0.0", handleIncomingStream);
    //host->setProtocolHandler("/chat/1.1.0", handleIncomingStream);

    // Key for group of chat
    //libp2p::protocol::kademlia::ContentId content_id("QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D");
    auto cid = libp2p::multi::ContentIdentifierCodec::fromString("QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D").value();
    auto content_id = libp2p::protocol::kademlia::ContentId::fromWire(
        libp2p::multi::ContentIdentifierCodec::encode(cid).value());
    auto &scheduler = injector.create<libp2p::basic::Scheduler &>();

    std::function<void()> find_providers = [&] {
      [[maybe_unused]] auto res1 = kademlia->findProviders(
          content_id.value(), 100,
          [&](libp2p::outcome::result<std::vector<libp2p::peer::PeerInfo>>
                  res) {
            scheduler
                .schedule(libp2p::basic::Scheduler::toTicks(
                              kademlia_config.randomWalk.interval),
                          find_providers)
                .detach();

            if (!res) {
              std::cerr << "Cannot find providers: " << res.error().message()
                        << std::endl;
              return;
            }
            std::cout << "In Providers" << std::endl;
            auto &providers = res.value();
            for (auto &provider : providers) {
            //  host->newStream(provider, "/chat/1.1.0", handleOutgoingStream);
                std::cout << "provider" << std::endl;
            }
          });
    };

    std::function<void()> provide = [&, content_id] {
      [[maybe_unused]] auto res =
          kademlia->provide(content_id.value(), !kademlia_config.passiveMode);

      scheduler
          .schedule(libp2p::basic::Scheduler::toTicks(
                        kademlia_config.randomWalk.interval),
                    provide)
          .detach();
    };

    io->post([&] {
      auto listen = host->listen(ma);
      if (!listen) {
        std::cerr << "Cannot listen address " << ma.getStringAddress().data()
                  << ". Error: " << listen.error().message() << std::endl;
        std::exit(EXIT_FAILURE);
      }

      for (auto &bootstrap_node : bootstrap_nodes) {
        kademlia->addPeer(bootstrap_node, true);
      }
      //identify->start();
      host->start();

      //auto cid = libp2p::multi::ContentIdentifierCodec::decode(content_id.data)
      //               .value();
      //auto peer_id =
       //   libp2p::peer::PeerId::fromHash(cid.content_address).value();

      //[[maybe_unused]] auto res = kademlia->findPeer(peer_id, [&](auto) {
        // Say to world about his providing
        //provide();

        // Ask provider from world
        find_providers();

        kademlia->start();
      //});
    });

    //boost::asio::posix::stream_descriptor in(*io, ::dup(STDIN_FILENO));
    std::array<uint8_t, 1 << 12> buffer{};

    // Asynchronous transmit data from standard input to peers, that's privided
    // same content id
    //std::function<void()> read_from_console = [&] {
    //  in.async_read_some(boost::asio::buffer(buffer), [&](auto ec, auto size) {
    //    auto i = std::find_if(buffer.begin(), buffer.begin() + size + 1,
    //                          [](auto c) { return c == '\n'; });

    //    if (i != buffer.begin() + size + 1) {
    //      auto out = std::make_shared<std::vector<uint8_t>>();
    //      out->assign(buffer.begin(), buffer.begin() + size);

    //      for (const auto &session : sessions) {
    //        session->write(out);
    //      }
    //    }
    //    read_from_console();
    //  });
    //};
    //read_from_console();

    boost::asio::signal_set signals(*io, SIGINT, SIGTERM);
    signals.async_wait(
        [&io](const boost::system::error_code &, int) { io->stop(); });
    io->run();
  } catch (const std::exception &e) {
    std::cerr << "Exception: " << e.what() << std::endl;
    exit(EXIT_FAILURE);
  }

  exit(EXIT_SUCCESS);
}

Updated on 2026-04-13 at 23:22:46 -0700