Skip to content

src/crdt/impl/crdt_datastore.cpp

Namespaces

Name
sgns
sgns::crdt

Types

Name
using pb::CRDTBroadcast CRDTBroadcast

Functions

Name
OUTCOME_CPP_DEFINE_CATEGORY_3(sgns::crdt, CrdtDatastore::Error, e)

Types Documentation

using CRDTBroadcast

using sgns::crdt::CRDTBroadcast = pb::CRDTBroadcast;

Functions Documentation

function OUTCOME_CPP_DEFINE_CATEGORY_3

OUTCOME_CPP_DEFINE_CATEGORY_3(
    sgns::crdt,
    CrdtDatastore::Error,
    e
)

Source code

#include <fmt/std.h>
#include "crdt/crdt_datastore.hpp"
#include "crdt/graphsync_dagsyncer.hpp"
#include <storage/rocksdb/rocksdb.hpp>
#include <iostream>
#include "crdt/proto/bcast.pb.h"
#include "storage/database_error.hpp"
#include <google/protobuf/unknown_field_set.h>
#include <ipfs_lite/ipld/impl/ipld_node_impl.hpp>
#include <thread>
#include <utility>
#include <boost/format.hpp>

OUTCOME_CPP_DEFINE_CATEGORY_3( sgns::crdt, CrdtDatastore::Error, e )
{
    // Maps each CrdtDatastore::Error enumerator to a human-readable message
    // for the outcome error-category machinery. Unrecognized values fall
    // through to "Unknown error".
    using CrdtDatastoreErr = sgns::crdt::CrdtDatastore::Error;
    if ( e == CrdtDatastoreErr::INVALID_PARAM )
    {
        return "Invalid parameter";
    }
    if ( e == CrdtDatastoreErr::FETCH_ROOT_NODE )
    {
        return "Can't fetch the root node";
    }
    if ( e == CrdtDatastoreErr::NODE_DESERIALIZATION )
    {
        return "Can't deserialize node buffer into node";
    }
    if ( e == CrdtDatastoreErr::FETCHING_GRAPH )
    {
        return "Can't fetch graph";
    }
    if ( e == CrdtDatastoreErr::NODE_CREATION )
    {
        return "Can't create a node";
    }
    if ( e == CrdtDatastoreErr::GET_NODE )
    {
        return "Can't fetch the node";
    }
    if ( e == CrdtDatastoreErr::INVALID_JOB )
    {
        return "The job is invalid";
    }
    return "Unknown error";
}

namespace sgns::crdt
{

    using CRDTBroadcast = pb::CRDTBroadcast;

    std::shared_ptr<CrdtDatastore> CrdtDatastore::New( std::shared_ptr<RocksDB>     aDatastore,
                                                       const HierarchicalKey       &aKey,
                                                       std::shared_ptr<DAGSyncer>   aDagSyncer,
                                                       std::shared_ptr<Broadcaster> aBroadcaster,
                                                       std::shared_ptr<CrdtOptions> aOptions )
    {
        if ( ( aDatastore == nullptr ) || ( aDagSyncer == nullptr ) || ( aBroadcaster == nullptr ) )
        {
            return nullptr;
        }
        if ( ( aDatastore == nullptr ) || aOptions->Verify().has_failure() ||
             ( aOptions->Verify().value() != CrdtOptions::VerifyErrorCode::Success ) )
        {
            return nullptr;
        }
        auto crdtInstance = std::shared_ptr<CrdtDatastore>( new CrdtDatastore( std::move( aDatastore ),
                                                                               aKey,
                                                                               std::move( aDagSyncer ),
                                                                               std::move( aBroadcaster ),
                                                                               std::move( aOptions ) ) );

        crdtInstance->set_ = std::make_shared<CrdtSet>(
            crdtInstance->dataStore_,
            aKey.ChildString( std::string( setsNamespace_ ) ),
            [weakptr( std::weak_ptr<CrdtDatastore>(
                crdtInstance ) )]( const std::string &key, const base::Buffer &value, const std::string &cid )
            {
                if ( auto strong = weakptr.lock() )
                {
                    strong->PutElementsCallback( key, value, cid );
                }
            },
            [weakptr( std::weak_ptr<CrdtDatastore>( crdtInstance ) )]( const std::string &key, const std::string &cid )
            {
                if ( auto strong = weakptr.lock() )
                {
                    strong->DeleteElementsCallback( key, cid );
                }
            } );
        crdtInstance->dagWorkerJobListThreadRunning_ = true;
        crdtInstance->dagWorkers_.reserve( crdtInstance->numberOfDagWorkers );
        for ( int i = 0; i < crdtInstance->numberOfDagWorkers; ++i )
        {
            auto &dagWorker = crdtInstance->dagWorkers_.emplace_back( std::make_unique<DagWorker>() );

            dagWorker->dagWorkerThreadRunning_ = true;
            dagWorker->dagWorkerFuture_        = std::async(
                [weakptr( std::weak_ptr<CrdtDatastore>( crdtInstance ) ), &dagWorker]
                {
                    auto dagThreadRunning = true;
                    while ( dagThreadRunning )
                    {
                        if ( auto self = weakptr.lock() )
                        {
                            if ( !self->ShouldContinueWorkerThread( *dagWorker ) )
                            {
                                dagThreadRunning = false;
                                continue;
                            }

                            // Process jobs in priority order
                            if ( self->ProcessJobs( self->selfCreatedJobList_ ) )
                            {
                                continue;
                            }
                            if ( self->ProcessJobs( self->rootCIDJobList_ ) )
                            {
                                continue;
                            }
                            if ( self->SeedNextExternalRoot() )
                            {
                                continue;
                            }
                        }
                        else
                        {
                            dagThreadRunning = false;
                        }
                    }
                } );
        }
        return crdtInstance;
    }

    /// Blocks the calling worker until there is work to do, a new external
    /// root can be seeded, the worker is asked to stop, or the poll interval
    /// elapses. Returns whether the worker should keep running.
    bool CrdtDatastore::ShouldContinueWorkerThread( DagWorker &dagWorker )
    {
        std::unique_lock lock( dagWorkerMutex_ );
        auto wakeCondition = [&]
        {
            if ( !dagWorker.dagWorkerThreadRunning_ )
            {
                return true; // stop requested
            }
            if ( !selfCreatedJobList_.empty() || !rootCIDJobList_.empty() )
            {
                return true; // queued work available
            }
            // No active root but pending roots exist: a worker can seed one.
            return !activeRootCID_.has_value() && !pendingRootQueue_.empty();
        };
        dagWorkerCv_.wait_for( lock, threadSleepTimeInMilliseconds_, wakeCondition );

        return dagWorker.dagWorkerThreadRunning_;
    }

    bool CrdtDatastore::ProcessJobs( std::queue<RootCIDJob> &jobs )
    {
        std::unique_lock lk( dagWorkerMutex_ );
        if ( jobs.empty() )
        {
            return false;
        }

        RootCIDJob job_to_process = jobs.front();
        jobs.pop();
        lk.unlock();

        logger_->debug( "Processing job for CID {}", job_to_process.root_node_->getCID().toString().value() );

        auto process_res = ProcessJobIteration( job_to_process );
        if ( process_res.has_failure() )
        {
            HandleJobProcessingFailure( job_to_process );
        }
        else
        {
            HandleJobProcessingSuccess( job_to_process );
        }

        return true; // Processed a job
    }

    bool CrdtDatastore::SeedNextExternalRoot()
    {
        std::unique_lock lk( dagWorkerMutex_ );
        if ( activeRootCID_.has_value() || pendingRootQueue_.empty() )
        {
            return false;
        }

        CID next = pendingRootQueue_.front();
        pendingRootQueue_.pop();
        activeRootCID_ = next;
        lk.unlock();

        logger_->debug( "Seeding new external root CID {}", next.toString().value() );
        auto res = HandleRootCIDBlock( next );
        if ( res.has_failure() )
        {
            std::unique_lock lk2( dagWorkerMutex_ );
            activeRootCID_.reset();
            dagWorkerCv_.notify_all();
        }

        return true; // Seeded a root
    }

    /// Failure path for a job: marks it FAILED, purges matching queued jobs,
    /// deletes its cached blocks and (for external jobs) drops any pending
    /// head records, then wakes the workers.
    void CrdtDatastore::HandleJobProcessingFailure( const RootCIDJob &job )
    {
        // Signal job failure
        {
            std::lock_guard lock_jobs( dagWorkerMutex_ );
            pending_jobs_[job.root_node_->getCID()] = JobStatus::FAILED;
        }

        const std::string_view jobType = job.created_by_self_ ? "SELF-CREATED" : "EXTERNAL";
        logger_->error( "{} JOB PROCESSING ERROR: Failed to process CID {}",
                        jobType,
                        job.root_node_->getCID().toString().value() );

        // Removes sibling jobs for the same root from the queues and, for
        // external jobs, resets activeRootCID_ (takes dagWorkerMutex_ itself).
        CleanupFailedJob( job );

        // Delete blocks
        (void)dagSyncer_->DeleteCIDBlock( job.root_node_->getCID() );
        if ( job.node_ && job.node_->getCID() != job.root_node_->getCID() )
        {
            // The job's own node differs from the root node; drop both blocks.
            (void)dagSyncer_->DeleteCIDBlock( job.node_->getCID() );
        }

        if ( !job.created_by_self_ )
        {
            // External jobs recorded pending head updates keyed by root CID;
            // discard them since this root will not be finalized.
            std::lock_guard<std::mutex> g( pendingHeadsMutex_ );
            pendingHeadsByRootCID_.erase( job.root_node_->getCID() );
        }

        // Wake workers so they can pick up the next job or seed a new root.
        dagWorkerCv_.notify_all();
    }

