Skip to content

src/crdt/graphsync_dagsyncer.hpp

Namespaces

Name
sgns
sgns::crdt

Classes

Name
class sgns::crdt::GraphsyncDAGSyncer
A DAGSyncer is an abstraction to an IPLD-based p2p storage layer. A DAGSyncer is a DAGService with the ability to publish new ipld nodes to the network, and retrieving others from it.
struct sgns::crdt::GraphsyncDAGSyncer::BlacklistEntry

Functions

Name
OUTCOME_HPP_DECLARE_ERROR_2(sgns::crdt , GraphsyncDAGSyncer::Error )
Macro for declaring error handling in the IBasicProof class.

Functions Documentation

function OUTCOME_HPP_DECLARE_ERROR_2

OUTCOME_HPP_DECLARE_ERROR_2(
    sgns::crdt ,
    GraphsyncDAGSyncer::Error 
)

Macro for declaring error handling in the IBasicProof class.

Source code

#ifndef SUPERGENIUS_GRAPHSYNC_DAGSYNCER_HPP
#define SUPERGENIUS_GRAPHSYNC_DAGSYNCER_HPP

#include <memory>
#include <chrono>

#include "crdt/dagsyncer.hpp"
#include "base/logger.hpp"

#include <ipfs_lite/ipfs/graphsync/impl/merkledag_bridge_impl.hpp>
#include <ipfs_lite/ipfs/merkledag/impl/merkledag_service_impl.hpp>

#include <libp2p/host/host.hpp>

namespace sgns::crdt
{
    class GraphsyncDAGSyncer : public DAGSyncer, public std::enable_shared_from_this<GraphsyncDAGSyncer>
    {
    public:
        using IpfsDatastore       = ipfs_lite::ipfs::IpfsDatastore;
        using Graphsync           = ipfs_lite::ipfs::graphsync::Graphsync;
        using ResponseMetadata    = ipfs_lite::ipfs::graphsync::ResponseMetadata;
        using Extension           = ipfs_lite::ipfs::graphsync::Extension;
        using MerkleDagBridgeImpl = ipfs_lite::ipfs::graphsync::MerkleDagBridgeImpl;
        using ResponseStatusCode  = ipfs_lite::ipfs::graphsync::ResponseStatusCode;
        using Multiaddress        = libp2p::multi::Multiaddress;
        using Multihash           = libp2p::multi::Multihash;
        using PeerId              = libp2p::peer::PeerId;
        using Subscription        = libp2p::protocol::Subscription;
        using Logger              = base::Logger;
        using BlockCallback       = Graphsync::BlockCallback;

        // New peer registry types
        using PeerKey      = size_t; // Unique identifier for a peer in our registry
        using PeerEntry    = std::pair<PeerId, std::vector<Multiaddress>>;
        using RouteMapType = std::map<CID, PeerKey>; // Maps CIDs to peer registry keys

        enum class Error
        {
            CID_NOT_FOUND = 0,     
            ROUTE_NOT_FOUND,       
            PEER_BLACKLISTED,      
            TIMED_OUT,             
            DAGSYNCER_NOT_STARTED, 
            GRAPHSYNC_IS_NULL,     
            HOST_IS_NULL,          
        };

        struct BlacklistEntry
        {
            uint64_t timestamp;        // When the peer was last updated
            uint64_t failures;         // Number of consecutive failures
            bool     ever_connected;   // Flag indicating if we've ever successfully connected
            uint64_t backoff_attempts; // Count of backoff attempts (for exponential calculation)

            BlacklistEntry( uint64_t time, uint64_t count, bool connected = false ) :
                timestamp( time ), failures( count ), ever_connected( connected ), backoff_attempts( 0 )
            {
            }
        };

        GraphsyncDAGSyncer( std::shared_ptr<IpfsDatastore> service,
                            std::shared_ptr<Graphsync>     graphsync,
                            std::shared_ptr<libp2p::Host>  host );

        ~GraphsyncDAGSyncer() override
        {
            logger_->debug( "~GraphsyncDAGSyncer CALLED" );
        }

        outcome::result<void> Listen( const Multiaddress &listen_to );

        outcome::result<void> StartSync();

        void AddRoute( const CID &cid, const PeerId &peer, std::vector<Multiaddress> address );

        // DAGService interface implementation
        outcome::result<void> addNode( std::shared_ptr<const ipfs_lite::ipld::IPLDNode> node ) override;

        outcome::result<std::shared_ptr<ipfs_lite::ipld::IPLDNode>> getNode( const CID &cid ) const override;

        outcome::result<void> removeNode( const CID &cid ) override;

        outcome::result<size_t> select(
            gsl::span<const uint8_t>                                                     root_cid,
            gsl::span<const uint8_t>                                                     selector,
            std::function<bool( std::shared_ptr<const ipfs_lite::ipld::IPLDNode> node )> handler ) const override;

        outcome::result<std::shared_ptr<ipfs_lite::ipfs::merkledag::Leaf>> fetchGraph( const CID &cid ) const override;

        outcome::result<std::shared_ptr<ipfs_lite::ipfs::merkledag::Leaf>> fetchGraphOnDepth(
            const CID &cid,
            uint64_t   depth ) const override;

        outcome::result<bool> HasBlock( const CID &cid ) const override;

        outcome::result<std::shared_ptr<ipfs_lite::ipld::IPLDNode>> GetNodeWithoutRequest(
            const CID &cid ) const override;

        outcome::result<std::shared_ptr<ipfs_lite::ipld::IPLDNode>> GetNodeFromMerkleDAG( const CID &cid ) const;

