Skip to content

src/crdt/impl/graphsync_dagsyncer.cpp

Namespaces

Name
sgns
sgns::crdt

Functions

Name
OUTCOME_CPP_DEFINE_CATEGORY_3(sgns::crdt, GraphsyncDAGSyncer::Error, e)

Functions Documentation

function OUTCOME_CPP_DEFINE_CATEGORY_3

OUTCOME_CPP_DEFINE_CATEGORY_3(
    sgns::crdt,
    GraphsyncDAGSyncer::Error,
    e
)

Source code

#include "crdt/graphsync_dagsyncer.hpp"
#include "outcome/outcome.hpp"

#include <ipfs_lite/ipld/impl/ipld_node_impl.hpp>
#include <memory>
#include <utility>
#include <thread>
#include <deque>

// Maps each GraphsyncDAGSyncer::Error enumerator to a human-readable message
// for Boost.Outcome's error-category machinery.
OUTCOME_CPP_DEFINE_CATEGORY_3( sgns::crdt, GraphsyncDAGSyncer::Error, e )
{
    using Error = sgns::crdt::GraphsyncDAGSyncer::Error;

    switch ( e )
    {
        case Error::CID_NOT_FOUND:
            return "The Requested CID was not found";
        case Error::ROUTE_NOT_FOUND:
            return "No route to find the CID";
        case Error::PEER_BLACKLISTED:
            return "The peer who has the CID is blacklisted";
        case Error::TIMED_OUT:
            return "The request has timed out";
        case Error::DAGSYNCER_NOT_STARTED:
            return "The Start method was never called, or StopSync was called";
        case Error::GRAPHSYNC_IS_NULL:
            return "The graphsync member is null";
        case Error::HOST_IS_NULL:
            return "The host member is null";
    }
    // Fallback for out-of-range values cast into the enum.
    return "Unknown error";
}

namespace sgns::crdt
{
    // Constructs the syncer from its three collaborators: the local IPFS
    // datastore (wrapped by dagService_), the graphsync network engine, and
    // the libp2p host. All three are taken by value and moved into members;
    // null checks are deferred to the methods that use them.
    GraphsyncDAGSyncer::GraphsyncDAGSyncer( std::shared_ptr<IpfsDatastore> service,
                                            std::shared_ptr<Graphsync>     graphsync,
                                            std::shared_ptr<libp2p::Host>  host ) :
        dagService_( std::move( service ) ), graphsync_( std::move( graphsync ) ), host_( std::move( host ) )
    {
        // The pointer value is logged as an integer to correlate instances across log lines.
        logger_->debug( "GraphSyncer created {} ", reinterpret_cast<size_t>( this ) );
    }

    /// Registers a peer (or refreshes an existing registration) and returns
    /// its stable registry key. Thread-safe via registry_mutex_.
    /// @param peer    the libp2p peer identity
    /// @param address multiaddresses for the peer (sink parameter — moved in)
    /// @return the peer's key, which is its index into peer_registry_
    GraphsyncDAGSyncer::PeerKey GraphsyncDAGSyncer::RegisterPeer( const PeerId             &peer,
                                                                  std::vector<Multiaddress> address )
    {
        std::lock_guard lock( registry_mutex_ );

        // Peer already known: refresh its addresses and reuse the existing key.
        if ( auto it = peer_index_.find( peer ); it != peer_index_.end() )
        {
            PeerKey key = it->second;
            // Move rather than copy: `address` is a by-value sink parameter.
            peer_registry_[key].second = std::move( address );
            return key;
        }

        // New peer: the key is its position in the registry vector.
        PeerKey key = peer_registry_.size();
        peer_registry_.emplace_back( peer, std::move( address ) );
        peer_index_.emplace( peer, key );

        logger_->debug( "Registered new peer {} with key {}", peer.toBase58(), key );
        return key;
    }

    /// Looks up a peer entry by its registry key.
    /// @return the (PeerId, addresses) pair, or ROUTE_NOT_FOUND for an
    ///         out-of-range key.
    outcome::result<GraphsyncDAGSyncer::PeerEntry> GraphsyncDAGSyncer::GetPeerById( PeerKey id ) const
    {
        std::lock_guard lock( registry_mutex_ );

        // A key is valid only when it indexes into the registry vector.
        if ( id < peer_registry_.size() )
        {
            return peer_registry_[id];
        }

        logger_->error( "No route for the requested PeerID  {}", id );
        return outcome::failure( Error::ROUTE_NOT_FOUND );
    }

    /// Binds the libp2p host to the given listen address and starts the sync
    /// machinery.
    /// @return success, HOST_IS_NULL, a host listen error, or StartSync's error
    outcome::result<void> GraphsyncDAGSyncer::Listen( const Multiaddress &listen_to )
    {
        logger_->debug( "Starting to listen {} ", reinterpret_cast<size_t>( this ) );

        // A listening endpoint requires a libp2p host.
        if ( host_ == nullptr )
        {
            return outcome::failure( Error::HOST_IS_NULL );
        }

        OUTCOME_TRY( host_->listen( listen_to ) );

        // Once the transport listens, bring graphsync and the host up as well.
        return StartSync();
    }