    /// Success path for a job: records COMPLETED status; only self-created
    /// jobs are logged here (external jobs finish inside ProcessJobIteration).
    void CrdtDatastore::HandleJobProcessingSuccess( const RootCIDJob &job )
    {
        const auto rootCid = job.root_node_->getCID();
        {
            // Mark self-created job as completed
            std::lock_guard guard( dagWorkerMutex_ );
            pending_jobs_[rootCid] = JobStatus::COMPLETED;
        }
        if ( !job.created_by_self_ )
        {
            // External jobs are handled in ProcessJobIteration when they complete
            return;
        }
        logger_->debug( "Successfully completed self-created job for CID {}", rootCid.toString().value() );
    }

    void CrdtDatastore::CleanupFailedJob( const RootCIDJob &job )
    {
        std::unique_lock lock( dagWorkerMutex_ );

        if ( job.created_by_self_ )
        {
            // Clean up self-created job queue
            std::queue<RootCIDJob> tmp;
            while ( !selfCreatedJobList_.empty() )
            {
                auto j = selfCreatedJobList_.front();
                selfCreatedJobList_.pop();
                if ( j.root_node_->getCID() != job.root_node_->getCID() )
                {
                    tmp.push( j );
                }
            }
            std::swap( selfCreatedJobList_, tmp );
        }
        else
        {
            // Clean up external job queue
            std::queue<RootCIDJob> tmp;
            while ( !rootCIDJobList_.empty() )
            {
                auto j = rootCIDJobList_.front();
                rootCIDJobList_.pop();
                if ( j.root_node_->getCID() != job.root_node_->getCID() )
                {
                    tmp.push( j );
                }
            }
            std::swap( rootCIDJobList_, tmp );

            // Reset activeRootCID for external jobs
            activeRootCID_.reset();
        }
    }

    /// Idempotently starts the two background workers:
    ///  - HandleNext: polls the broadcaster for incoming head CIDs.
    ///  - Rebroadcast: periodically re-announces our heads.
    void CrdtDatastore::Start()
    {
        if ( started_ == true )
        {
            return;
        }

        handleNextThreadRunning_ = true;
        // Starting HandleNext worker thread
        handleNextFuture_ = std::async(
            [weakptr{ weak_from_this() }]
            {
                // Weak capture: the loop exits when the datastore is destroyed
                // or handleNextThreadRunning_ is cleared by Close().
                auto threadRunning = true;
                while ( threadRunning )
                {
                    if ( auto self = weakptr.lock() )
                    {
                        self->HandleCIDBroadcast();
                        if ( !self->handleNextThreadRunning_ )
                        {
                            self->logger_->debug( "HandleNext thread finished" );
                            threadRunning = false;
                        }
                    }
                    else
                    {
                        threadRunning = false;
                    }

                    if ( threadRunning )
                    {
                        std::this_thread::sleep_for( threadSleepTimeInMilliseconds_ );
                    }
                }
            } );

        rebroadcastThreadRunning_ = true;
        // Starting Rebroadcast worker thread
        rebroadcastFuture_ = std::async(
            [weakptr{ weak_from_this() }]
            {
                // NOTE(review): unlike the HandleNext loop, this thread holds a
                // strong shared_ptr for its whole lifetime, so the destructor
                // cannot run until Close() clears rebroadcastThreadRunning_
                // externally — confirm this is the intended shutdown contract.
                auto self = weakptr.lock();
                if ( !self )
                {
                    return;
                }

                // Fall back to 100 ms if no options were provided.
                const auto interval = std::chrono::milliseconds(
                    self->options_ ? self->options_->rebroadcastIntervalMilliseconds : 100 );
                std::unique_lock lock( self->rebroadcastMutex_ );

                while ( self->rebroadcastThreadRunning_ )
                {
                    self->RebroadcastHeads();
                    // wait_for without a predicate: wakes on notify or timeout.
                    self->rebroadcastCv_.wait_for( lock, interval );
                }
            } );

        started_ = true;
    }

    /// Private constructor used by New(): stores dependencies, creates the
    /// heads tracker under the heads namespace, and logs the recovered state.
    CrdtDatastore::CrdtDatastore( std::shared_ptr<RocksDB>     aDatastore,
                                  const HierarchicalKey       &aKey,
                                  std::shared_ptr<DAGSyncer>   aDagSyncer,
                                  std::shared_ptr<Broadcaster> aBroadcaster,
                                  std::shared_ptr<CrdtOptions> aOptions ) :
        dataStore_( std::move( aDatastore ) ),
        options_( std::move( aOptions ) ),
        namespaceKey_( aKey ),
        broadcaster_( std::move( aBroadcaster ) ),
        dagSyncer_( std::move( aDagSyncer ) ),
        crdt_filter_( true ),
        crdt_cb_manager_()
    {
        // options_ is assumed non-null here; New() validates it before
        // constructing (direct construction is not possible — ctor is private).
        logger_            = options_->logger;
        numberOfDagWorkers = options_->numWorkers;

        heads_ = std::make_shared<CrdtHeads>( dataStore_, aKey.ChildString( std::string( headsNamespace_ ) ) );

        size_t   numberOfHeads = 0;
        uint64_t maxHeight     = 0;

        // Count previously persisted heads to report recovered state.
        auto getListResult = heads_->GetList();
        if ( !getListResult.has_failure() )
        {
            auto [head_map, height] = getListResult.value();
            for ( const auto &[topic_name, cid_set] : head_map )
            {
                numberOfHeads += cid_set.size();
                // height is a single value per GetList() call; maxHeight stays
                // 0 when head_map is empty.
                maxHeight      = std::max( maxHeight, height );
            }
        }

        logger_->info( "crdt Datastore created. Number of heads: {} Current max-height: {}", numberOfHeads, maxHeight );
    }

    /// Destructor: stops all worker threads and clears queues via Close().
    CrdtDatastore::~CrdtDatastore()
    {
        // Thread id formatting relies on <fmt/std.h> included above.
        logger_->debug( "~CrdtDatastore CALLED at {} ", std::this_thread::get_id() );
        Close();
    }

    std::shared_ptr<CrdtDatastore::Delta> CrdtDatastore::DeltaMerge( const std::shared_ptr<Delta> &aDelta1,
                                                                     const std::shared_ptr<Delta> &aDelta2 )
    {
        auto result = std::make_shared<Delta>();
        if ( aDelta1 != nullptr )
        {
            for ( const auto &elem : aDelta1->elements() )
            {
                auto newElement = result->add_elements();
                newElement->CopyFrom( elem );
            }
            for ( const auto &tomb : aDelta1->tombstones() )
            {
                auto newTomb = result->add_tombstones();
                newTomb->CopyFrom( tomb );
            }
            result->set_priority( aDelta1->priority() );
        }
        if ( aDelta2 != nullptr )
        {
            for ( const auto &elem : aDelta2->elements() )
            {
                auto newElement = result->add_elements();
                newElement->CopyFrom( elem );
            }
            for ( const auto &tomb : aDelta2->tombstones() )
            {
                auto newTomb = result->add_tombstones();
                newTomb->CopyFrom( tomb );
            }
            auto d2Priority = aDelta2->priority();
            if ( d2Priority > result->priority() )
            {
                result->set_priority( d2Priority );
            }
        }
        return result;
    }

    /// Shuts everything down in order: stop the DAG syncer, signal all worker
    /// loops to exit, join the DAG workers, drain job state, then join the
    /// HandleNext and Rebroadcast futures. Safe to call more than once.
    void CrdtDatastore::Close()
    {
        dagSyncer_->Stop();
        if ( handleNextThreadRunning_ )
        {
            // The HandleNext loop polls this flag; it exits on next iteration.
            handleNextThreadRunning_ = false;
        }

        if ( rebroadcastThreadRunning_ )
        {
            rebroadcastThreadRunning_ = false;
            // Wake the rebroadcast loop out of its wait_for immediately.
            rebroadcastCv_.notify_all();
        }

        if ( dagWorkerJobListThreadRunning_ )
        {
            dagWorkerJobListThreadRunning_ = false;
            dagWorkerCv_.notify_all();
            for ( auto &dagWorker : dagWorkers_ )
            {
                dagWorker->dagWorkerThreadRunning_ = false;
                if ( dagWorker->dagWorkerFuture_.valid() )
                {
                    // Block until this worker's loop has fully exited.
                    dagWorker->dagWorkerFuture_.wait();
                }
            }

            // Clear both job queues
            {
                std::lock_guard        lock( dagWorkerMutex_ );
                std::queue<RootCIDJob> empty1, empty2;
                std::swap( rootCIDJobList_, empty1 );
                std::swap( selfCreatedJobList_, empty2 );
                pending_jobs_.clear();
            }
        }

        // Join the polling threads last, after their stop flags are cleared.
        if ( handleNextFuture_.valid() )
        {
            handleNextFuture_.wait();
        }

        if ( rebroadcastFuture_.valid() )
        {
            rebroadcastFuture_.wait();
        }
    }

