Skip to content

src/account/Migration3_4_0To3_5_0.cpp

More...

Namespaces

Name
sgns

Detailed Description

Date: 2025-11-14 Henrique A. Klein ([email protected])

Source code

#include "Migration3_4_0To3_5_0.hpp"

#include "account/EscrowReleaseTransaction.hpp"
#include "account/GeniusAccount.hpp"
#include "account/MintTransaction.hpp"
#include "account/TransactionManager.hpp"
#include "account/MigrationManager.hpp"
#include "account/TransferTransaction.hpp"

namespace sgns
{
    namespace
    {
        std::string BuildLegacyTransactionPath_3_5_0( const IGeniusTransactions &tx )
        {
            return tx.GetSrcAddress() + "/tx/" + tx.GetTransactionSpecificPath() + "/" +
                   std::to_string( tx.dag_st.nonce() );
        }
    }

    Migration3_4_0To3_5_0::Migration3_4_0To3_5_0(
        std::shared_ptr<boost::asio::io_context>                        ioContext,
        std::shared_ptr<ipfs_pubsub::GossipPubSub>                      pubSub,
        std::shared_ptr<ipfs_lite::ipfs::graphsync::Network>            graphsync,
        std::shared_ptr<libp2p::protocol::Scheduler>                    scheduler,
        std::shared_ptr<ipfs_lite::ipfs::graphsync::RequestIdGenerator> generator,
        std::string                                                     writeBasePath,
        std::string                                                     base58key,
        std::shared_ptr<GeniusAccount>                                  account ) :
        ioContext_( std::move( ioContext ) ),
        pubSub_( std::move( pubSub ) ),
        graphsync_( std::move( graphsync ) ),
        scheduler_( std::move( scheduler ) ),
        generator_( std::move( generator ) ),
        writeBasePath_( std::move( writeBasePath ) ),
        base58key_( std::move( base58key ) ),
        account_( std::move( account ) )
    {
    }

    Migration3_4_0To3_5_0::~Migration3_4_0To3_5_0() {}

    std::string Migration3_4_0To3_5_0::FromVersion() const
    {
        return "3.4.0";
    }

    std::string Migration3_4_0To3_5_0::ToVersion() const
    {
        return "3.5.0";
    }

    outcome::result<bool> Migration3_4_0To3_5_0::IsRequired() const
    {
        bool ret = false;

        if ( !db_3_4_0_ )
        {
            logger_->info( "Legacy 3.4.0 DB not found; skipping migration to {}", ToVersion() );
            return ret;
        }

        do
        {
            sgns::crdt::GlobalDB::Buffer version_key;
            version_key.put( std::string( MigrationManager::VERSION_INFO_KEY ) );
            auto version_ret = db_3_5_0_->GetDataStore()->get( version_key );

            if ( version_ret.has_error() )
            {
                // No version info found, migration is required
                logger_->info( "No version info found in GlobalDB, migration from {} to {} is required",
                               FromVersion(),
                               ToVersion() );
                ret = true;
                break;
            }
            auto version_buffer = version_ret.value();

            if ( !IsVersionLessThan( std::string( version_buffer.toString() ), ToVersion() ) )
            {
                logger_->info( "GlobalDB already at target version {}, skipping migration", ToVersion() );
                break;
            }
            logger_->info( "GlobalDB at version {}, need to migrate", FromVersion(), ToVersion() );
            ret = true;

        } while ( 0 );

        return ret;
    }

    outcome::result<void> Migration3_4_0To3_5_0::Init()
    {
        OUTCOME_TRY( auto &&legacy_db, InitLegacyDb() );
        db_3_4_0_ = std::move( legacy_db );
        if ( db_3_4_0_ )
        {
            OUTCOME_TRY( auto &&new_db, InitTargetDb() );
            db_3_5_0_ = std::move( new_db );
        }
        return outcome::success();
    }