    /// Issues an asynchronous graphsync request for `root_cid` to `peer`.
    /// @param peer     target peer identity
    /// @param address  optional dial addresses for the peer
    /// @param root_cid the CID to fetch
    /// @return the request subscription, or DAGSYNCER_NOT_STARTED /
    ///         GRAPHSYNC_IS_NULL when the syncer is not usable
    outcome::result<ipfs_lite::ipfs::graphsync::Subscription> GraphsyncDAGSyncer::RequestNode(
        const PeerId                              &peer,
        boost::optional<std::vector<Multiaddress>> address,
        const CID                                 &root_cid ) const
    {
        // Refuse to issue requests unless StartSync() succeeded.
        if ( !started_ )
        {
            logger_->error( "DagSyncer not started" );
            return outcome::failure( Error::DAGSYNCER_NOT_STARTED );
        }

        if ( graphsync_ == nullptr )
        {
            logger_->error( "graphsyncer is null" );
            return outcome::failure( Error::GRAPHSYNC_IS_NULL );
        }

        // Every request carries an empty response-metadata extension and an
        // empty do-not-send-CIDs extension.
        std::vector<Extension> extensions{
            ipfs_lite::ipfs::graphsync::encodeResponseMetadata( {} ),
            ipfs_lite::ipfs::graphsync::encodeDontSendCids( {} ),
        };

        // The progress callback captures a weak_ptr so an in-flight request
        // neither extends this object's lifetime nor touches it after death.
        auto on_progress = [weakptr = weak_from_this()]( const ResponseStatusCode      code,
                                                         const std::vector<Extension> &exts )
        {
            if ( auto self = weakptr.lock() )
            {
                self->RequestProgressCallback( code, exts );
            }
        };

        auto subscription =
            graphsync_->makeRequest( peer, std::move( address ), root_cid, {}, extensions, std::move( on_progress ) );

        logger_->debug( "Requesting Node {} on this {}",
                        root_cid.toString().value(),
                        reinterpret_cast<size_t>( this ) );

        return subscription;
    }

    void GraphsyncDAGSyncer::AddRoute( const CID &cid, const PeerId &peer, std::vector<Multiaddress> address )
    {
        // Register the peer (or get existing key if already registered)
        PeerKey peerKey = RegisterPeer( peer, std::move( address ) );

        // Add the CID route to the routing table
        std::lock_guard lock( routing_mutex_ );
        routing_[cid] = peerKey;

        logger_->debug( "Added route for CID {} to peer {} (key {})",
                        cid.toString().value(),
                        peer.toBase58(),
                        peerKey );
    }

    /// Stores a node in the local DAG service. On success the CID's network
    /// route is erased, since the block no longer needs to be fetched remotely.
    outcome::result<void> GraphsyncDAGSyncer::addNode( std::shared_ptr<const ipfs_lite::ipld::IPLDNode> node )
    {
        std::lock_guard lock( dagMutex_ );

        // Capture the CID before the node is moved into the DAG service.
        auto cid    = node->getCID();
        auto result = dagService_.addNode( std::move( node ) );

        if ( result.has_error() )
        {
            logger_->error( "{}: ERROR Adding node {} on dagService ", __func__, cid.toString().value() );
        }
        else
        {
            logger_->debug( "{}: Added node {} on dagService ", __func__, cid.toString().value() );
            // A locally stored node no longer needs a remote route.
            EraseRoute( cid );
        }
        return result;
    }

    // Resolves a CID to a node, escalating through three tiers:
    //   1. the in-memory LRU cache (GrabCIDBlock),
    //   2. the local MerkleDAG storage (GetNodeFromMerkleDAG),
    //   3. a graphsync network request to the routed peer, after which this
    //      method polls the request state every 100 ms until it settles.
    //
    // NOTE(review): this method blocks the calling thread in a 100 ms polling
    // loop with no overall deadline (exit is via completion, failure, or
    // is_stopped_) — confirm callers never invoke it on a latency-sensitive
    // thread.
    // NOTE(review): graphsync_ is dereferenced below without a null check,
    // unlike RequestNode which guards it — confirm a non-null graphsync_ is an
    // invariant by the time getNode can be reached.
    outcome::result<std::shared_ptr<ipfs_lite::ipld::IPLDNode>> GraphsyncDAGSyncer::getNode( const CID &cid ) const
    {
        // Tier 1: LRU cache.
        auto node = GrabCIDBlock( cid );

        if ( !node.has_error() )
        {
            logger_->debug( "Return node for CID {} instance={}",
                            cid.toString().value(),
                            reinterpret_cast<size_t>( this ) );
            return node;
        }

        // Tier 2: local MerkleDAG storage.
        node = GetNodeFromMerkleDAG( cid );

        if ( !node.has_error() )
        {
            logger_->debug( "Return node for CID {} instance={}",
                            cid.toString().value(),
                            reinterpret_cast<size_t>( this ) );
            return node;
        }

        // Tier 3: network. First check whether a request for this CID is
        // already in flight, so we don't issue a duplicate.
        bool already_requested = false;

        if ( auto initial_state = graphsync_->getRequestState( cid ) )
        {
            if ( initial_state.value() == Graphsync::RequestState::IN_PROGRESS )
            {
                already_requested = true;
                logger_->debug( "We already started trying to get this CID {}", cid.toString().value() );
            }
        }
        // Holds the subscription alive for the duration of the polling loop.
        ipfs_lite::ipfs::graphsync::Subscription curr_subscription;

        // Without a route there is no peer to ask; propagate ROUTE_NOT_FOUND.
        OUTCOME_TRY( auto peerEntry, GetRoute( cid ) );

        auto &peerID  = peerEntry.first;
        auto &address = peerEntry.second;
        // Check if this peer recently failed to provide this specific CID
        if ( HasRecentCIDFailure( peerID, cid ) )
        {
            logger_->error( "Skipping request for CID {} from peer {} due to recent failure",
                            cid.toString().value(),
                            peerID.toBase58() );
            return outcome::failure( Error::CID_NOT_FOUND );
        }
        if ( already_requested == false )
        {
            logger_->debug( "Requesting CID {}", cid.toString().value() );
            OUTCOME_TRY( ( auto &&, subscription ), RequestNode( peerID, address, cid ) );
            curr_subscription = std::move( subscription );
        }

        // Poll the request state until it resolves one way or the other.
        while ( true )
        {
            // StopSync was called while we were waiting; give up cleanly.
            if ( is_stopped_ )
            {
                logger_->error( "We exited while trying to sync {} as it must have been still in progress.",
                                cid.toString().value() );
                return outcome::failure( Error::DAGSYNCER_NOT_STARTED );
            }
            // Check request state
            auto state_result = graphsync_->getRequestState( cid );
            if ( !state_result )
            {
                // Request not found - This could indicate a failure, but it's also possible it just got cleaned up, so check cache or storage to see if we have the block
                if ( auto result = GrabCIDBlock( cid ) )
                {
                    logger_->debug( "Return node for CID {} instance={}",
                                    cid.toString().value(),
                                    reinterpret_cast<size_t>( this ) );
                    return result;
                }
                if ( auto result = GetNodeWithoutRequest( cid ) )
                {
                    logger_->debug( "Return node for CID {} instance={}",
                                    cid.toString().value(),
                                    reinterpret_cast<size_t>( this ) );
                    return result;
                }
                // No state and no block anywhere: treat the peer as unusable.
                logger_->error( "Request state not found for CID {}", cid.toString().value() );
                OUTCOME_TRY( BlackListPeer( peerID ) );
                return outcome::failure( Error::ROUTE_NOT_FOUND );
            }

            switch ( state_result.value() )
            {
                case Graphsync::RequestState::COMPLETED:
                {
                    // Request completed but we don't have the block?
                    // Try one more cache grab
                    if ( auto result = GrabCIDBlock( cid ) )
                    {
                        logger_->debug( "Return node for CID {} instance={}",
                                        cid.toString().value(),
                                        reinterpret_cast<size_t>( this ) );
                        return result;
                    }
                    if ( auto result = GetNodeWithoutRequest( cid ) )
                    {
                        logger_->debug( "Return node for CID {} instance={}",
                                        cid.toString().value(),
                                        reinterpret_cast<size_t>( this ) );
                        return result;
                    }
                    // If still not found, this is strange but we'll fail
                    logger_->error( "Request marked COMPLETED but block not in cache or storage: {}",
                                    cid.toString().value() );
                    return outcome::failure( Error::CID_NOT_FOUND );
                }
                case Graphsync::RequestState::FAILED:
                {
                    // Request explicitly failed - record that this peer doesn't have this specific CID
                    // but don't blacklist the entire peer since they might have other content
                    logger_->debug( "Request failed for CID {} from peer {} - recording CID-specific failure",
                                    cid.toString().value(),
                                    peerID.toBase58() );
                    RecordCIDFailure( peerID, cid );
                    return outcome::failure( Error::CID_NOT_FOUND );
                }
                case Graphsync::RequestState::IN_PROGRESS:
                {
                    // Still in progress, keep waiting
                    logger_->trace( "Request for CID {} from peer {} - In Progress",
                                    cid.toString().value(),
                                    peerID.toBase58() );
                    std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
                    break;
                }
            }
        }
    }