        std::pair<LinkInfoSet, LinkInfoSet> TraverseCIDsLinks( ipfs_lite::ipld::IPLDNode &node,
                                                               std::string                link_name = "",
                                                               LinkInfoSet                visited = {} ) const override;

        outcome::result<void> markResolved( const CID &cid ) override;
        outcome::result<bool> isResolved( const CID &cid ) const override;
        /* Returns peer ID */
        outcome::result<PeerId> GetId() const;

        outcome::result<libp2p::peer::PeerInfo> GetPeerInfo() const;

        void AddToBlackList( const PeerId &peer ) const;

        bool IsOnBlackList( const PeerId &peer ) const;

        void                  InitCIDBlock( const CID &cid ) override;
        bool                  IsCIDInCache( const CID &cid ) const override;
        outcome::result<void> DeleteCIDBlock( const CID &cid ) override;

        void Stop() override;

    private:
        static constexpr uint64_t TIMEOUT_SECONDS = 1200;
        static constexpr uint64_t MAX_FAILURES    = 3;

        outcome::result<ipfs_lite::ipfs::graphsync::Subscription> RequestNode(
            const PeerId                              &peer,
            boost::optional<std::vector<Multiaddress>> address,
            const CID                                 &root_cid ) const;

        void RequestProgressCallback( ResponseStatusCode code, const std::vector<Extension> &extensions ) const;
        void BlockReceivedCallback( const CID &cid, common::Buffer buffer );

        void StopSync();

        // Helper methods for the peer registry
        PeerKey                    RegisterPeer( const PeerId &peer, std::vector<Multiaddress> address );
        outcome::result<PeerEntry> GetPeerById( PeerKey id ) const;

        bool AddCIDBlock( const CID &cid, const std::shared_ptr<ipfs_lite::ipld::IPLDNode> &block );
        outcome::result<std::shared_ptr<ipfs_lite::ipld::IPLDNode>> GrabCIDBlock( const CID &cid ) const;

        outcome::result<void> BlackListPeer( const PeerId &peer ) const;

        outcome::result<PeerEntry> GetRoute( const CID &cid ) const;
        void                       EraseRoutesFromPeerID( const PeerId &peer ) const;
        void                       EraseRoute( const CID &cid );

        static uint64_t GetCurrentTimestamp();

        static uint64_t getBackoffTimeout( uint64_t attempts, bool ever_connected );

        void RecordSuccessfulConnection( const PeerId &peer );

        void RecordCIDFailure( const PeerId &peer, const CID &cid ) const;
        bool HasRecentCIDFailure( const PeerId &peer, const CID &cid ) const;
        void ClearCIDFailure( const PeerId &peer, const CID &cid ) const;

        bool             started_ = false;
        std::vector<CID> unexpected_blocks;

        mutable std::mutex                               dagMutex_;
        ipfs_lite::ipfs::merkledag::MerkleDagServiceImpl dagService_;
        std::shared_ptr<Graphsync>                       graphsync_;

        std::shared_ptr<libp2p::Host> host_;

        Logger logger_ = base::createLogger( "GraphsyncDAGSyncer" );

        // keeping subscriptions alive, otherwise they cancel themselves
        // class Subscription have non-copyable constructor and operator, so it can not be used in std::vector
        // std::vector<Subscription> requests_;
        std::map<CID,
                 std::tuple<std::shared_ptr<Subscription>,
                            std::shared_ptr<std::promise<std::shared_ptr<ipfs_lite::ipld::IPLDNode>>>>>
            requests_;

        // New peer registry - stores unique peers and their addresses
        std::vector<PeerEntry>    peer_registry_;
        std::map<PeerId, PeerKey> peer_index_; // Maps PeerIds to registry indices
        mutable std::mutex        registry_mutex_;

        // Routing table that references peers in the registry
        mutable RouteMapType routing_;
        mutable std::mutex   routing_mutex_;

        mutable std::map<Multihash, BlacklistEntry> blacklist_;
        mutable std::mutex                          blacklist_mutex_;

        // Track CID-specific failures per peer to avoid re-requesting CIDs that peers don't have
        mutable std::map<std::pair<Multihash, CID>, uint64_t> cid_failures_; // peer+cid -> timestamp of failure
        mutable std::mutex                                    cid_failures_mutex_;

        std::map<CID, std::shared_ptr<ipfs_lite::ipld::IPLDNode>> received_blocks_;

        class LRUCIDCache
        {
        public:
            // Maximum number of blocks to store in the cache
            static constexpr size_t MAX_CACHE_SIZE = 250;

            void init( const CID &cid );
            bool add( const CID &cid, std::shared_ptr<ipfs_lite::ipld::IPLDNode> node );
            std::shared_ptr<ipfs_lite::ipld::IPLDNode> get( const CID &cid );
            bool                                       remove( const CID &cid );
            bool                                       contains( const CID &cid ) const;

            bool hasContent( const CID &cid ) const;

            size_t size() const
            {
                return cache_map_.size();
            }

            // Main storage: CID -> (Node, list iterator)
            std::map<CID, std::pair<std::shared_ptr<ipfs_lite::ipld::IPLDNode>, std::list<CID>::iterator>> cache_map_;

            // LRU list: most recently used at front, least recently used at back
            std::list<CID> lru_list_;

            mutable std::mutex mutex_;
        };

        mutable LRUCIDCache lru_cid_cache_;

        std::atomic<bool> is_stopped_{ false };
    };

}

OUTCOME_HPP_DECLARE_ERROR_2( sgns::crdt, GraphsyncDAGSyncer::Error );

#endif

Updated on 2026-03-04 at 13:10:44 -0800