    outcome::result<void> Migration3_4_0To3_5_0::Apply()
    {
        if ( !db_3_4_0_ )
        {
            logger_->error( "Legacy 3.4.0 DB not initialized; nothing to migrate to {}", ToVersion() );
            return outcome::success();
        }

        logger_->info( "Starting migration from {} to {}", FromVersion(), ToVersion() );

        account_->ConfigureMessengerHandlers( db_3_5_0_ );

        db_3_5_0_->Start();
        //init blockchain
        if ( !blockchain_ )
        {
            blockchain_ = Blockchain::New(
                db_3_5_0_,
                account_,
                [wptr( weak_from_this() )]( outcome::result<void> result )
                {
                    if ( auto strong = wptr.lock() )
                    {
                        if ( result.has_error() )
                        {
                            strong->logger_->error( "Error starting blockchain: {}", result.error().message() );
                            strong->blockchain_status_.store( Status::ST_ERROR );
                            return;
                        }
                        strong->logger_->debug( "Blockchain started successfully, starting transaction manager" );
                        strong->blockchain_status_.store( Status::ST_SUCCESS );
                    }
                } );
        }

        auto                  retry_duration   = std::chrono::minutes( 2 );
        auto                  retry_interval   = std::chrono::seconds( 5 );
        auto                  retry_start_time = std::chrono::steady_clock::now();
        auto                  last_log_time    = retry_start_time;
        outcome::result<void> start_result     = outcome::failure( Blockchain::Error::BLOCKCHAIN_NOT_INITIALIZED );
        do
        {
            start_result = blockchain_->Start();
            if ( start_result.has_error() )
            {
                logger_->error( "Error starting blockchain: {}", start_result.error().message() );
            }

            auto current_time = std::chrono::steady_clock::now();
            if ( current_time - last_log_time >= std::chrono::seconds( 30 ) )
            {
                auto elapsed_seconds =
                    std::chrono::duration_cast<std::chrono::seconds>( current_time - retry_start_time ).count();
                logger_->info( "{}: Retrying blockchain start (elapsed: {}s)", __func__, elapsed_seconds );
                last_log_time = current_time;
            }
            std::this_thread::sleep_for( retry_interval );
        } while ( std::chrono::steady_clock::now() - retry_start_time < retry_duration && start_result.has_error() );

        auto timeout_duration     = std::chrono::minutes( 4 );
        auto start_time           = std::chrono::steady_clock::now();
        last_log_time             = start_time;
        bool blockchain_succeeded = false;

        while ( std::chrono::steady_clock::now() - start_time < timeout_duration )
        {
            auto current_time = std::chrono::steady_clock::now();
            if ( blockchain_status_.load( std::memory_order_acquire ) != Status::ST_INIT )
            {
                // spin or sleep
                if ( blockchain_status_.load( std::memory_order_acquire ) == Status::ST_SUCCESS )
                {
                    auto elapsed_seconds = std::chrono::duration_cast<std::chrono::seconds>( current_time - start_time )
                                               .count();
                    blockchain_succeeded = true;
                    logger_->debug( "{}: Blockchain succeeded (elapsed: {}s)", __func__, elapsed_seconds );
                }
                break;
            }
            if ( current_time - last_log_time >= std::chrono::seconds( 30 ) )
            {
                auto elapsed_seconds = std::chrono::duration_cast<std::chrono::seconds>( current_time - start_time )
                                           .count();
                logger_->info( "{}: Still waiting for the blockchain to initialize (elapsed: {}s)",
                               __func__,
                               elapsed_seconds );
                last_log_time = current_time;
            }
            std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
        }
        if ( !blockchain_succeeded )
        {
            auto elapsed_seconds = std::chrono::duration_cast<std::chrono::seconds>( std::chrono::steady_clock::now() -
                                                                                     start_time )
                                       .count();
            logger_->error( "{}: Blockchain errored out (elapsed {}s)", __func__, elapsed_seconds );

            return outcome::failure( MigrationManager::Error::BLOCKCHAIN_INIT_FAILED );
        }

        start_time           = std::chrono::steady_clock::now();
        blockchain_succeeded = false;
        while ( std::chrono::steady_clock::now() - start_time < timeout_duration )
        {
            auto genesis_cid_result          = blockchain_->GetGenesisCID();
            auto account_creation_cid_result = blockchain_->GetAccountCreationCID();
            if ( genesis_cid_result.has_value() && account_creation_cid_result.has_value() )
            {
                blockchain_succeeded = true;
                break;
            }

            std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
        }
        if ( !blockchain_succeeded )
        {
            logger_->error( "{}: Genesis and/or Account Creation CID not fetched", __func__ );

            return outcome::failure( MigrationManager::Error::BLOCKCHAIN_INIT_FAILED );
        }

        auto                  crdt_transaction_ = db_3_5_0_->BeginTransaction();
        std::unordered_set<std::string> topics_;

        topics_.emplace( std::string( TransactionManager::GNUS_FULL_NODES_TOPIC ) );

        std::vector<uint16_t> monitored_networks{ version::DEV_NET_ID, version::TEST_NET_ID, version::MAIN_NET_ID };
        size_t                migrated_count = 0;
        size_t                BATCH_SIZE     = 50;

        struct TransactionRecord
        {
            std::shared_ptr<IGeniusTransactions> tx;
            std::string                          key;
        };

        for ( const auto network_id : monitored_networks )
        {
            auto blockchain_base = TransactionManager::GetBlockChainBase( network_id );
            OUTCOME_TRY( auto &&entries, db_3_4_0_->QueryKeyValues( blockchain_base, "*", "/tx" ) );
            logger_->debug( "Found {} transaction keys to migrate on network {}", entries.size(), network_id );

            std::vector<TransactionRecord> owned_transactions;
            owned_transactions.reserve( entries.size() );
            std::vector<TransactionRecord> other_transactions;
            other_transactions.reserve( entries.size() );

            auto persist_record = [&]( const TransactionRecord &record ) -> outcome::result<void>
            {
                sgns::crdt::GlobalDB::Buffer data_transaction;
                data_transaction.put( record.tx->SerializeByteVector() );
                BOOST_OUTCOME_TRYV2( auto &&, crdt_transaction_->Put( record.key, std::move( data_transaction ) ) );

                topics_.emplace( record.tx->GetSrcAddress() );
                if ( auto transfer_tx = std::dynamic_pointer_cast<TransferTransaction>( record.tx ) )
                {
                    for ( const auto &dest_info : transfer_tx->GetDstInfos() )
                    {
                        topics_.emplace( dest_info.dest_address );
                    }
                }
                if ( auto escrow_tx = std::dynamic_pointer_cast<EscrowReleaseTransaction>( record.tx ) )
                {
                    topics_.emplace( escrow_tx->GetSrcAddress() );
                    topics_.emplace( escrow_tx->GetEscrowSource() );
                }

                ++migrated_count;
                if ( migrated_count >= BATCH_SIZE )
                {
                    OUTCOME_TRY( crdt_transaction_->Commit( topics_ ) );
                    crdt_transaction_ = db_3_5_0_->BeginTransaction();
                    topics_.clear();
                    topics_.emplace( std::string( TransactionManager::GNUS_FULL_NODES_TOPIC ) );
                    migrated_count = 0;
                    logger_->debug( "Committed a batch of {} transactions", BATCH_SIZE );
                }
                return outcome::success();
            };

            for ( const auto &entry : entries )
            {
                auto keyOpt = db_3_4_0_->KeyToString( entry.first );
                if ( !keyOpt.has_value() )
                {
                    logger_->error( "Failed to convert key buffer to string" );
                    continue;
                }
                std::string transaction_key   = keyOpt.value();
                auto        maybe_transaction = TransactionManager::FetchTransaction( db_3_4_0_, transaction_key );
                if ( !maybe_transaction.has_value() )
                {
                    logger_->error( "Can't fetch transaction for key {}", transaction_key );
                    continue;
                }
                auto tx = maybe_transaction.value();
                logger_->trace( "Fetched transaction {}", transaction_key );

                if ( !tx->CheckSignature() )
                {
                    if ( !tx->CheckDAGSignatureLegacy() )
                    {
                        logger_->error( "Could not validate signature of transaction {}", transaction_key );
                        continue;
                    }
                }

                bool              is_owned = ( tx->GetSrcAddress() == account_->GetAddress() );
                TransactionRecord record{ tx, std::move( transaction_key ) };

                if ( is_owned )
                {
                    owned_transactions.push_back( std::move( record ) );
                }
                else
                {
                    other_transactions.push_back( std::move( record ) );
                }
            }

            std::sort( owned_transactions.begin(),
                       owned_transactions.end(),
                       []( const TransactionRecord &lhs, const TransactionRecord &rhs )
                       {
                           if ( lhs.tx->dag_st.nonce() == rhs.tx->dag_st.nonce() )
                           {
                               return lhs.key < rhs.key;
                           }
                           return lhs.tx->dag_st.nonce() < rhs.tx->dag_st.nonce();
                       } );

            if ( !owned_transactions.empty() )
            {
                uint64_t expected_nonce = 0;
                uint64_t last_timestamp = 0;

                auto create_zero_value_mint = [&]( uint64_t nonce ) -> TransactionRecord
                {
                    SGTransaction::DAGStruct dag;
                    dag.set_previous_hash( "" );
                    dag.set_nonce( nonce );
                    dag.set_source_addr( account_->GetAddress() );
                    auto current_timestamp = static_cast<uint64_t>(
                        std::chrono::duration_cast<std::chrono::milliseconds>(
                            std::chrono::system_clock::now().time_since_epoch() )
                            .count() );
                    if ( last_timestamp != 0 && current_timestamp <= last_timestamp )
                    {
                        current_timestamp = last_timestamp + 1;
                    }
                    dag.set_timestamp( current_timestamp );
                    dag.set_uncle_hash( "" );
                    dag.set_data_hash( "" );

                    TokenID token_id;
                    auto    mint_tx = std::make_shared<MintTransaction>(
                        MintTransaction::New( 0, std::to_string( network_id ), token_id, std::move( dag ) ) );
                    mint_tx->MakeSignature( *account_ );

                    logger_->info( "Synthesizing zero-value mint for missing nonce {} on network {}",
                                   nonce,
                                   network_id );
                    auto tx_path = blockchain_base + BuildLegacyTransactionPath_3_5_0( *mint_tx );
                    return TransactionRecord{ std::move( mint_tx ), std::move( tx_path ) };
                };

                for ( auto &record : owned_transactions )
                {
                    while ( expected_nonce < record.tx->dag_st.nonce() )
                    {
                        auto filler_record = create_zero_value_mint( expected_nonce );
                        logger_->info( "Synthesized zero-value mint for missing nonce {} on network {}",
                                       expected_nonce,
                                       network_id );
                        OUTCOME_TRY( persist_record( filler_record ) );
                        last_timestamp = filler_record.tx->GetTimestamp();
                        ++expected_nonce;
                    }

                    OUTCOME_TRY( persist_record( record ) );
                    last_timestamp = record.tx->GetTimestamp();
                    expected_nonce = record.tx->dag_st.nonce() + 1;
                }
            }

            for ( const auto &record : other_transactions )
            {
                OUTCOME_TRY( persist_record( record ) );
            }
        }
        if ( migrated_count != 0 )
        {
            OUTCOME_TRY( crdt_transaction_->Commit( topics_ ) );
            logger_->debug( "Committed remaining {}  transactions", migrated_count );
        }

        sgns::crdt::GlobalDB::Buffer version_buffer;
        sgns::crdt::GlobalDB::Buffer version_key;
        version_key.put( std::string( MigrationManager::VERSION_INFO_KEY ) );
        version_buffer.put( ToVersion() );

        OUTCOME_TRY( db_3_5_0_->GetDataStore()->put( version_key, version_buffer ) );
        logger_->debug( "Migration from {} to {} completed successfully", FromVersion(), ToVersion() );

        return outcome::success();
    }