    /// Removes a node from the local DAG service, serialized by the DAG lock.
    outcome::result<void> GraphsyncDAGSyncer::removeNode( const CID &cid )
    {
        std::lock_guard<std::mutex> lock( dagMutex_ );
        return dagService_.removeNode( cid );
    }

    /// Runs a selector query against the local DAG service, invoking `handler`
    /// for each matched node. Serialized by the DAG lock.
    outcome::result<size_t> GraphsyncDAGSyncer::select(
        gsl::span<const uint8_t>                                                     root_cid,
        gsl::span<const uint8_t>                                                     selector,
        std::function<bool( std::shared_ptr<const ipfs_lite::ipld::IPLDNode> node )> handler ) const
    {
        std::lock_guard<std::mutex> lock( dagMutex_ );
        return dagService_.select( root_cid, selector, handler );
    }

    /// Fetches the whole graph rooted at `cid` via the MerkleDAG walker,
    /// resolving each node through getNode() (cache / storage / network).
    outcome::result<std::shared_ptr<ipfs_lite::ipfs::merkledag::Leaf>> GraphsyncDAGSyncer::fetchGraph(
        const CID &cid ) const
    {
        // Node loader handed to the walker; the weak_ptr keeps a long-running
        // traversal from extending this object's lifetime.
        auto loader = [weakptr = weak_from_this()](
                          const CID &node_cid ) -> outcome::result<std::shared_ptr<ipfs_lite::ipld::IPLDNode>>
        {
            auto self = weakptr.lock();
            if ( !self )
            {
                return outcome::failure( boost::system::error_code{} );
            }
            return self->getNode( node_cid );
        };

        return ipfs_lite::ipfs::merkledag::MerkleDagServiceImpl::fetchGraphOnDepth( loader, cid, {} );
    }

    /// Fetches the graph rooted at `cid` down to `depth` via the MerkleDAG
    /// walker, resolving each node through getNode().
    outcome::result<std::shared_ptr<ipfs_lite::ipfs::merkledag::Leaf>> GraphsyncDAGSyncer::fetchGraphOnDepth(
        const CID &cid,
        uint64_t   depth ) const
    {
        // Same loader shape as fetchGraph(): weak_ptr guards against the
        // traversal outliving this syncer.
        auto loader = [weakptr = weak_from_this()](
                          const CID &node_cid ) -> outcome::result<std::shared_ptr<ipfs_lite::ipld::IPLDNode>>
        {
            auto self = weakptr.lock();
            if ( !self )
            {
                return outcome::failure( boost::system::error_code{} );
            }
            return self->getNode( node_cid );
        };

        return ipfs_lite::ipfs::merkledag::MerkleDagServiceImpl::fetchGraphOnDepth( loader, cid, depth );
    }

    /// Reports whether the block for `cid` is available locally, either in the
    /// LRU cache or in MerkleDAG storage. Always returns a success result.
    outcome::result<bool> GraphsyncDAGSyncer::HasBlock( const CID &cid ) const
    {
        // Short-circuit: the original version performed BOTH lookups
        // unconditionally; checking the cache first skips the DAG-service
        // lookup (and its lock) on a cache hit.
        if ( GrabCIDBlock( cid ).has_value() )
        {
            return true;
        }
        return GetNodeFromMerkleDAG( cid ).has_value();
    }

    /// Resolves a CID using only local sources — MerkleDAG storage first,
    /// then the LRU cache — never triggering a network request.
    outcome::result<std::shared_ptr<ipfs_lite::ipld::IPLDNode>> GraphsyncDAGSyncer::GetNodeWithoutRequest(
        const CID &cid ) const
    {
        if ( auto stored = GetNodeFromMerkleDAG( cid ); stored.has_value() )
        {
            return stored;
        }
        // Fall back to the in-memory cache of recently received blocks.
        return GrabCIDBlock( cid );
    }