    /// Polls the broadcaster once, decodes any received head CIDs and enqueues
    /// unknown roots for processing. Failed in-flight jobs are cleared so the
    /// CID can be retried on re-broadcast.
    void CrdtDatastore::HandleCIDBroadcast()
    {
        if ( broadcaster_ == nullptr )
        {
            // No broadcaster: permanently stop the HandleNext loop.
            handleNextThreadRunning_ = false;
            return;
        }

        auto broadcasterNextResult = broadcaster_->Next();
        if ( broadcasterNextResult.has_failure() )
        {
            if ( broadcasterNextResult.error().value() !=
                 static_cast<int>( Broadcaster::ErrorCode::ErrNoMoreBroadcast ) )
            {
                // logger_->debug("Failed to get next broadcaster (error code " +
                //                std::to_string(broadcasterNextResult.error().value()) + ")");
            }
            return;
        }

        auto decodeResult = DecodeBroadcast( broadcasterNextResult.value() );
        if ( decodeResult.has_failure() )
        {
            // BUGFIX: report the decode error; the original logged
            // broadcasterNextResult.error(), but that result succeeded above.
            logger_->error( "Broadcaster: Unable to decode broadcast (error code {})",
                            std::to_string( decodeResult.error().value() ) );
            return;
        }

        for ( const auto &bCastHeadCID : decodeResult.value() )
        {
            logger_->trace( "{}: Received CID {}", __func__, bCastHeadCID.toString().value() );
            auto dagSyncerResult = dagSyncer_->HasBlock( bCastHeadCID );
            if ( dagSyncerResult.has_failure() )
            {
                logger_->error( "{}: error checking for known block", __func__ );
                continue;
            }
            if ( dagSyncerResult.value() )
            {
                // cid is known. Skip walking tree
                logger_->trace( "{}: Already processed block {}", __func__, bCastHeadCID.toString().value() );
                continue;
            }

            if ( dagSyncer_->IsCIDInCache( bCastHeadCID ) )
            {
                // If the CID request was already triggered but node didn't finish processing
                bool retry_failed = false;
                {
                    std::lock_guard lock( dagWorkerMutex_ );
                    auto            it = pending_jobs_.find( bCastHeadCID );
                    if ( it != pending_jobs_.end() && it->second == JobStatus::FAILED )
                    {
                        // Previous attempt failed: forget it so this broadcast
                        // can retry the CID from scratch.
                        pending_jobs_.erase( it );
                        retry_failed = true;
                    }
                }

                if ( retry_failed )
                {
                    logger_->warn( "{}: Clearing failed job for CID {}, allowing retry",
                                   __func__,
                                   bCastHeadCID.toString().value() );
                    (void)dagSyncer_->DeleteCIDBlock( bCastHeadCID );
                }
                else
                {
                    // Still being processed elsewhere; don't enqueue again.
                    logger_->trace( "{}: Processing block {} on graphsync", __func__, bCastHeadCID.toString().value() );
                    continue;
                }
            }

            if ( IsRootCIDPendingOrActive( bCastHeadCID ) )
            {
                logger_->trace( "{}: Root CID {} already pending/active", __func__, bCastHeadCID.toString().value() );
                continue;
            }

            if ( EnqueueRootCID( bCastHeadCID ) )
            {
                logger_->debug( "{}: Queueing processing for block {}", __func__, bCastHeadCID.toString().value() );
                dagWorkerCv_.notify_one(); // wake a worker to possibly seed the next root
            }
            else
            {
                logger_->trace( "{}: Root CID {} could not be enqueued (already pending)",
                                __func__,
                                bCastHeadCID.toString().value() );
            }
        }
    }

    /// Three-stage pipeline for an external root CID: create the root job,
    /// collect its links, fetch the linked nodes. Any failing stage marks the
    /// job failed for aCid and propagates the error to the caller.
    outcome::result<void> CrdtDatastore::HandleRootCIDBlock( const CID &aCid )
    {
        auto rootJobResult = CreateRootJob( aCid );
        if ( rootJobResult.has_failure() )
        {
            MarkJobFailed( aCid );
            return rootJobResult.as_failure();
        }

        auto linksResult = GetLinksToFetch( rootJobResult.value() );
        if ( linksResult.has_failure() )
        {
            MarkJobFailed( aCid );
            return linksResult.as_failure();
        }

        auto fetchResult = FetchNodes( rootJobResult.value(), linksResult.value() );
        if ( fetchResult.has_failure() )
        {
            MarkJobFailed( aCid );
            return fetchResult.as_failure();
        }

        return outcome::success();
    }

    /// Registers the CID with the DAG syncer and loads its node, producing an
    /// external (not self-created) job whose node and root node are identical.
    outcome::result<CrdtDatastore::RootCIDJob> CrdtDatastore::CreateRootJob( const CID &aRootCID )
    {
        logger_->debug( "{}: Creating the Root Job for CID {}", __func__, aRootCID.toString().value() );
        dagSyncer_->InitCIDBlock( aRootCID );
        // Propagates the syncer's error if the node cannot be retrieved.
        OUTCOME_TRY( auto &&rootNode, dagSyncer_->getNode( aRootCID ) );

        logger_->debug( "{}: Root Job created for CID {}", __func__, aRootCID.toString().value() );

        return RootCIDJob{ rootNode, rootNode, false };
    }

    /// Inspects the job's node (or its root node when node_ is null), records
    /// pending head updates per topic, and returns the set of linked CIDs that
    /// still need fetching (restricted to links named after subscribed topics).
    outcome::result<std::set<CID>> CrdtDatastore::GetLinksToFetch( const RootCIDJob &job )
    {
        std::set<CID> cids_to_fetch;
        auto          node_to_process = job.node_;
        if ( node_to_process == nullptr )
        {
            // Root-only job: operate on the root node itself.
            node_to_process = job.root_node_;
        }

        std::set<std::string> topics_to_update_cid = node_to_process->getDestinations();

        // Record the root CID as a pending head for every destination topic.
        for ( auto &topic : topics_to_update_cid )
        {
            logger_->debug( "{}: Recording head to add: {}, {}",
                            __func__,
                            job.root_node_->getCID().toString().value(),
                            topic );
            std::lock_guard<std::mutex> lock( pendingHeadsMutex_ );
            pendingHeadsByRootCID_[job.root_node_->getCID()].emplace( job.root_node_->getCID(), topic );
        }
        if ( !node_to_process->getLinks().empty() )
        {
            logger_->debug( "{}: Checking links for CID {}", __func__, node_to_process->getCID().toString().value() );
            for ( auto &topic : topics_to_update_cid )
            {
                logger_->trace( "{}: Verifying topic {}", __func__, topic );

                // Partition this node's link graph into already-known CIDs and
                // CIDs that still need to be fetched for this topic.
                auto [links_to_fetch, known_cids] = dagSyncer_->TraverseCIDsLinks( *node_to_process, topic, {} );

                for ( const auto &[cid, _dontcare] : known_cids )
                {
                    if ( logger_->level() <= spdlog::level::trace )
                    {
                        logger_->trace( "{}: known cid: {}, {}", __func__, cid.toString().value(), _dontcare );
                    }
                    if ( heads_->IsHead( cid, _dontcare ) )
                    {
                        if ( logger_->level() <= spdlog::level::debug )
                        {
                            logger_->debug( "{}: Recording replacement of {} with {} on topic {} ({}) ",
                                            __func__,
                                            cid.toString().value(),
                                            job.root_node_->getCID().toString().value(),
                                            topic,
                                            _dontcare );
                        }
                        if ( topic != _dontcare )
                        {
                            // Not fatal, but the traversal returned a topic
                            // that differs from the one requested.
                            logger_->error( "{}: Topic {} different from known {} ", __func__, topic, _dontcare );
                        }
                        // This known CID is a current head: schedule it to be
                        // replaced by the new root when heads are updated.
                        std::lock_guard<std::mutex> lock( pendingHeadsMutex_ );
                        pendingHeadsByRootCID_[job.root_node_->getCID()].emplace( cid, topic );
                        logger_->debug( "{}: Recorded replacement of {} with {} on topic {} ({}) ",
                                        __func__,
                                        cid.toString().value(),
                                        job.root_node_->getCID().toString().value(),
                                        topic,
                                        _dontcare );
                    }
                }

                if ( known_cids.empty() )
                {
                    // Nothing known on this topic: the root itself becomes the head.
                    std::lock_guard<std::mutex> lock( pendingHeadsMutex_ );
                    pendingHeadsByRootCID_[job.root_node_->getCID()].emplace( job.root_node_->getCID(), topic );
                }

                for ( const auto &[cid, link_name] : links_to_fetch )
                {
                    logger_->debug( "{}: Not known cid: {}, {}", __func__, cid.toString().value(), link_name );
                    // Only fetch links whose name matches a subscribed topic.
                    if ( topicNames_.find( link_name ) != topicNames_.end() )
                    {
                        cids_to_fetch.emplace( cid );
                    }
                }
            }
        }
        return cids_to_fetch;
    }