    outcome::result<std::shared_ptr<crdt::GlobalDB>> Migration3_4_0To3_5_0::InitLegacyDb()
    {
        static constexpr std::string_view GNUS_NETWORK_PATH_3_4_0 = "SuperGNUSNode.Node";

        auto full_path = writeBasePath_ + std::string( GNUS_NETWORK_PATH_3_4_0 ) +
                         version::GetNetAndVersionAppendix( 3, 4, version::GetNetworkID() ) + base58key_;

        if ( !std::filesystem::exists( full_path ) )
        {
            logger_->info( "Legacy 3.4.0 DB not found at {}; skipping initialization", full_path );
            return std::shared_ptr<crdt::GlobalDB>{};
        }

        logger_->debug( "Initializing legacy {} DB at path {}", FromVersion(), full_path );

        auto maybe_db_3_4_0 = crdt::GlobalDB::New( ioContext_,
                                                   full_path,
                                                   pubSub_,
                                                   crdt::CrdtOptions::DefaultOptions(),
                                                   graphsync_,
                                                   scheduler_,
                                                   generator_ );

        if ( !maybe_db_3_4_0.has_value() )
        {
            logger_->error( "Legacy {} DB error at path {}", FromVersion(), full_path );
            return outcome::failure( boost::system::error_code{} );
        }

        logger_->debug( "Started legacy {} DB at path {}", FromVersion(), full_path );
        return std::move( maybe_db_3_4_0.value() );
    }