    /// Starts the graphsync engine and the libp2p host (idempotent).
    /// @return success, or a generic failure when either dependency is null.
    outcome::result<void> GraphsyncDAGSyncer::StartSync()
    {
        if ( started_ )
        {
            return outcome::success();
        }

        // Validate BOTH dependencies up front. Previously host_ was checked
        // only after graphsync_->start(), so a null host left graphsync
        // running while started_ stayed false — a half-started syncer.
        if ( graphsync_ == nullptr )
        {
            return outcome::failure( boost::system::error_code{} );
        }
        if ( host_ == nullptr )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        auto dagService = std::make_shared<MerkleDagBridgeImpl>( shared_from_this() );

        // Incoming blocks go through a weak_ptr so a queued callback cannot
        // dereference a destroyed syncer.
        BlockCallback blockCallback = [weakptr = weak_from_this()]( const CID &cid, common::Buffer buffer )
        {
            if ( auto self = weakptr.lock() )
            {
                self->BlockReceivedCallback( cid, buffer );
            }
        };

        graphsync_->start( dagService, blockCallback );
        host_->start();

        started_ = true;

        return outcome::success();
    }

    /// Stops graphsync and the host (each only if present) and clears the
    /// started flag so getNode() loops can observe the shutdown.
    void GraphsyncDAGSyncer::StopSync()
    {
        if ( graphsync_ )
        {
            graphsync_->stop();
        }

        if ( host_ )
        {
            host_->stop();
        }

        started_ = false;
    }

    /// Returns the local host's peer identity, or a generic failure when no
    /// host was provided.
    outcome::result<GraphsyncDAGSyncer::PeerId> GraphsyncDAGSyncer::GetId() const
    {
        if ( host_ == nullptr )
        {
            return outcome::failure( boost::system::error_code{} );
        }
        return host_->getId();
    }

    /// Returns the local host's PeerInfo, or a generic failure when no host
    /// was provided.
    outcome::result<libp2p::peer::PeerInfo> GraphsyncDAGSyncer::GetPeerInfo() const
    {
        if ( host_ == nullptr )
        {
            return outcome::failure( boost::system::error_code{} );
        }
        return host_->getPeerInfo();
    }

    /// Debug-logs the progress of an outstanding graphsync request, rendering
    /// each extension as "(name: 0x<hex>) ".
    void GraphsyncDAGSyncer::RequestProgressCallback( ResponseStatusCode            code,
                                                      const std::vector<Extension> &extensions ) const
    {
        std::string rendered;
        for ( const auto &[ext_name, ext_data] : extensions )
        {
            rendered.append( fmt::format( "({}: 0x{}) ", ext_name, common::Buffer( ext_data ).toHex() ) );
        }

        logger_->debug( "request progress: code={}, extensions={}", statusCodeToString( code ), rendered );
    }

    // Invoked by graphsync whenever a block arrives from the network.
    // Deduplicates against local storage, decodes the raw bytes into an IPLD
    // node, caches it in the LRU, and — if the sender is routed and not
    // blacklisted — walks the node's unresolved links and adds routes for them
    // back to the same peer.
    void GraphsyncDAGSyncer::BlockReceivedCallback( const CID &cid, common::Buffer buffer )
    {
        logger_->trace( "Block received: cid={}", cid.toString().value() );
        auto hb = HasBlock( cid );

        if ( hb.has_failure() )
        {
            logger_->debug( "HasBlock failed: {}, cid: {}", hb.error().message().c_str(), cid.toString().value() );
            return;
        }
        logger_->debug( "HasBlock: {}, cid: {}", hb.value(), cid.toString().value() );

        // Already stored locally — nothing more to do.
        if ( hb.value() )
        {
            logger_->debug( "We already have this node {}", cid.toString().value() );
            return;
        }

        // Decode the raw block bytes into an IPLD node; drop undecodable data.
        auto node = ipfs_lite::ipld::IPLDNodeImpl::createFromRawBytes( buffer );
        if ( node.has_failure() )
        {
            logger_->error( "Cannot create node from received block data for CID {}", cid.toString().value() );
            return;
        }

        // AddCIDBlock returns true when a brand-new cache entry was created,
        // i.e. no InitCIDBlock placeholder existed — an unsolicited block.
        if ( AddCIDBlock( cid, node.value() ) )
        {
            logger_->error( " Block received without CRDT asking for it explicitly " );
        }

        std::stringstream sslinks;
        for ( const auto &link : node.value()->getLinks() )
        {
            sslinks << "[";
            sslinks << link.get().getCID().toString().value();
            sslinks << link.get().getName();
            sslinks << "], ";
        }
        // NOTE(review): this message says "dagService" but the node was only
        // added to the LRU cache above — confirm and consider rewording.
        logger_->debug( "Node added to dagService. CID: {}, links: [{}]",
                        node.value()->getCID().toString().value(),
                        sslinks.str() );

        // If we know which peer served this CID, propagate routes for the
        // node's still-missing children to that same peer.
        if ( auto maybe_route_info = GetRoute( cid ) )
        {
            auto &[peerID, address] = maybe_route_info.value();
            logger_->debug( "Seeing if peer {} has links to AddRoute", peerID.toBase58() );
            if ( IsOnBlackList( peerID ) )
            {
                //I don't think it should ever land here
                logger_->debug( "Peer {} was blacklisted", peerID.toBase58() );
            }
            else
            {
                // A delivered block counts as a successful connection.
                RecordSuccessfulConnection( peerID );

                // Clear any CID failure record for this peer and CID since they successfully provided it
                ClearCIDFailure( peerID, cid );

                auto [links_to_fetch, _1] = TraverseCIDsLinks( *node.value() );
                for ( const auto &[cid, _2] : links_to_fetch )
                {
                    logger_->trace( "Adding route for peer {} and CID {}", peerID.toBase58(), cid.toString().value() );
                    AddRoute( cid, peerID, address );
                }
            }
        }
    }