    /// Fetches each linked CID via the DAG syncer and enqueues one external
    /// job per fetched node. With no links, enqueues a root-only job (node_ ==
    /// nullptr) so the root is finalized by a worker.
    outcome::result<void> CrdtDatastore::FetchNodes( const RootCIDJob &aRootJob, const std::set<CID> &aLinks )
    {
        if ( aLinks.empty() )
        {
            logger_->debug( "{}: No links to fetch, sending root CID", __func__ );
            {
                // node_ == nullptr marks this as a root-finalization job.
                RootCIDJob       root_node_only_job{ nullptr, aRootJob.root_node_, aRootJob.created_by_self_ };
                std::unique_lock lock( dagWorkerMutex_ );
                rootCIDJobList_.push( root_node_only_job );
            }
            dagWorkerCv_.notify_one();
            return outcome::success();
        }

        for ( const auto &cid : aLinks )
        {
            if ( logger_->level() <= spdlog::level::debug )
            {
                logger_->debug( "{}: Trying to fetch node {} from Root Job {} ",
                                __func__,
                                cid.toString().value(),
                                aRootJob.root_node_->getCID().toString().value() );
            }

            dagSyncer_->InitCIDBlock( cid );
            // Bails out with the syncer's error on the first failed fetch;
            // jobs already queued in earlier iterations remain queued.
            OUTCOME_TRY( auto &&node, dagSyncer_->getNode( cid ) );

            RootCIDJob newRootJob;

            newRootJob.root_node_       = aRootJob.root_node_;
            newRootJob.node_            = node;
            newRootJob.created_by_self_ = false;

            if ( logger_->level() <= spdlog::level::debug )
            {
                logger_->debug( "{}: Got the node {} sending to workers. Root Job {} ",
                                __func__,
                                cid.toString().value(),
                                aRootJob.root_node_->getCID().toString().value() );
            }
            {
                std::unique_lock lock( dagWorkerMutex_ );
                rootCIDJobList_.push( newRootJob );
            }
            // Wake one worker per queued job.
            dagWorkerCv_.notify_one();
        }
        return outcome::success();
    }

    /// Deserializes the protobuf Delta carried in the node's payload.
    /// Externally produced deltas are passed through the CRDT element filter;
    /// self-created deltas are returned unfiltered.
    /// @return the parsed delta, or NODE_DESERIALIZATION on parse failure.
    outcome::result<pb::Delta> CrdtDatastore::GetDeltaFromNode( const IPLDNode &aNode, bool created_by_self )
    {
        const auto nodeBuffer = aNode.content();

        Delta delta;
        if ( !delta.ParseFromArray( nodeBuffer.data(), nodeBuffer.size() ) )
        {
            logger_->debug( "{}: Can't parse delta from node buffer {}", __func__, aNode.getCID().toString().value() );
            return CrdtDatastore::Error::NODE_DESERIALIZATION;
        }

        if ( created_by_self )
        {
            logger_->debug( "{}: Posting node {} without filtering", __func__, aNode.getCID().toString().value() );
        }
        else
        {
            crdt_filter_.FilterElementsOnDelta( delta );
            //crdt_filter_.FilterTombstonesOnDelta( aDelta );
            logger_->debug( "{}: Filtering node {} ", __func__, aNode.getCID().toString().value() );
        }
        return delta;
    }

    /// Merges the delta into the CRDT set, keyed by the node's CID string.
    /// Propagates any failure from CID stringification or the set merge.
    outcome::result<void> CrdtDatastore::MergeDataFromDelta( const CID &node_cid, const Delta &aDelta )
    {
        OUTCOME_TRY( auto &&cidText, node_cid.toString() );
        logger_->debug( "{}: Merging node {} On CRDT", __func__, cidText );
        OUTCOME_TRY( set_->Merge( aDelta, cidText ) );
        return outcome::success();
    }

    /// Processes one queued job: parses and merges the node's delta, persists
    /// the node, then either queues follow-up work (more links / a final
    /// root-only job) or — for a finished root — updates CRDT heads, frees the
    /// active-root slot and marks the job COMPLETED.
    outcome::result<void> CrdtDatastore::ProcessJobIteration( const RootCIDJob &job_to_process )
    {
        logger_->debug( "{}: Starting to process Root CID", __func__ );

        OUTCOME_TRY( auto &&root_cid_string, job_to_process.root_node_->getCID().toString() );
        logger_->debug( "{}: Processing Root CID job {}", __func__, root_cid_string );

        // node_ == nullptr signals a root-finalization job (see FetchNodes).
        auto node_to_process = job_to_process.node_;
        bool is_root         = false;
        if ( node_to_process == nullptr )
        {
            node_to_process = job_to_process.root_node_;
            is_root         = true;
        }

        OUTCOME_TRY( auto &&cid_string, node_to_process->getCID().toString() );

        OUTCOME_TRY( auto &&delta, GetDeltaFromNode( *node_to_process, job_to_process.created_by_self_ ) );

        logger_->debug( "{}: Merging Deltas from {}", __func__, cid_string );

        OUTCOME_TRY( MergeDataFromDelta( node_to_process->getCID(), delta ) );

        logger_->debug( "{}: Recording block on DAG Syncher {}", __func__, cid_string );
        OUTCOME_TRY( dagSyncer_->addNode( node_to_process ) );

        // The node is now persisted; the cached block is no longer needed.
        (void)dagSyncer_->DeleteCIDBlock( node_to_process->getCID() );

        OUTCOME_TRY( auto &&links, GetLinksToFetch( job_to_process ) );
        // Self-created jobs never fetch further links.
        const bool should_fetch_links = !job_to_process.created_by_self_ && !links.empty();

        if ( links.empty() && !is_root )
        {
            //create one last job to finalize the root node
            logger_->debug( "{}: Finishing root job: {}, Creating the root CID job.", __func__, root_cid_string );
            RootCIDJob root_final_job{ nullptr, job_to_process.root_node_, job_to_process.created_by_self_ };
            {
                std::unique_lock lock( dagWorkerMutex_ );
                rootCIDJobList_.push( root_final_job );
            }
        }
        else if ( should_fetch_links )
        {
            logger_->debug( "{}: Fetching {} links for Root job: {}", __func__, links.size(), root_cid_string );
            OUTCOME_TRY( FetchNodes( job_to_process, links ) );
            logger_->debug( "{}: Nodes fetched for Root job: {}", __func__, root_cid_string );
        }
        else if ( is_root )
        {
            if ( job_to_process.created_by_self_ && !links.empty() )
            {
                // Self-created roots skip their links by design; logged loudly.
                logger_->error( "{}: Self-created job {}, skipping fetch of {} links and finalizing heads",
                                __func__,
                                root_cid_string,
                                links.size() );
            }
            logger_->debug( "{}: Root finalized: {}, Updating CRDT Heads", __func__, root_cid_string );
            UpdateCRDTHeads( job_to_process.root_node_->getCID(),
                             delta.priority(),
                             job_to_process.created_by_self_ || has_full_node_topic_ );
            {
                std::unique_lock lk( dagWorkerMutex_ );
                activeRootCID_.reset(); // this root fully done
            }
            dagWorkerCv_.notify_all(); // let one worker seed the next root

            // Signal job completion after UpdateCRDTHeads is done
            {
                std::lock_guard lock( dagWorkerMutex_ );
                auto            it = pending_jobs_.find( job_to_process.root_node_->getCID() );
                if ( it != pending_jobs_.end() )
                {
                    it->second = JobStatus::COMPLETED;
                }
            }
        }

        return outcome::success();
    }

    outcome::result<std::vector<CID>> CrdtDatastore::DecodeBroadcast( const Buffer &buff )
    {
        // Reject empty payloads outright.
        const std::string raw{ buff.toString() };
        if ( raw.empty() )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        // Parse the protobuf broadcast message and require all fields present.
        CRDTBroadcast message;
        if ( !message.MergeFromString( raw ) || !message.IsInitialized() )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        // Unknown fields indicate a malformed or incompatible message.
        const auto *reflection = message.GetReflection();
        if ( reflection == nullptr || !reflection->GetUnknownFields( message ).empty() )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        // Convert every advertised head into a CID, failing on the first bad one.
        std::vector<CID> decoded_heads;
        decoded_heads.reserve( message.heads_size() );
        for ( const auto &head : message.heads() )
        {
            auto cid_result = CID::fromString( head.cid() );
            if ( cid_result.has_failure() )
            {
                return outcome::failure( boost::system::error_code{} );
            }
            decoded_heads.push_back( std::move( cid_result.value() ) );
        }
        return decoded_heads;
    }