    outcome::result<std::shared_ptr<crdt::GlobalDB>> Migration3_4_0To3_5_0::InitTargetDb()
    {
        static constexpr std::string_view GNUS_NETWORK_PATH_3_5_0 = "SuperGNUSNode.Node";

        auto full_path = writeBasePath_ + std::string( GNUS_NETWORK_PATH_3_5_0 ) +
                         version::GetNetAndVersionAppendix( 3, 5, version::GetNetworkID() ) + base58key_;

        logger_->debug( "Initializing target {} DB at path {}", ToVersion(), full_path );

        auto maybe_db_3_5_0 = crdt::GlobalDB::New( ioContext_,
                                                   full_path,
                                                   pubSub_,
                                                   crdt::CrdtOptions::DefaultOptions(),
                                                   graphsync_,
                                                   scheduler_,
                                                   generator_ );

        if ( !maybe_db_3_5_0.has_value() )
        {
            logger_->error( "Target {} DB error at path {}", ToVersion(), full_path );
            return outcome::failure( boost::system::error_code{} );
        }

        logger_->debug( "Started target {} DB at path {}", ToVersion(), full_path );
        return std::move( maybe_db_3_5_0.value() );
    }

    outcome::result<void> Migration3_4_0To3_5_0::ShutDown()
    {
        db_3_4_0_.reset();
        db_3_5_0_.reset();
        blockchain_.reset();
        return outcome::success();
    }
}

Updated on 2026-03-04 at 13:10:44 -0800