    // Walks the link graph under `node`, collecting links whose blocks are not
    // available locally (links_to_fetch) and the set of visited links.
    // Traversal is iterative (explicit work deque, depth-first via push_front)
    // to avoid stack overflow on deep graphs. Subtrees already marked resolved
    // are skipped entirely. When `link_name` is non-empty, only links with
    // that exact name are followed.
    std::pair<DAGSyncer::LinkInfoSet, DAGSyncer::LinkInfoSet> GraphsyncDAGSyncer::TraverseCIDsLinks(
        ipfs_lite::ipld::IPLDNode &node,
        std::string                link_name,
        LinkInfoSet                visited ) const
    {
        LinkInfoSet links_to_fetch;

        // Use iterative approach with a work queue to avoid stack overflow
        // Each work item contains the node to process
        struct WorkItem
        {
            std::shared_ptr<ipfs_lite::ipld::IPLDNode> node;
        };

        std::deque<WorkItem> work_queue;

        // Start with the root node
        const CID &root_cid = node.getCID();

        // A root whose resolution state cannot be read, or that is already
        // resolved, yields an empty fetch set immediately.
        auto tree_resolved_res = isResolved( root_cid );
        if ( tree_resolved_res.has_failure() )
        {
            logger_->error( "{}: isResolved failed: {}, cid: {}",
                            __func__,
                            tree_resolved_res.error().message().c_str(),
                            root_cid.toString().value() );
            return { std::move( links_to_fetch ), std::move( visited ) };
        }

        if ( tree_resolved_res.value() )
        {
            logger_->debug( "TraverseCIDsLinks: Skipping traversal of root {} (already resolved)",
                            root_cid.toString().value() );
            return { std::move( links_to_fetch ), std::move( visited ) };
        }

        logger_->info( "TraverseCIDsLinks: Checking links on {{ cid=\"{}\", name=\"{}\" }}",
                       root_cid.toString().value(),
                       link_name );

        // Add the root node to the work queue
        // Create a shared_ptr from the reference - note this assumes the node is already managed
        // The no-op deleter makes this a non-owning alias: the caller's `node`
        // reference must outlive the traversal (it does — the loop is local).
        work_queue.push_back( WorkItem{ std::shared_ptr<ipfs_lite::ipld::IPLDNode>( &node, []( ipfs_lite::ipld::IPLDNode * ) {} ) } );

        // Process the queue iteratively
        while ( !work_queue.empty() )
        {
            WorkItem current_item = std::move( work_queue.front() );
            work_queue.pop_front();

            auto &current_node = *current_item.node;
            const CID &current_cid = current_node.getCID();

            logger_->trace( "TraverseCIDsLinks: Processing node {}", current_cid.toString().value() );

            // Process all links in the current node
            for ( const auto &link : current_node.getLinks() )
            {
                const CID         &child = link.get().getCID();
                const std::string &name  = link.get().getName();
                LinkInfoPair       pair{ child, name };

                logger_->trace( "TraverseCIDsLinks: Link: name '{}' != '{}'", name, link_name );
                // Name filter: an empty link_name matches every link.
                if ( !link_name.empty() && name != link_name )
                {
                    logger_->debug( "TraverseCIDsLinks: Skipping link: name '{}' != '{}'", name, link_name );
                    continue;
                }

                // insert() returning false means this (cid, name) pair was
                // already seen — guards against cycles and duplicate links.
                if ( !visited.insert( pair ).second )
                {
                    logger_->info( "TraverseCIDsLinks: Already visited {{ link=\"{}\", name=\"{}\" }}",
                                   child.toString().value(),
                                   name );
                    continue;
                }

                // A failed isResolved check falls through and still processes
                // the child; only a definitive "resolved" skips it.
                auto child_resolved_res = isResolved( child );
                if ( child_resolved_res.has_failure() )
                {
                    logger_->error( "{}: isResolved failed: {}, cid: {}",
                                    __func__,
                                    child_resolved_res.error().message().c_str(),
                                    child.toString().value() );
                }
                else if ( child_resolved_res.value() )
                {
                    logger_->debug( "TraverseCIDsLinks: Skipping traversal of resolved child {}",
                                    child.toString().value() );
                    continue;
                }

                logger_->info( "TraverseCIDsLinks: Found link {{ cid=\"{}\", name=\"{}\", size={} }}",
                               child.toString().value(),
                               name,
                               link.get().getSize() );

                // Children we cannot load locally are exactly the ones callers
                // need to fetch from the network.
                auto get_child_result = GetNodeWithoutRequest( child );
                if ( get_child_result.has_failure() )
                {
                    logger_->debug( "TraverseCIDsLinks: Missing block {}, adding as link to be fetched",
                                    child.toString().value() );
                    links_to_fetch.insert( pair );
                    continue;
                }

                // Add the child node to the front of the work queue for depth-first processing
                // This ensures children are resolved before their parents, maintaining the original
                // recursive behavior where we don't mark a CID as complete until all its children are processed
                work_queue.push_front( WorkItem{ get_child_result.value() } );

                logger_->trace( "TraverseCIDsLinks: Added child {} to work queue (queue size: {})",
                                child.toString().value(),
                                work_queue.size() );
            }
        }

        logger_->info( "TraverseCIDsLinks: Completed traversal. Links to fetch: {}, Visited: {}",
                       links_to_fetch.size(),
                       visited.size() );

        return { std::move( links_to_fetch ), std::move( visited ) };
    }

    /// Marks `cid` as fully resolved in the DAG service (all descendants
    /// present), serialized by the DAG lock.
    outcome::result<void> GraphsyncDAGSyncer::markResolved( const CID &cid )
    {
        std::lock_guard lock( dagMutex_ );
        return dagService_.markResolved( cid );
    }

    /// Queries the DAG service's resolved flag for `cid`, serialized by the
    /// DAG lock.
    outcome::result<bool> GraphsyncDAGSyncer::isResolved( const CID &cid ) const
    {
        std::lock_guard lock( dagMutex_ );
        return dagService_.isResolved( cid );
    }

    // Creates a content-less placeholder entry for `cid` in the LRU cache.
    // AddCIDBlock() later distinguishes blocks we asked for (placeholder
    // exists) from unsolicited ones (no placeholder).
    void GraphsyncDAGSyncer::InitCIDBlock( const CID &cid )
    {
        lru_cid_cache_.init( cid );
        logger_->debug( "Block initialized without content to LRU cache: CID {}, cache size: {}",
                        cid.toString().value(),
                        lru_cid_cache_.size() );
    }