    outcome::result<CrdtDatastore::Buffer> CrdtDatastore::EncodeBroadcast( const std::set<CID> &heads )
    {
        CRDTBroadcast message;

        for ( const auto &head : heads )
        {
            auto *entry = message.add_heads();

            // Consult the cached base58 string first; encoding a CID is expensive.
            std::string encoded;
            {
                std::lock_guard<std::mutex> guard( cid_string_cache_mutex_ );
                auto                        found = cid_string_cache_.find( head );
                if ( found != cid_string_cache_.end() )
                {
                    encoded = found->second;
                    logger_->debug( "CID string cache hit for CID {}", encoded );
                }
            }

            if ( encoded.empty() )
            {
                // Not cached yet: encode now and remember the result.
                auto encode_result = head.toString();
                if ( encode_result.has_failure() )
                {
                    continue; // Skip this CID if conversion fails
                }
                encoded = encode_result.value();
                {
                    std::lock_guard<std::mutex> guard( cid_string_cache_mutex_ );
                    cid_string_cache_[head] = encoded;
                }
                logger_->debug( "CID string cache miss - cached CID {}", encoded );
            }

            entry->set_cid( encoded );
        }

        Buffer serialized;
        serialized.put( message.SerializeAsString() );
        return serialized;
    }

    outcome::result<CrdtDatastore::Buffer> CrdtDatastore::EncodeBroadcastStatic( const std::set<CID> &heads )
    {
        // Stateless encoding: no CID-string cache is consulted here.
        CRDTBroadcast message;
        for ( const auto &head : heads )
        {
            auto *entry         = message.add_heads();
            auto  encode_result = head.toString();
            if ( !encode_result.has_failure() )
            {
                entry->set_cid( encode_result.value() );
            }
        }

        Buffer serialized;
        serialized.put( message.SerializeAsString() );
        return serialized;
    }

    void CrdtDatastore::RebroadcastHeads()
    {
        // Broadcast the current CRDT heads on every topic returned by
        // GetTopicNames() plus any topics queued in pendingBroadcastTopics_.
        // Snapshot the pending topics under the lock so we can clear exactly
        // the ones we handled at the end (topics queued while broadcasting
        // survive for the next pass).
        std::unordered_set<std::string> pending_topics;
        {
            std::lock_guard lock( pendingBroadcastMutex_ );
            pending_topics = pendingBroadcastTopics_;
        }

        std::unordered_set<std::string> topics_to_broadcast = GetTopicNames();
        topics_to_broadcast.insert( pending_topics.begin(), pending_topics.end() );

        if ( topics_to_broadcast.empty() )
        {
            return;
        }

        auto getListResult = heads_->GetList( topics_to_broadcast );
        if ( getListResult.has_failure() )
        {
            logger_->error( "RebroadcastHeads: Failed to get list of heads (error code {})", getListResult.error() );
            return;
        }
        auto [head_map, maxHeight] = getListResult.value();

        // Get PeerInfo once before the loop to avoid repeated calls
        boost::optional<libp2p::peer::PeerInfo> peerInfo;
        if ( broadcaster_ )
        {
            // Cast the broadcaster's DAG syncer to GraphsyncDAGSyncer to get PeerInfo
            auto dagSyncerPtr = std::static_pointer_cast<GraphsyncDAGSyncer>( broadcaster_->GetDagSyncer() );
            if ( dagSyncerPtr )
            {
                auto peerInfoResult = dagSyncerPtr->GetPeerInfo();
                if ( peerInfoResult.has_value() )
                {
                    peerInfo = peerInfoResult.value();
                }
                else
                {
                    logger_->warn( "RebroadcastHeads: Failed to get peer info, broadcasts will retry per-call" );
                }
            }
        }

        for ( const auto &[topic_name, cid_set] : head_map ) // Changed from cid_map to head_map
        {
            auto broadcastResult = Broadcast( cid_set, topic_name, peerInfo );
            if ( broadcastResult.has_failure() )
            {
                logger_->error( "RebroadcastHeads: Broadcast failed" );
            }
            else
            {
                logger_->trace( "RebroadcastHeads: Broadcasted CIDs to topic {} ", topic_name );
                for ( const auto &cid : cid_set )
                {
                    // Gate the expensive CID-to-string conversion behind the trace level.
                    if ( logger_->level() == spdlog::level::trace )
                    {
                        logger_->trace( "RebroadcastHeads: CID {} ", cid.toString().value() );
                    }
                }
            }
        }

        // Remove only the pending topics captured in the snapshot above.
        if ( !pending_topics.empty() )
        {
            std::lock_guard<std::mutex> lock( pendingBroadcastMutex_ );
            for ( const auto &topic : pending_topics )
            {
                pendingBroadcastTopics_.erase( topic );
            }
        }
    }

    outcome::result<void> CrdtDatastore::BroadcastHeadsForTopics( const std::set<std::string> &topics )
    {
        // Broadcast the currently known heads for each of the requested
        // topics. Topics with no recorded heads are skipped with a debug log.
        // Returns failure only if the head list itself cannot be fetched;
        // individual broadcast failures are logged and iteration continues.
        if ( topics.empty() )
        {
            logger_->debug( "BroadcastHeadsForTopics: No topics requested" );
            return outcome::success();
        }

        auto head_list_result = heads_->GetList();
        if ( head_list_result.has_error() )
        {
            logger_->error( "BroadcastHeadsForTopics: Failed to get head list" );
            return outcome::failure( head_list_result.error() );
        }

        auto [head_map, maxHeight] = head_list_result.value();

        // Get PeerInfo once to reuse across broadcasts. Guard against a null
        // broadcaster before dereferencing it, matching RebroadcastHeads
        // (previously a null broadcaster_ would have crashed here; Broadcast()
        // below still reports the failure per topic).
        boost::optional<libp2p::peer::PeerInfo> peerInfo;
        if ( broadcaster_ )
        {
            auto dagSyncerPtr = std::static_pointer_cast<GraphsyncDAGSyncer>( broadcaster_->GetDagSyncer() );
            if ( dagSyncerPtr )
            {
                auto peerInfoResult = dagSyncerPtr->GetPeerInfo();
                if ( peerInfoResult.has_value() )
                {
                    peerInfo = peerInfoResult.value();
                }
                else
                {
                    logger_->warn( "BroadcastHeadsForTopics: Failed to get peer info, broadcasts will retry per-call" );
                }
            }
        }

        // Broadcast heads for each requested topic
        for ( const auto &topic_name : topics )
        {
            auto it = head_map.find( topic_name );
            if ( it == head_map.end() || it->second.empty() )
            {
                logger_->debug( "BroadcastHeadsForTopics: No heads to broadcast for topic {}", topic_name );
                continue;
            }

            auto broadcastResult = Broadcast( it->second, topic_name, peerInfo );
            if ( broadcastResult.has_failure() )
            {
                logger_->error( "BroadcastHeadsForTopics: Broadcast failed for topic {}", topic_name );
            }
            else
            {
                logger_->debug( "BroadcastHeadsForTopics: Broadcasted {} heads for topic {}",
                                it->second.size(),
                                topic_name );
            }
        }

        return outcome::success();
    }

    outcome::result<CrdtDatastore::Buffer> CrdtDatastore::GetKey( const HierarchicalKey &aKey ) const
    {
        // Resolve the key's current value through the CRDT set.
        const auto &flat_key = aKey.GetKey();
        return set_->GetElement( flat_key );
    }

    std::string CrdtDatastore::GetKeysPrefix() const
    {
        // The keys-namespace prefix is the KeysKey of the empty string.
        auto prefix_key = set_->KeysKey( "" );
        return prefix_key.GetKey();
    }

    std::string CrdtDatastore::GetValueSuffix()
    {
        // Value suffix as it appears in datastore paths: "/<suffix>".
        std::string suffix( 1, '/' );
        suffix += CrdtSet::GetValueSuffix();
        return suffix;
    }

    outcome::result<CrdtDatastore::QueryResult> CrdtDatastore::QueryKeyValues( std::string_view aPrefix ) const
    {
        // Query all value entries under the given prefix.
        // Guard against use before initialization, matching the three-part
        // QueryKeyValues overload which already performs this check.
        if ( set_ == nullptr )
        {
            return outcome::failure( storage::DatabaseError::UNITIALIZED );
        }
        return set_->QueryElements( aPrefix, CrdtSet::QuerySuffix::QUERY_VALUESUFFIX );
    }

    outcome::result<CrdtDatastore::QueryResult> CrdtDatastore::QueryKeyValues(
        const std::string &prefix_base,
        const std::string &middle_part,
        const std::string &remainder_prefix ) const
    {
        // Reject queries issued before the CRDT set was initialized.
        if ( !set_ )
        {
            return outcome::failure( storage::DatabaseError::UNITIALIZED );
        }
        // Three-part prefix query, restricted to value entries.
        return set_->QueryElements( prefix_base,
                                    middle_part,
                                    remainder_prefix,
                                    CrdtSet::QuerySuffix::QUERY_VALUESUFFIX );
    }

    outcome::result<bool> CrdtDatastore::HasKey( const HierarchicalKey &aKey ) const
    {
        // Membership is decided by the CRDT set's value lookup.
        const auto &flat_key = aKey.GetKey();
        return set_->IsValueInSet( flat_key );
    }

    outcome::result<CID> CrdtDatastore::PutKey( const HierarchicalKey                 &aKey,
                                                const Buffer                          &aValue,
                                                const std::unordered_set<std::string> &topics )
    {
        // Build an "add" delta for the key/value pair and publish it to the
        // DAG; any delta-creation error is propagated to the caller.
        OUTCOME_TRY( auto &&delta, CreateDeltaToAdd( aKey.GetKey(), std::string( aValue.toString() ) ) );
        return Publish( delta, topics );
    }