    /// Inserts (or updates) `block` in the LRU cache under `cid`.
    /// @return true when a brand-new entry was created, false when an existing
    ///         entry (e.g. an InitCIDBlock placeholder) was updated.
    bool GraphsyncDAGSyncer::AddCIDBlock( const CID &cid, const std::shared_ptr<ipfs_lite::ipld::IPLDNode> &block )
    {
        const bool created = lru_cid_cache_.add( cid, block );

        if ( !created )
        {
            logger_->debug( "Existing block updated in LRU cache: CID {}", cid.toString().value() );
            return created;
        }

        logger_->debug( "New block added to LRU cache: CID {}, cache size: {}",
                        cid.toString().value(),
                        lru_cid_cache_.size() );
        return created;
    }

    // True when the LRU cache has an entry for `cid` (placeholder or content).
    bool GraphsyncDAGSyncer::IsCIDInCache( const CID &cid ) const
    {
        return lru_cid_cache_.contains( cid );
    }

    /// Retrieves the node for `cid` from the LRU cache.
    /// Only entries that actually carry content count as hits; InitCIDBlock
    /// placeholders without a node yield CID_NOT_FOUND.
    outcome::result<std::shared_ptr<ipfs_lite::ipld::IPLDNode>> GraphsyncDAGSyncer::GrabCIDBlock( const CID &cid ) const
    {
        if ( !lru_cid_cache_.hasContent( cid ) )
        {
            return outcome::failure( Error::CID_NOT_FOUND );
        }

        auto node = lru_cid_cache_.get( cid );
        if ( !node )
        {
            return outcome::failure( Error::CID_NOT_FOUND );
        }

        logger_->trace( "Block retrieved from LRU cache: CID {}", cid.toString().value() );
        return node;
    }

    /// Evicts `cid` from the LRU cache. Removing a missing CID is not an
    /// error, so this always succeeds.
    outcome::result<void> GraphsyncDAGSyncer::DeleteCIDBlock( const CID &cid )
    {
        const bool removed = lru_cid_cache_.remove( cid );
        if ( removed )
        {
            logger_->debug( "Block removed from LRU cache: CID {}", cid.toString().value() );
        }
        return outcome::success();
    }

    void GraphsyncDAGSyncer::AddToBlackList( const PeerId &peer ) const
    {
        std::lock_guard lock( blacklist_mutex_ );

        uint64_t now = GetCurrentTimestamp();

        if ( auto [it, inserted] = blacklist_.emplace( peer.toMultihash(), BlacklistEntry( now, 1 ) ); !inserted )
        {
            BlacklistEntry &entry   = it->second;
            uint64_t        timeout = getBackoffTimeout( entry.failures, entry.ever_connected );

            if ( now - entry.timestamp > timeout )
            {
                logger_->debug( "Peer {} blacklist timeout expired", peer.toBase58() );
            }

            entry.failures++;
            logger_->debug( "Peer {} failures incremented to {}", peer.toBase58(), entry.failures );

            entry.timestamp = now;
        }
    }

    /// Computes the exponential backoff window, in seconds, for a peer with
    /// `failures` consecutive failures: base * 2^failures, saturated at a cap.
    /// Previously-connected peers get a short window (5 s .. 30 s); peers we
    /// never reached back off longer (10 s .. 1800 s).
    uint64_t GraphsyncDAGSyncer::getBackoffTimeout( uint64_t failures, bool ever_connected )
    {
        const uint64_t base_seconds = ever_connected ? 5 : 10;
        const uint64_t max_seconds  = ever_connected ? 30 : 1800;

        // Guard the shift: `1ULL << failures` is undefined behavior for
        // failures >= 64 and overflows uint64_t well before the cap applies.
        // Any failure count this large saturates at the cap anyway.
        if ( failures >= 60 )
        {
            return max_seconds;
        }

        const uint64_t timeout = base_seconds * ( 1ULL << failures );
        return std::min( timeout, max_seconds );
    }

    /// Reports whether `peer` is currently inside an active blacklist window.
    /// A peer with recorded failures is blacklisted until the exponential
    /// backoff window elapses; when the window expires this method mutates the
    /// entry (bumps backoff_attempts, refreshes timestamp) to allow a retry,
    /// without clearing the failure count.
    /// NOTE: mutates blacklist_ despite being a query — callers rely on the
    /// retry-window side effect.
    bool GraphsyncDAGSyncer::IsOnBlackList( const PeerId &peer ) const
    {
        bool            ret = false;
        std::lock_guard lock( blacklist_mutex_ );
        do
        {
            auto it = blacklist_.find( peer.toMultihash() );
            if ( it == blacklist_.end() )
            {
                // Fix: message previously read "in NOT blacklisted".
                logger_->trace( "Peer {} is NOT blacklisted", peer.toBase58() );
                break;
            }

            uint64_t        now   = GetCurrentTimestamp();
            BlacklistEntry &entry = it->second;

            // If no failures yet, not blacklisted
            if ( entry.failures == 0 )
            {
                break;
            }

            // Calculate timeout based on connection history and failure count
            uint64_t timeout = getBackoffTimeout( entry.failures - 1, entry.ever_connected );

            if ( now - entry.timestamp > timeout )
            {
                // Timeout expired, so peer is NOT currently blacklisted
                // We don't reset failures, but we do allow this peer to be tried again
                entry.backoff_attempts++; // Track the number of times we've retried this peer
                entry.timestamp = now;
                logger_->trace( "Peer {} blacklist timeout expired, allowing retry (attempt {}, failures {})",
                                peer.toBase58(),
                                entry.backoff_attempts,
                                entry.failures );
                // ret remains false - peer is NOT on blacklist
                break;
            }

            // Still within blacklist timeout and has failures
            ret = true; // This peer IS on the blacklist
            logger_->trace( "Peer {} BLACKLISTED (failures: {}, timeout: {}s)",
                            peer.toBase58(),
                            entry.failures,
                            timeout );
        } while ( false );

        return ret;
    }

    // Record successful connections
    /// Resets a peer's blacklist state after a successful interaction: marks
    /// it as ever-connected and clears both failure and backoff counters.
    /// No-op for peers with no blacklist history.
    void GraphsyncDAGSyncer::RecordSuccessfulConnection( const PeerId &peer )
    {
        std::lock_guard lock( blacklist_mutex_ );

        auto it = blacklist_.find( peer.toMultihash() );
        if ( it == blacklist_.end() )
        {
            return;
        }

        BlacklistEntry &entry  = it->second;
        entry.ever_connected   = true;
        entry.failures         = 0;
        entry.backoff_attempts = 0; // A successful connection clears backoff state.
        logger_->debug( "Recorded successful connection for peer {}", peer.toBase58() );
    }

    // Records a failure for `peer` and, if that pushes the peer into an
    // active blacklist window, erases every CID route pointing at it.
    // Always returns success.
    outcome::result<void> GraphsyncDAGSyncer::BlackListPeer( const PeerId &peer ) const
    {
        AddToBlackList( peer );
        if ( IsOnBlackList( peer ) )
        {
            EraseRoutesFromPeerID( peer );
        }
        return outcome::success();
    }

    /// Fetch the node stored under @p cid from the local merkle-DAG service.
    /// Access to dagService_ is serialized through dagMutex_.
    outcome::result<std::shared_ptr<ipfs_lite::ipld::IPLDNode>> GraphsyncDAGSyncer::GetNodeFromMerkleDAG(
        const CID &cid ) const
    {
        std::lock_guard guard( dagMutex_ );
        return dagService_.getNode( cid );
    }

    /// Resolve which peer is responsible for @p cid: look the CID up in the
    /// routing table, then translate the stored peer key into a registry entry.
    /// @return the peer entry, or Error::ROUTE_NOT_FOUND when no route exists.
    outcome::result<GraphsyncDAGSyncer::PeerEntry> GraphsyncDAGSyncer::GetRoute( const CID &cid ) const
    {
        PeerKey key;
        {
            // Copy the peer key out while holding the routing lock only.
            std::lock_guard guard( routing_mutex_ );
            auto            found = routing_.find( cid );
            if ( found == routing_.end() )
            {
                logger_->error( "No route for the requested CID  {}", cid.toString().value() );
                return outcome::failure( Error::ROUTE_NOT_FOUND );
            }
            key = found->second;
        }
        // Translate the key into the actual peer entry outside the routing lock.
        return GetPeerById( key );
    }

    void GraphsyncDAGSyncer::EraseRoutesFromPeerID( const PeerId &peer ) const
    {
        // First find the peer key in the peer index
        PeerKey peerKeyToRemove;

        {
            std::lock_guard registry_lock( registry_mutex_ );
            auto            it = peer_index_.find( peer );
            if ( it == peer_index_.end() )
            {
                // Peer not found in registry, nothing to erase
                return;
            }
            peerKeyToRemove = it->second;
        }

        // Remove all routes that point to this peer
        std::lock_guard routing_lock( routing_mutex_ );
        for ( auto it = routing_.begin(); it != routing_.end(); )
        {
            if ( it->second == peerKeyToRemove )
            {
                logger_->debug( "Erasing route for CID {} to blacklisted peer", it->first.toString().value() );
                it = routing_.erase( it );
            }
            else
            {
                ++it;
            }
        }
    }

    /// Remove the route for @p cid from the routing table, if one exists.
    void GraphsyncDAGSyncer::EraseRoute( const CID &cid )
    {
        std::lock_guard lock( routing_mutex_ );
        // map::erase(key) is a no-op for an absent key, so the previous
        // find-then-erase pattern performed a redundant second lookup.
        routing_.erase( cid );
    }

    /// Current wall-clock time as whole seconds since the Unix epoch.
    /// NOTE(review): system_clock is not monotonic — callers that subtract two
    /// of these timestamps may observe the clock stepping backwards.
    uint64_t GraphsyncDAGSyncer::GetCurrentTimestamp()
    {
        const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
        const auto seconds     = std::chrono::duration_cast<std::chrono::seconds>( since_epoch );
        return static_cast<uint64_t>( seconds.count() );
    }

    /// Remember that @p peer failed to provide @p cid, stamped with the current
    /// time so HasRecentCIDFailure() can expire the record later.
    void GraphsyncDAGSyncer::RecordCIDFailure( const PeerId &peer, const CID &cid ) const
    {
        std::lock_guard<std::mutex> lock( cid_failures_mutex_ );
        // Keyed by (peer multihash, CID); overwrites any earlier timestamp.
        cid_failures_[std::make_pair( peer.toMultihash(), cid )] = GetCurrentTimestamp();
        logger_->debug( "Recorded CID failure: peer {} cannot provide CID {}",
                        peer.toBase58(),
                        cid.toString().value() );
    }

    /// Check whether @p peer failed to provide @p cid within the last
    /// FAILURE_TIMEOUT seconds. Stale records are pruned on the way out so the
    /// peer becomes eligible for that CID again once the window elapses.
    /// @return true when a recent (non-expired) failure record exists.
    bool GraphsyncDAGSyncer::HasRecentCIDFailure( const PeerId &peer, const CID &cid ) const
    {
        std::lock_guard<std::mutex> lock( cid_failures_mutex_ );
        auto                        key = std::make_pair( peer.toMultihash(), cid );
        auto                        it  = cid_failures_.find( key );

        if ( it == cid_failures_.end() )
        {
            return false; // No failure recorded
        }

        // Consider failure "recent" for 5 minutes (300 seconds)
        // This prevents immediate re-requests but allows retry after some time
        uint64_t       now             = GetCurrentTimestamp();
        const uint64_t FAILURE_TIMEOUT = 300; // 5 minutes

        // system_clock is not monotonic: if the wall clock stepped backwards,
        // the unsigned subtraction (now - recorded) would underflow to a huge
        // age and instantly expire a just-recorded failure. Clamp to zero so a
        // "future" timestamp is treated as a fresh failure instead.
        uint64_t failure_age = ( now >= it->second ) ? ( now - it->second ) : 0;

        if ( failure_age > FAILURE_TIMEOUT )
        {
            // Failure is old, remove it and allow retry
            cid_failures_.erase( it );
            return false;
        }

        return true; // Recent failure exists
    }