    outcome::result<CID> CrdtDatastore::DeleteKey( const HierarchicalKey                 &aKey,
                                                   const std::unordered_set<std::string> &topics )
    {
        // Build a "remove" delta for the key.
        OUTCOME_TRY( auto &&delta, CreateDeltaToRemove( aKey.GetKey() ) );

        // No tombstones means there was nothing to delete; succeed without
        // publishing anything.
        if ( delta->tombstones().empty() )
        {
            return outcome::success();
        }

        return Publish( delta, topics );
    }

    outcome::result<CID> CrdtDatastore::Publish( const std::shared_ptr<Delta>          &aDelta,
                                                 const std::unordered_set<std::string> &topics )
    {
        // Wrap the delta in a DAG node and hand it to the worker pipeline;
        // the resulting CID identifies the newly published node.
        OUTCOME_TRY( auto &&dag_node, CreateDAGNode( aDelta, topics ) );
        return AddDAGNode( dag_node );
    }

    outcome::result<void> CrdtDatastore::Broadcast( const std::set<CID>                    &cids,
                                                    const std::string                      &topic,
                                                    boost::optional<libp2p::peer::PeerInfo> peerInfo )
    {
        // Without a broadcaster there is no transport to publish on.
        if ( !broadcaster_ )
        {
            logger_->error( "Broadcast: No broadcaster, Failed to broadcast" );
            return outcome::failure( boost::system::error_code{} );
        }

        // An empty CID set is logged but treated as a no-op success.
        if ( cids.empty() )
        {
            logger_->error( "Broadcast: Cids Empty, Failed to broadcast" );
            return outcome::success();
        }

        auto encoded = EncodeBroadcast( cids );
        if ( encoded.has_failure() )
        {
            logger_->error( "Broadcast: Encoding failed, Failed to broadcast" );
            return outcome::failure( encoded.error() );
        }

        auto sent = broadcaster_->Broadcast( encoded.value(), topic, peerInfo );
        if ( sent.has_failure() )
        {
            logger_->error( "Broadcast: Broadcaster failed to broadcast" );
            return outcome::failure( sent.error() );
        }
        return outcome::success();
    }

    outcome::result<std::shared_ptr<CrdtDatastore::IPLDNode>> CrdtDatastore::CreateIPLDNode(
        const std::vector<std::pair<CID, std::string>> &aHeads,
        const std::shared_ptr<Delta>                   &aDelta,
        const std::unordered_set<std::string>          &topics ) const
    {
        // Build an IPLD node whose payload is the serialized delta, whose
        // destinations are the given topics, and whose links point at the
        // (CID, topic) pairs in aHeads. Heads whose CID cannot be converted
        // to bytes are silently skipped.
        if ( aDelta == nullptr )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        auto node = ipfs_lite::ipld::IPLDNodeImpl::createFromString( aDelta->SerializeAsString() );
        if ( node == nullptr )
        {
            return outcome::failure( boost::system::error_code{} );
        }
        //Log expensive toString only if trace enabled
        if ( logger_->level() == spdlog::level::trace )
        {
            logger_->trace( "{}: added destination for block {{ cid=\"{}\" }}",
                            __func__,
                            node->getCID().toString().value() );
        }

        for ( auto &topic : topics )
        {
            logger_->info( "Topics {{ name=\"{}\" }}", topic );
            node->addDestination( topic );
        }
        for ( const auto &[head, topic] : aHeads )
        {
            // Link size is the byte length of the head CID itself.
            auto cidByte = head.toBytes();
            if ( cidByte.has_failure() )
            {
                continue;
            }
            ipfs_lite::ipld::IPLDLinkImpl link( head, topic, cidByte.value().size() );
            node->addLink( link );
            //Log expensive toString only if trace enabled
            if ( logger_->level() == spdlog::level::trace )
            {
                logger_->trace( "{}: added link {{ cid=\"{}\", name=\"{}\", size={} }}",
                                __func__,
                                link.getCID().toString().value(),
                                link.getName(),
                                link.getSize() );
            }
        }

        return node;
    }

    outcome::result<std::shared_ptr<CrdtDatastore::IPLDNode>> CrdtDatastore::CreateDAGNode(
        const std::shared_ptr<Delta>          &aDelta,
        const std::unordered_set<std::string> &topics )
    {
        // Create a DAG node carrying aDelta, linked to all current heads of
        // the given topics, with priority one above the current max height.
        OUTCOME_TRY( auto &&head_list, heads_->GetList( topics ) );
        // Move (rather than copy) the potentially large head map out of the result.
        auto [head_map, height] = std::move( head_list );

        height = height + 1; // This implies our minimum height is 1
        aDelta->set_priority( height );

        // Flatten the per-topic CID sets into (CID, topic) pairs for link creation.
        std::vector<std::pair<CID, std::string>> headsWithTopics;
        for ( const auto &[topic_name, cid_set] : head_map )
        {
            for ( const auto &cid : cid_set )
            {
                headsWithTopics.emplace_back( cid, topic_name );
            }
        }

        OUTCOME_TRY( auto &&node, CreateIPLDNode( headsWithTopics, aDelta, topics ) );

        // Log expensive toString only if debug (or more verbose) enabled.
        // Was `== debug`, which wrongly suppressed this message at trace level.
        if ( logger_->level() <= spdlog::level::debug )
        {
            logger_->debug( "{}: Created Node to insert in DAG: {} (instance {})",
                            __func__,
                            node->getCID().toString().value(),
                            reinterpret_cast<uint64_t>( this ) );
        }
        return node;
    }

    outcome::result<CID> CrdtDatastore::AddDAGNode( const std::shared_ptr<CrdtDatastore::IPLDNode> &node )
    {
        // Queue a self-created root job for the DAG workers and block until
        // the workers report it completed (or it fails / times out).
        RootCIDJob rootJob{ nullptr, node, true };

        {
            // Register the job as pending BEFORE it becomes visible to the
            // workers, so WaitForJob below can always find its entry.
            MarkJobPending( node->getCID() );
            std::unique_lock lock( dagWorkerMutex_ );
            selfCreatedJobList_.push( rootJob ); // Use high-priority self-created queue
            if ( logger_->level() == spdlog::level::trace )
            {
                logger_->trace(
                    "AddDAGNode: Added SELF-CREATED job for CID {}, self-queue size: {}, external-queue size: {}",
                    node->getCID().toString().value(),
                    selfCreatedJobList_.size(),
                    rootCIDJobList_.size() );
            }
        }

        // Notify all workers to ensure immediate processing
        dagWorkerCv_.notify_all();

        return WaitForJob( node->getCID() );
    }

    outcome::result<CID> CrdtDatastore::WaitForJob( const CID &cid )
    {
        // Poll pending_jobs_ until the job for `cid` is COMPLETED or FAILED,
        // sleeping up to 500ms between checks, with an overall timeout of 20
        // minutes. Returns the CID on success and Error::NODE_CREATION on
        // failure, missing entry, or timeout.
        auto cid_string_result = cid.toString();
        logger_->debug( "WaitForJob: Starting to wait for CID {} completion", cid_string_result.value() );

        auto timeout_duration = std::chrono::minutes( 20 );
        auto start_time       = std::chrono::steady_clock::now();
        auto last_log_time    = start_time;

        while ( std::chrono::steady_clock::now() - start_time < timeout_duration )
        {
            {
                std::lock_guard lock( dagWorkerMutex_ );
                auto            it = pending_jobs_.find( cid );
                if ( it != pending_jobs_.end() )
                {
                    if ( it->second == JobStatus::COMPLETED )
                    {
                        // Terminal states remove the entry so the table does not grow.
                        pending_jobs_.erase( it );
                        logger_->debug( "WaitForJob: CID {} completed successfully", cid_string_result.value() );
                        return cid;
                    }
                    else if ( it->second == JobStatus::FAILED )
                    {
                        pending_jobs_.erase( it );
                        logger_->error( "WaitForJob: CID {} processing failed", cid_string_result.value() );
                        return outcome::failure( Error::NODE_CREATION );
                    }
                }
                else
                {
                    // No entry means the job was never registered (or was
                    // cleaned up elsewhere); treat it as a failure.
                    logger_->error( "WaitForJob: CID {} not found in pending jobs", cid_string_result.value() );
                    return outcome::failure( Error::NODE_CREATION );
                }
            }

            auto current_time = std::chrono::steady_clock::now();

            // Log progress every 30 seconds
            if ( current_time - last_log_time >= std::chrono::seconds( 30 ) )
            {
                auto elapsed_seconds = std::chrono::duration_cast<std::chrono::seconds>( current_time - start_time )
                                           .count();
                logger_->info( "WaitForJob: Still waiting for CID {} (elapsed: {}s)",
                               cid_string_result.value(),
                               elapsed_seconds );
                last_log_time = current_time;
            }

            // Sleep for the minimum of 500ms or remaining time to avoid tight loops near timeout
            auto time_remaining = timeout_duration - ( current_time - start_time );
            auto sleep_duration = std::min( std::chrono::milliseconds( 500 ),
                                            std::chrono::duration_cast<std::chrono::milliseconds>( time_remaining ) );
            if ( sleep_duration.count() > 0 )
            {
                std::this_thread::sleep_for( sleep_duration );
            }
        }

        // Timeout reached
        // NOTE(review): rootCIDJobList_.size() is read below without holding
        // dagWorkerMutex_ — diagnostic only; confirm this is acceptable.
        logger_->error( "WaitForJob: Timeout (20 minutes) waiting for CID {} - size of the rootCIDJobList_: {}",
                        cid_string_result.value(),
                        rootCIDJobList_.size() );
        // Clean up the pending job
        {
            std::lock_guard lock( dagWorkerMutex_ );
            pending_jobs_.erase( cid );
        }
        return outcome::failure( Error::NODE_CREATION );
    }