    /// Forget any failure recorded for the (peer, cid) pair so the peer may be
    /// asked for that CID again immediately.
    void GraphsyncDAGSyncer::ClearCIDFailure( const PeerId &peer, const CID &cid ) const
    {
        std::lock_guard<std::mutex> lock( cid_failures_mutex_ );
        auto                        entry = cid_failures_.find( std::make_pair( peer.toMultihash(), cid ) );
        if ( entry == cid_failures_.end() )
        {
            return; // Nothing recorded for this pair.
        }
        cid_failures_.erase( entry );
        logger_->debug( "Cleared CID failure record: peer {} can now be tried again for CID {}",
                        peer.toBase58(),
                        cid.toString().value() );
    }

    /// Reserve a slot for @p cid in the cache with a null node pointer.
    /// If the CID is already cached it is just promoted to most-recently-used;
    /// otherwise the least-recently-used entry is evicted when the cache is full.
    void GraphsyncDAGSyncer::LRUCIDCache::init( const CID &cid )
    {
        std::lock_guard lock( mutex_ );
        // Check if the item already exists
        if ( auto it = cache_map_.find( cid ); it != cache_map_.end() )
        {
            // Promote to the front in place: splice relinks the existing list
            // node, so (unlike erase + push_front) no node is deallocated and
            // re-allocated, no CID is copied, and the stored iterator stays
            // valid — it now refers to the front element.
            lru_list_.splice( lru_list_.begin(), lru_list_, it->second.second );
            return;
        }

        // If cache is full, remove the least recently used item (list back).
        if ( cache_map_.size() >= MAX_CACHE_SIZE )
        {
            cache_map_.erase( lru_list_.back() );
            lru_list_.pop_back();
        }

        // Insert the new CID as most-recently-used, with no node content yet,
        // remembering its position in the LRU list.
        lru_list_.push_front( cid );
        cache_map_[cid] = std::make_pair( nullptr, lru_list_.begin() );
    }

    /// Store @p node under @p cid as the most-recently-used entry.
    /// Evicts the least-recently-used entry when the cache is full.
    /// @return true when a brand-new entry was created, false when an existing
    ///         entry was updated in place.
    bool GraphsyncDAGSyncer::LRUCIDCache::add( const CID &cid, std::shared_ptr<ipfs_lite::ipld::IPLDNode> node )
    {
        std::lock_guard lock( mutex_ );
        // Check if the item already exists
        if ( auto it = cache_map_.find( cid ); it != cache_map_.end() )
        {
            // Promote in place via splice: no list-node dealloc/realloc, no CID
            // copy, and the stored iterator remains valid (now at the front),
            // so only the payload needs updating.
            lru_list_.splice( lru_list_.begin(), lru_list_, it->second.second );
            it->second.first = std::move( node );
            return false; // Entry was updated, not created
        }

        // If cache is full, remove the least recently used item (list back).
        if ( cache_map_.size() >= MAX_CACHE_SIZE )
        {
            cache_map_.erase( lru_list_.back() );
            lru_list_.pop_back();
        }

        // Insert as most-recently-used, remembering the list position.
        lru_list_.push_front( cid );
        cache_map_[cid] = std::make_pair( std::move( node ), lru_list_.begin() );
        return true; // New entry was created
    }

    /// Look up @p cid, promoting the entry to most-recently-used on a hit.
    /// @return the cached node pointer, or nullptr when the CID is absent
    ///         (a slot reserved by init() still holds nullptr).
    std::shared_ptr<ipfs_lite::ipld::IPLDNode> GraphsyncDAGSyncer::LRUCIDCache::get( const CID &cid )
    {
        std::lock_guard lock( mutex_ );
        auto            it = cache_map_.find( cid );
        if ( it == cache_map_.end() )
        {
            return nullptr;
        }

        // Promote in place: splice relinks the existing list node to the front,
        // so the stored iterator remains valid and — unlike erase + push_front —
        // no node allocation or CID copy is performed on the hot read path.
        lru_list_.splice( lru_list_.begin(), lru_list_, it->second.second );

        // Return the node
        return it->second.first;
    }

    /// Erase @p cid from the cache.
    /// @return true when an entry was removed, false when the CID was absent.
    bool GraphsyncDAGSyncer::LRUCIDCache::remove( const CID &cid )
    {
        std::lock_guard lock( mutex_ );
        auto            entry = cache_map_.find( cid );
        if ( entry == cache_map_.end() )
        {
            return false; // Nothing cached under this CID
        }

        // Drop the LRU-list node first (its iterator lives in the map entry),
        // then the map entry itself.
        lru_list_.erase( entry->second.second );
        cache_map_.erase( entry );
        return true;
    }

    /// True when @p cid has an entry in the cache (its node may still be null).
    bool GraphsyncDAGSyncer::LRUCIDCache::contains( const CID &cid ) const
    {
        std::lock_guard lock( mutex_ );
        return cache_map_.count( cid ) != 0;
    }

    // Check if CID exists and has content
    /// True only when @p cid is cached AND its node pointer is non-null
    /// (i.e. the slot was filled by add(), not merely reserved by init()).
    bool GraphsyncDAGSyncer::LRUCIDCache::hasContent( const CID &cid ) const
    {
        std::lock_guard lock( mutex_ );
        auto            entry = cache_map_.find( cid );
        if ( entry == cache_map_.end() )
        {
            return false;
        }
        return entry->second.first != nullptr;
    }

    /// Stop the DAG syncer: mark it stopped and halt the graphsync engine.
    void GraphsyncDAGSyncer::Stop()
    {
        logger_->debug( "Stopping Dagsyncer" );
        is_stopped_ = true;
        // graphsync_ may be null — the class defines Error::GRAPHSYNC_IS_NULL
        // ("The graphsync member is null") for exactly that case — so guard
        // the dereference instead of crashing on shutdown.
        if ( graphsync_ != nullptr )
        {
            graphsync_->stop();
        }
    }

}

Updated on 2026-03-04 at 13:10:44 -0800