    void CrdtDatastore::MarkJobPending( const CID &cid )
    {
        // Flag the job as pending unless a worker already completed it, so a
        // finished job is never resurrected as pending.
        std::lock_guard lock( dagWorkerMutex_ );
        const auto      entry             = pending_jobs_.find( cid );
        const bool      already_completed = entry != pending_jobs_.end() && entry->second == JobStatus::COMPLETED;
        if ( !already_completed )
        {
            pending_jobs_[cid] = JobStatus::PENDING;
        }
    }

    void CrdtDatastore::MarkJobFailed( const CID &cid )
    {
        // Unconditionally record the job as failed; WaitForJob picks this up.
        std::lock_guard lock( dagWorkerMutex_ );
        pending_jobs_.insert_or_assign( cid, JobStatus::FAILED );
    }

    outcome::result<CrdtDatastore::JobStatus> CrdtDatastore::GetJobStatus( const CID &cid )
    {
        // A block already present in the DAG syncer counts as completed,
        // regardless of the in-memory job table.
        auto block_check = dagSyncer_->HasBlock( cid );
        if ( block_check.has_value() && block_check.value() )
        {
            return JobStatus::COMPLETED;
        }

        // Otherwise consult the pending-job table under the worker lock;
        // an unknown CID is reported as a failure.
        std::lock_guard lock( dagWorkerMutex_ );
        if ( auto entry = pending_jobs_.find( cid ); entry != pending_jobs_.end() )
        {
            return entry->second;
        }
        return outcome::failure( boost::system::error_code{} );
    }

    outcome::result<void> CrdtDatastore::PrintDAG()
    {
        // Walk the DAG from every known head and print each branch; the
        // visited set prevents re-printing shared subtrees.
        OUTCOME_TRY( auto &&head_list, heads_->GetList() );
        auto [head_map, height] = head_list;

        std::vector<CID> visited;
        for ( const auto &[topic_name, cid_set] : head_map )
        {
            for ( const auto &cid : cid_set )
            {
                OUTCOME_TRY( PrintDAGRec( cid, 0, visited ) );
            }
        }
        return outcome::success();
    }

    outcome::result<void> CrdtDatastore::PrintDAGRec( const CID &aCID, uint64_t aDepth, std::vector<CID> &aSet )
    {
        // Print one node of the DAG (indented by depth) and recurse into its
        // links. aSet records visited CIDs so shared subtrees print as "...".
        std::ostringstream line;
        for ( uint64_t i = 0; i < aDepth; ++i )
        {
            line << " ";
        }

        // add a Cid to the set only if it is
        // not in it already.
        if ( std::find( aSet.begin(), aSet.end(), aCID ) != aSet.end() )
        {
            line << "...";
            std::cout << line.str() << std::endl;
            return outcome::success();
        }
        aSet.push_back( aCID );

        auto getNodeResult = dagSyncer_->GetNodeWithoutRequest( aCID );

        if ( getNodeResult.has_failure() )
        {
            return outcome::failure( getNodeResult.error() );
        }
        auto node = getNodeResult.value();

        auto delta      = std::make_shared<Delta>();
        auto nodeBuffer = node->content();
        if ( !delta->ParseFromArray( nodeBuffer.data(), nodeBuffer.size() ) )
        {
            logger_->error( "PrintDAGRec: failed to parse delta from node" );
            return outcome::failure( boost::system::error_code{} );
        }

        // Summary line: priority, last 4 chars of the CID, adds, removals, links.
        std::string strCID = node->getCID().toString().value();
        strCID             = strCID.substr( strCID.size() - 4 );
        line << " - " << delta->priority() << " | " << strCID << ": ";
        line << "Add: {";
        for ( const auto &elem : delta->elements() )
        {
            line << elem.key() << ":" << elem.value() << ",";
        }
        line << "}. Rmv: {";
        for ( const auto &tomb : delta->tombstones() )
        {
            line << tomb.key() << ",";
        }
        line << "}. Links: {";
        for ( const auto &link : node->getLinks() )
        {
            auto strCid = link.get().getCID().toString().value();
            strCid      = strCid.substr( strCid.size() - 4 );
            line << strCid << ",";
        }
        line << "}:";
        std::cout << line.str() << std::endl;

        // Recurse into children, propagating failures instead of silently
        // discarding them (the result was previously ignored, unlike the
        // error handling in PrintDAG).
        for ( const auto &link : node->getLinks() )
        {
            OUTCOME_TRY( PrintDAGRec( link.get().getCID(), aDepth + 1, aSet ) );
        }

        return outcome::success();
    }

    outcome::result<void> CrdtDatastore::Sync( const HierarchicalKey &aKey )
    {
        // Flush to durable storage every datastore entry that can affect the
        // given key: its element/tombstone/value entries plus all heads. For
        // the root key "/", the entire namespace is synced instead.
        //
        // This is a quick write-up of the internals from the time when
        // I was thinking many underlying datastore entries are affected when
        // an add operation happens:
        //
        // When a key is added:
        // - a new delta is made
        // - Delta is marshalled and a DAG-node is created with the bytes,
        //   pointing to previous heads. DAG-node is added to DAGService.
        // - Heads are replaced with new CID.
        // - New CID is broadcasted to everyone
        // - The new CID is processed (up until now the delta had not
        //   taken effect). Implementation detail: it is processed before
        //   broadcast actually.
        // - processNode() starts processing that branch from that CID
        // - it calls set.Merge()
        // - that calls putElems() and putTombs()
        // - that may make a batch for all the elems which is later committed
        // - each element has a datastore entry /setNamespace/elemsNamespace/<key>/<block_id>
        // - each tomb has a datastore entry /setNamespace/tombsNamespace/<key>/<block_id>
        // - each value has a datastore entry /setNamespace/keysNamespace/<key>/valueSuffix
        // - each value has an additional priority entry /setNamespace/keysNamespace/<key>/prioritySuffix
        // - the last two are only written if the added entry has more priority than any the existing
        // - For a value to not be lost, those entries should be fully synced.
        // - In order to check if a value is in the set:
        //   - List all elements on /setNamespace/elemsNamespace/<key> (will return several block_ids)
        //   - If we find an element which is not tombstoned, then value is in the set
        // - In order to retrieve an element's value:
        //   - Check that it is in the set
        //   - Read the value entry from the /setNamespace/keysNamespace/<key>/valueSuffix path

        // Be safe and just sync everything in our namespace
        if ( aKey.GetKey() == "/" )
        {
            return Sync( namespaceKey_ );
        }

        // attempt to be intelligent and sync only all heads and the
        // set entries related to the given prefix.
        std::vector<HierarchicalKey> keysToSync;
        keysToSync.push_back( set_->ElemsPrefix( aKey.GetKey() ) );
        keysToSync.push_back( set_->TombsPrefix( aKey.GetKey() ) );
        keysToSync.push_back( set_->KeysKey( aKey.GetKey() ) ); // covers values and priorities
        keysToSync.push_back( heads_->GetNamespaceKey() );
        return SyncDatastore( keysToSync );
    }

    outcome::result<void> CrdtDatastore::SyncDatastore( const std::vector<HierarchicalKey> &aKeyList )
    {
        // Delegate the sync to the CRDT set: the underlying datastore is
        // shared with the set, so flushing there covers our entries as well.
        return set_->DataStoreSync( aKeyList );
    }

    outcome::result<std::shared_ptr<CrdtDatastore::Delta>> CrdtDatastore::CreateDeltaToAdd( const std::string &key,
                                                                                            const std::string &value )
    {
        // Build an "add" delta for key/value; delegates to the static CrdtSet
        // helper (no instance state is required for additions).
        return CrdtSet::CreateDeltaToAdd( key, value );
    }

    outcome::result<std::shared_ptr<CrdtDatastore::Delta>> CrdtDatastore::CreateDeltaToRemove(
        const std::string &key ) const
    {
        // Build a "remove" delta for key; delegates to the set instance
        // (unlike CreateDeltaToAdd, which uses a static helper).
        return set_->CreateDeltaToRemove( key );
    }

    void CrdtDatastore::PrintDataStore()
    {
        // Debug helper: dump the CRDT set's datastore contents.
        set_->PrintDataStore();
    }

    bool CrdtDatastore::RegisterElementFilter( const std::string &pattern, CRDTElementFilterCallback filter )
    {
        // Register an element filter for keys matching `pattern`; returns
        // whatever the underlying filter registry reports.
        return crdt_filter_.RegisterElementFilter( pattern, std::move( filter ) );
    }

    bool CrdtDatastore::RegisterNewElementCallback( const std::string &pattern, CRDTNewElementCallback callback )
    {
        // Register a callback invoked for new elements whose key matches
        // `pattern`; forwarded to the callback manager.
        return crdt_cb_manager_.RegisterNewDataCallback( pattern, std::move( callback ) );
    }

    bool CrdtDatastore::RegisterDeletedElementCallback( const std::string         &pattern,
                                                        CRDTDeletedElementCallback callback )
    {
        // Register a callback invoked for deleted elements whose key matches
        // `pattern`; forwarded to the callback manager.
        return crdt_cb_manager_.RegisterDeletedDataCallback( pattern, std::move( callback ) );
    }

    void CrdtDatastore::PutElementsCallback( const std::string &key, const Buffer &value, const std::string &cid )
    {
        // Notify registered "new data" callbacks about a stored element.
        crdt_cb_manager_.PutDataCallback( key, value, cid );
    }

    void CrdtDatastore::DeleteElementsCallback( const std::string &key, const std::string &cid )
    {
        // Notify registered "deleted data" callbacks about a removed element.
        crdt_cb_manager_.DeleteDataCallback( key, cid );
    }

    void CrdtDatastore::UpdateCRDTHeads( const CID &rootCID, uint64_t rootPriority, bool add_topics_to_broadcast )
    {
        // Promote rootCID to a CRDT head for every (previous head, topic)
        // pair tracked in pendingHeadsByRootCID_: entries equal to rootCID
        // are added as fresh heads, other entries are replaced by rootCID.
        // Optionally queues the touched topics for rebroadcast.
        std::lock_guard<std::mutex> lock( pendingHeadsMutex_ );
        auto                        it = pendingHeadsByRootCID_.find( rootCID );
        if ( it == pendingHeadsByRootCID_.end() )
        {
            logger_->error( "{}: Error, untracked head {}", __func__, rootCID.toString().value() );
            return;
        }
        std::set<std::string> updated_topics;
        for ( const auto &[cid, topic] : it->second )
        {
            if ( cid == rootCID )
            {
                // The root itself is a brand-new head for this topic.
                auto resolve_result = dagSyncer_->markResolved( cid );
                if ( resolve_result.has_failure() )
                {
                    if ( logger_->level() <= spdlog::level::err )
                    {
                        logger_->error( "{}: error marking Root CID {} as resolved", __func__, cid.toString().value() );
                    }
                }
                auto add_result = heads_->Add( rootCID, rootPriority, topic );
                if ( add_result.has_failure() )
                {
                    if ( logger_->level() <= spdlog::level::err )
                    {
                        logger_->error( "{}: error adding head {}", __func__, rootCID.toString().value() );
                    }
                }
                updated_topics.insert( topic );
                if ( logger_->level() <= spdlog::level::debug )
                {
                    logger_->debug( "{}: Marking Head CID {} as resolved", __func__, rootCID.toString().value() );
                }
            }
            else
            {
                // An older head is being superseded: make sure both the old
                // head and the new root are marked resolved, then replace.
                auto is_resolved_result = dagSyncer_->isResolved( cid );
                if ( is_resolved_result.has_failure() )
                {
                    if ( logger_->level() <= spdlog::level::err )
                    {
                        logger_->error( "{}: error checking if CID {} IS resolved", __func__, cid.toString().value() );
                    }
                    continue;
                }
                if ( !is_resolved_result.value() )
                {
                    //Log expensive toString only if trace enabled
                    if ( logger_->level() == spdlog::level::trace )
                    {
                        logger_->trace( "{}: Previous Head {} not resolved before replacement with {}",
                                        __func__,
                                        cid.toString().value(),
                                        rootCID.toString().value() );
                    }
                    auto resolve_result = dagSyncer_->markResolved( cid );
                    if ( resolve_result.has_failure() && logger_->level() <= spdlog::level::err )
                    {
                        logger_->error( "{}: error marking old Head CID {} as resolved",
                                        __func__,
                                        cid.toString().value() );
                    }
                }
                auto resolve_result = dagSyncer_->markResolved( rootCID );
                if ( resolve_result.has_failure() && logger_->level() <= spdlog::level::err )
                {
                    // Log the CID that actually failed to be marked resolved
                    // (previously this logged the old head's CID by mistake).
                    logger_->error( "{}: error marking new Head CID {} as resolved",
                                    __func__,
                                    rootCID.toString().value() );
                }
                auto replace_result = heads_->Replace( cid, rootCID, rootPriority, topic );
                if ( replace_result.has_failure() && logger_->level() <= spdlog::level::err )
                {
                    logger_->error( "{}: error replacing head {} with {}",
                                    __func__,
                                    cid.toString().value(),
                                    rootCID.toString().value() );
                }
                updated_topics.insert( topic );
            }
        }
        if ( add_topics_to_broadcast )
        {
            std::lock_guard<std::mutex> lock( pendingBroadcastMutex_ );
            pendingBroadcastTopics_.insert( updated_topics.begin(), updated_topics.end() );
        }

        rebroadcastCv_.notify_one();
    }

    /// Thread-safe query: is @p cid the root currently being processed,
    /// or already waiting in the pending-root queue?
    bool CrdtDatastore::IsRootCIDPendingOrActive( const CID &cid )
    {
        // Take the DAG worker lock, then delegate to the unlocked scan.
        std::lock_guard guard( dagWorkerMutex_ );
        return IsRootCIDPendingOrActiveLocked( cid );
    }

    /// Unlocked core of IsRootCIDPendingOrActive.
    /// Precondition: caller holds dagWorkerMutex_.
    /// @return true when @p cid matches the active root or any queued root.
    bool CrdtDatastore::IsRootCIDPendingOrActiveLocked( const CID &cid ) const
    {
        // Check the root currently in flight first.
        if ( activeRootCID_.has_value() && ( activeRootCID_.value() == cid ) )
        {
            return true;
        }

        // std::queue exposes no iterators, so walk a throwaway copy
        // front-to-back, draining it as we go.
        for ( auto scan = pendingRootQueue_; !scan.empty(); scan.pop() )
        {
            if ( scan.front() == cid )
            {
                return true;
            }
        }

        return false;
    }

    /// Queue @p cid for root processing unless it is already pending/active.
    /// @return true when the CID was newly enqueued, false on duplicate.
    bool CrdtDatastore::EnqueueRootCID( const CID &cid )
    {
        std::unique_lock guard( dagWorkerMutex_ );

        // Reject duplicates: already queued or currently being processed.
        if ( IsRootCIDPendingOrActiveLocked( cid ) )
        {
            return false;
        }

        pendingRootQueue_.push( cid );

        // Mark the job PENDING unless a previous run already COMPLETED it —
        // a completed status is preserved.
        const auto entry        = pending_jobs_.find( cid );
        const bool is_completed = ( entry != pending_jobs_.end() ) && ( entry->second == JobStatus::COMPLETED );
        if ( !is_completed )
        {
            pending_jobs_[cid] = JobStatus::PENDING;
        }
        return true;
    }

    /// Fetch the current list of heads; any failure from the heads
    /// store is forwarded to the caller untouched.
    outcome::result<CrdtHeads::CRDTListResult> CrdtDatastore::GetHeadList()
    {
        auto list_result = heads_->GetList();
        return list_result;
    }

    /// Remove head @p aCid for @p topic; delegates to the heads store.
    outcome::result<void> CrdtDatastore::RemoveHead( const CID &aCid, const std::string &topic )
    {
        auto remove_result = heads_->Remove( aCid, topic );
        return remove_result;
    }

    /// Look up the stored height of head @p aCid under @p topic.
    outcome::result<uint64_t> CrdtDatastore::GetHeadHeight( const CID &aCid, const std::string &topic )
    {
        auto height_result = heads_->GetHeadHeight( aCid, topic );
        return height_result;
    }

    /// Register @p aCid as a head for @p topic at @p priority.
    /// NOTE: CrdtHeads::Add takes (cid, priority, topic) — the argument
    /// order intentionally differs from this wrapper's signature.
    outcome::result<void> CrdtDatastore::AddHead( const CID &aCid, const std::string &topic, uint64_t priority )
    {
        auto add_result = heads_->Add( aCid, priority, topic );
        return add_result;
    }

    /// Register @p topic in the broadcast topic set.
    void CrdtDatastore::AddTopicName( const std::string &topic )
    {
        // The full-node topic gets special treatment elsewhere; remember if
        // it was ever registered. NOTE(review): this flag is written outside
        // topicNamesMutex_ — presumably it is safe to do so (e.g. an atomic
        // or single-writer); confirm against the member's declaration.
        static constexpr const char *kFullNodeTopic = "SuperGNUSNode.TestNet.FullNode";
        if ( topic == kFullNodeTopic )
        {
            has_full_node_topic_ = true;
        }
        std::lock_guard guard( topicNamesMutex_ );
        topicNames_.emplace( topic );
    }

    /// Return a snapshot copy of the registered topics; the copy is taken
    /// under the lock so callers may iterate it without synchronization.
    std::unordered_set<std::string> CrdtDatastore::GetTopicNames() const
    {
        std::lock_guard guard( topicNamesMutex_ );
        auto snapshot = topicNames_;
        return snapshot;
    }
}

Updated on 2026-03-04 at 13:10:44 -0800