Skip to content

account/Migration3_6_0To3_7_0.cpp

Namespaces

Name
sgns

Source code

#include "Migration3_6_0To3_7_0.hpp"

#include "account/MigrationAllowList.hpp"
#include "account/MigrationManager.hpp"
#include "account/MintTransaction.hpp"
#include "account/TransactionManager.hpp"
#include "account/proto/SGTransaction.pb.h"
#include "base/sgns_version.hpp"
#include "blockchain/Blockchain.hpp"
#include "crypto/hasher/hasher_impl.hpp"
#include "storage/database_error.hpp"

#include <algorithm>
#include <filesystem>
#include <unordered_map>
#include <unordered_set>

namespace sgns
{
    namespace
    {
        constexpr std::string_view kLegacyUTXOPrefix = "/utxo/";

        struct LegacyProducedOutput
        {
            std::string owner_address;
            uint64_t    amount;
        };

        std::optional<std::string> ParseLegacyUTXOOwnerAddress( std::string_view key )
        {
            if ( key.substr( 0, kLegacyUTXOPrefix.size() ) != kLegacyUTXOPrefix )
            {
                return std::nullopt;
            }

            const auto address = key.substr( kLegacyUTXOPrefix.size() );
            if ( address.empty() || address.find( '/' ) != std::string_view::npos )
            {
                return std::nullopt;
            }

            return std::string( address );
        }

        std::string MakeLegacyOutPointKey( std::string_view tx_hash, uint32_t output_idx )
        {
            return std::string( tx_hash ) + ":" + std::to_string( output_idx );
        }

        bool IsMigratableBalanceAddress( std::string_view address )
        {
            return utxo_address::IsAccountPublicKeyAddress( address );
        }
    }

    Migration3_6_0To3_7_0::Migration3_6_0To3_7_0(
        std::shared_ptr<boost::asio::io_context>                        ioContext,
        std::shared_ptr<ipfs_pubsub::GossipPubSub>                      pubSub,
        std::shared_ptr<ipfs_lite::ipfs::graphsync::Network>            graphsync,
        std::shared_ptr<libp2p::basic::Scheduler>                       scheduler,
        std::shared_ptr<ipfs_lite::ipfs::graphsync::RequestIdGenerator> generator,
        std::string                                                     writeBasePath,
        std::string                                                     base58key,
        std::shared_ptr<GeniusAccount>                                  account,
        bool                                                            is_full_node ) :
        ioContext_( std::move( ioContext ) ),
        pubSub_( std::move( pubSub ) ),
        graphsync_( std::move( graphsync ) ),
        scheduler_( std::move( scheduler ) ),
        generator_( std::move( generator ) ),
        writeBasePath_( std::move( writeBasePath ) ),
        base58key_( std::move( base58key ) ),
        account_( std::move( account ) ),
        is_full_node_( is_full_node )
    {
    }

    std::string Migration3_6_0To3_7_0::FromVersion() const
    {
        return "3.6.0";
    }

    std::string Migration3_6_0To3_7_0::ToVersion() const
    {
        return "3.7.0";
    }

    outcome::result<bool> Migration3_6_0To3_7_0::IsRequired() const
    {
        if ( !db_3_6_0_ )
        {
            logger_->info( "Legacy {} DB not found; skipping migration to {}", FromVersion(), ToVersion() );
            return false;
        }

        if ( !db_3_7_0_ )
        {
            logger_->warn( "Target {} DB not initialized yet", ToVersion() );
            return false;
        }

        crdt::GlobalDB::Buffer version_key;
        version_key.put( std::string( MigrationManager::VERSION_INFO_KEY ) );
        auto version_ret = db_3_7_0_->GetDataStore()->get( version_key );

        if ( version_ret.has_error() )
        {
            logger_->info( "No version info found in GlobalDB, migration from {} to {} is required",
                           FromVersion(),
                           ToVersion() );
            return true;
        }

        const auto version_buffer = version_ret.value();
        if ( !IsVersionLessThan( std::string( version_buffer.toString() ), ToVersion() ) )
        {
            logger_->info( "GlobalDB already at target version {}, skipping migration", ToVersion() );
            return false;
        }

        logger_->info( "GlobalDB at version {}, need to migrate to {}", version_buffer.toString(), ToVersion() );
        return true;
    }

    outcome::result<void> Migration3_6_0To3_7_0::Init()
    {
        BOOST_OUTCOME_TRY( auto legacy_db, InitLegacyDb() );
        db_3_6_0_ = std::move( legacy_db );
        if ( db_3_6_0_ )
        {
            BOOST_OUTCOME_TRY( auto new_db, InitTargetDb() );
            db_3_7_0_ = std::move( new_db );
        }
        return outcome::success();
    }

    outcome::result<void> Migration3_6_0To3_7_0::Apply()
    {
        if ( !db_3_6_0_ )
        {
            logger_->error( "Legacy {} DB not initialized; nothing to migrate to {}", FromVersion(), ToVersion() );
            return outcome::success();
        }
        if ( !db_3_7_0_ )
        {
            logger_->error( "Target {} DB not initialized", ToVersion() );
            return outcome::failure( std::errc::no_such_device );
        }

        logger_->info( "Starting migration from {} to {}", FromVersion(), ToVersion() );

        account_->ConfigureDatabaseDependencies( db_3_7_0_ );
        logger_->debug( "{}: Configured account database dependencies for {}", __func__, ToVersion() );

        BOOST_OUTCOME_TRY( Blockchain::MigrateCids( db_3_6_0_, db_3_7_0_ ) );
        logger_->debug( "{}: Migrated blockchain CIDs from {} to {}", __func__, FromVersion(), ToVersion() );
        db_3_7_0_->StartCICSync();
        logger_->debug( "{}: Started CID processing for target {}", __func__, ToVersion() );

        if ( !blockchain_ )
        {
            logger_->debug( "{}: Creating blockchain for target {}", __func__, ToVersion() );
            blockchain_ = Blockchain::New(
                db_3_7_0_,
                account_,
                pubSub_,
                [wptr( weak_from_this() )]( outcome::result<void> result )
                {
                    if ( auto strong = wptr.lock() )
                    {
                        if ( result.has_error() )
                        {
                            strong->logger_->error( "Error starting blockchain: {}", result.error().message() );
                            strong->blockchain_status_.store( Status::ST_ERROR );
                            return;
                        }
                        strong->blockchain_status_.store( Status::ST_SUCCESS );
                    }
                } );
        }
        blockchain_status_.store( Status::ST_INIT, std::memory_order_release );
        logger_->debug( "{}: Starting blockchain bootstrap for {}", __func__, ToVersion() );

        auto                  retry_duration   = std::chrono::minutes( 2 );
        auto                  retry_interval   = std::chrono::seconds( 5 );
        auto                  retry_start_time = std::chrono::steady_clock::now();
        auto                  last_log_time    = retry_start_time;
        outcome::result<void> start_result     = outcome::failure( Blockchain::Error::BLOCKCHAIN_NOT_INITIALIZED );
        do
        {
            start_result = blockchain_->Start();
            if ( start_result.has_error() )
            {
                logger_->error( "Error starting blockchain: {}", start_result.error().message() );
            }

            const auto current_time = std::chrono::steady_clock::now();
            if ( current_time - last_log_time >= std::chrono::seconds( 30 ) )
            {
                const auto elapsed_seconds =
                    std::chrono::duration_cast<std::chrono::seconds>( current_time - retry_start_time ).count();
                logger_->info( "{}: Retrying blockchain start (elapsed: {}s)", __func__, elapsed_seconds );
                last_log_time = current_time;
            }
            std::this_thread::sleep_for( retry_interval );
        } while ( std::chrono::steady_clock::now() - retry_start_time < retry_duration && start_result.has_error() );

        const auto timeout_duration = std::chrono::minutes( 4 );
        auto       start_time       = std::chrono::steady_clock::now();
        last_log_time               = start_time;
        bool blockchain_succeeded   = false;

        while ( std::chrono::steady_clock::now() - start_time < timeout_duration )
        {
            const auto current_time = std::chrono::steady_clock::now();
            if ( blockchain_status_.load( std::memory_order_acquire ) != Status::ST_INIT )
            {
                if ( blockchain_status_.load( std::memory_order_acquire ) == Status::ST_SUCCESS )
                {
                    blockchain_succeeded = true;
                }
                break;
            }
            if ( current_time - last_log_time >= std::chrono::seconds( 30 ) )
            {
                const auto elapsed_seconds =
                    std::chrono::duration_cast<std::chrono::seconds>( current_time - start_time ).count();
                logger_->info( "{}: Still waiting for the blockchain to initialize (elapsed: {}s)",
                               __func__,
                               elapsed_seconds );
                last_log_time = current_time;
            }
            std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
        }
        if ( !blockchain_succeeded )
        {
            return outcome::failure( MigrationManager::Error::BLOCKCHAIN_INIT_FAILED );
        }

        start_time           = std::chrono::steady_clock::now();
        blockchain_succeeded = false;
        while ( std::chrono::steady_clock::now() - start_time < timeout_duration )
        {
            auto genesis_cid_result          = blockchain_->GetGenesisCID();
            auto account_creation_cid_result = blockchain_->GetAccountCreationCID();
            if ( genesis_cid_result.has_value() && account_creation_cid_result.has_value() &&
                 blockchain_->validator_registry_initialized_.load( std::memory_order_acquire ) )
            {
                blockchain_succeeded = true;
                break;
            }

            std::this_thread::sleep_for( std::chrono::milliseconds( 10 ) );
        }
        if ( !blockchain_succeeded )
        {
            logger_->error( "{}: Genesis, Account Creation and/or Validator Registry not initialized", __func__ );
            return outcome::failure( MigrationManager::Error::BLOCKCHAIN_INIT_FAILED );
        }
        logger_->debug( "{}: Blockchain bootstrap complete for {}", __func__, ToVersion() );

        BOOST_OUTCOME_TRY( auto balances, ComputeLegacyBalances() );
        MigrationAllowList allow_list( db_3_7_0_->GetDataStore(), FromVersion() );
        BOOST_OUTCOME_TRY( allow_list.StoreObservedBalances( balances ) );
        logger_->info( "Computed {} legacy address balances for {} -> {} migration",
                       balances.size(),
                       FromVersion(),
                       ToVersion() );

        BOOST_OUTCOME_TRY( auto observed_balance, allow_list.LoadObservedBalance( account_->GetAddress() ) );
        logger_->debug( "{}: Observed balance lookup for {} returned has_value={} balance={}",
                        __func__,
                        account_->GetAddress(),
                        observed_balance.has_value(),
                        observed_balance.has_value() ? observed_balance.value() : 0 );

        if ( observed_balance.has_value() && observed_balance.value() > 0 )
        {
            logger_->debug( "{}: Starting CID receiving for target {}", __func__, ToVersion() );
            db_3_7_0_->StartCIDReceiving();
            logger_->debug( "{}: Starting head rebroadcast for target {}", __func__, ToVersion() );
            db_3_7_0_->StartRebroadcastHeads();

            if ( !transaction_manager_ )
            {
                logger_->debug( "{}: Creating transaction manager for migration flow", __func__ );
                transaction_manager_ = TransactionManager::New( db_3_7_0_,
                                                                ioContext_,
                                                                account_,
                                                                std::make_shared<crypto::HasherImpl>(),
                                                                blockchain_,
                                                                is_full_node_ );
            }

            logger_->debug( "{}: Registering transaction topic names for migration flow", __func__ );
            transaction_manager_->RegisterTopicNames();
            logger_->debug( "{}: Starting transaction manager core for migration flow", __func__ );
            transaction_manager_->StartCore();

            start_time = std::chrono::steady_clock::now();
            while ( std::chrono::steady_clock::now() - start_time < timeout_duration )
            {
                if ( transaction_manager_->GetState() == TransactionManager::State::READY )
                {
                    break;
                }
                std::this_thread::sleep_for( std::chrono::milliseconds( 50 ) );
            }
            if ( transaction_manager_->GetState() != TransactionManager::State::READY )
            {
                logger_->error( "{}: Transaction Manager did not reach READY", __func__ );
                return outcome::failure( MigrationManager::Error::BLOCKCHAIN_INIT_FAILED );
            }
            logger_->debug( "{}: Transaction Manager is READY for migration flow", __func__ );

            const auto token_id = TokenID::FromBytes( { 0x00 } );
            logger_->info( "{}: Submitting migration transaction for {:.8} amount={}",
                           __func__,
                           account_->GetAddress(),
                           observed_balance.value() );
            BOOST_OUTCOME_TRY( auto tx_hash,
                               transaction_manager_->MigrationFunds( observed_balance.value(),
                                                                     FromVersion(),
                                                                     token_id,
                                                                     account_->GetAddress() ) );
            logger_->debug( "{}: Waiting for migration transaction confirmation tx={}", __func__, tx_hash );

            auto tx_status = transaction_manager_->WaitForTransactionOutgoing( tx_hash, std::chrono::minutes( 4 ) );
            if ( tx_status != TransactionManager::TransactionStatus::CONFIRMED )
            {
                logger_->error( "{}: Migration transaction {} did not confirm. status={}",
                                __func__,
                                tx_hash,
                                static_cast<int>( tx_status ) );
                return outcome::failure( std::errc::timed_out );
            }
        }
        else
        {
            logger_->info( "{}: Local account has no observed legacy balance; skipping migration claim", __func__ );
        }

        crdt::GlobalDB::Buffer version_key;
        version_key.put( std::string( MigrationManager::VERSION_INFO_KEY ) );
        crdt::GlobalDB::Buffer version_value;
        version_value.put( ToVersion() );
        BOOST_OUTCOME_TRY( db_3_7_0_->GetDataStore()->put( version_key, version_value ) );
        logger_->info( "Migration from {} to {} completed successfully", FromVersion(), ToVersion() );

        return outcome::success();
    }

    outcome::result<void> Migration3_6_0To3_7_0::ShutDown()
    {
        if ( transaction_manager_ )
        {
            transaction_manager_->Stop();
            transaction_manager_.reset();
        }
        if ( blockchain_ )
        {
            (void)blockchain_->Stop();
            blockchain_.reset();
        }
        if ( account_ )
        {
            account_->DeconfigureDatabaseDependencies();
            account_->GetUTXOManager().ReleaseStorage();
        }
        blockchain_status_.store( Status::ST_INIT, std::memory_order_release );
        db_3_6_0_.reset();
        db_3_7_0_.reset();

        return outcome::success();
    }

    outcome::result<std::vector<Migration3_6_0To3_7_0::AddressBalance>> Migration3_6_0To3_7_0::ComputeLegacyBalances()
        const
    {
        if ( !db_3_6_0_ )
        {
            logger_->error( "Legacy {} DB not initialized", FromVersion() );
            return std::errc::state_not_recoverable;
        }

        crdt::GlobalDB::Buffer key_buf;
        key_buf.put( std::string( kLegacyUTXOPrefix.substr( 0, kLegacyUTXOPrefix.size() - 1 ) ) );
        auto utxo_list = db_3_6_0_->GetDataStore()->query( key_buf );
        if ( utxo_list.has_error() )
        {
            if ( utxo_list.error() == storage::DatabaseError::NOT_FOUND )
            {
                return std::vector<AddressBalance>{};
            }
            logger_->error( "Failed to query legacy UTXOs: {}", utxo_list.error().message() );
            return utxo_list.error();
        }

        std::vector<AddressBalance> balances;
        balances.reserve( utxo_list.value().size() );

        for ( const auto &[key, value] : utxo_list.value() )
        {
            auto address_opt = ParseLegacyUTXOOwnerAddress( key.toString() );
            if ( !address_opt.has_value() )
            {
                logger_->debug( "Skipping non-legacy UTXO key {}", key.toString() );
                continue;
            }

            SGTransaction::UTXOList utxos;
            if ( !utxos.ParseFromArray( value.data(), value.size() ) )
            {
                logger_->error( "Failed to deserialize legacy UTXOs for address {}", address_opt.value() );
                return std::errc::bad_message;
            }

            uint64_t balance = 0;
            for ( int i = 0; i < utxos.utxos_size(); ++i )
            {
                balance += utxos.utxos( i ).amount();
            }

            if ( IsMigratableBalanceAddress( address_opt.value() ) )
            {
                balances.emplace_back( std::move( address_opt.value() ), balance );
            }
        }

        if ( !balances.empty() )
        {
            std::sort(
                balances.begin(),
                balances.end(),
                []( const AddressBalance &lhs, const AddressBalance &rhs ) { return lhs.first < rhs.first; } );
            return balances;
        }

        logger_->info( "No legacy UTXO snapshots found in {}; reconstructing balances from migrated transactions",
                       FromVersion() );

        std::unordered_map<std::string, LegacyProducedOutput> produced_outputs;
        std::unordered_set<std::string>                       consumed_outpoints;
        size_t                                                scanned_transactions = 0;

        for ( auto network_id : TransactionManager::GetMonitoredNetworkIDs() )
        {
            const std::string query_path = TransactionManager::GetBlockChainBase( network_id ) + "tx";
            BOOST_OUTCOME_TRY( auto transaction_list, db_3_6_0_->QueryKeyValues( query_path ) );

            for ( const auto &[key, value] : transaction_list )
            {
                ++scanned_transactions;

                BOOST_OUTCOME_TRY( auto tx, TransactionManager::DeSerializeTransaction( value ) );
                if ( tx->GetHash().empty() )
                {
                    tx->FillHash();
                }

                const auto tx_hash = tx->GetHash();
                if ( tx_hash.empty() )
                {
                    logger_->error( "Failed to determine hash while reconstructing legacy balance from {}",
                                    key.toString() );
                    return std::errc::bad_message;
                }

                if ( auto params_opt = tx->GetUTXOParametersOpt() )
                {
                    const auto &[inputs, outputs] = params_opt.value();
                    for ( const auto &input : inputs )
                    {
                        consumed_outpoints.emplace(
                            MakeLegacyOutPointKey( input.txid_hash_.toReadableString(), input.output_idx_ ) );
                    }

                    for ( std::uint32_t i = 0; i < outputs.size(); ++i )
                    {
                        produced_outputs[MakeLegacyOutPointKey( tx_hash, i )] =
                            LegacyProducedOutput{ outputs[i].dest_address, outputs[i].encrypted_amount };
                    }

                    continue;
                }

                if ( auto mint_tx = std::dynamic_pointer_cast<MintTransaction>( tx ) )
                {
                    produced_outputs[MakeLegacyOutPointKey( tx_hash, 0 )] =
                        LegacyProducedOutput{ mint_tx->GetSrcAddress(), mint_tx->GetAmount() };
                }
            }
        }

        std::unordered_map<std::string, uint64_t> balance_by_address;
        for ( const auto &[outpoint_key, output] : produced_outputs )
        {
            if ( consumed_outpoints.find( outpoint_key ) != consumed_outpoints.end() ||
                 !IsMigratableBalanceAddress( output.owner_address ) )
            {
                continue;
            }

            balance_by_address[output.owner_address] += output.amount;
        }

        balances.clear();
        balances.reserve( balance_by_address.size() );
        for ( auto &[address, balance] : balance_by_address )
        {
            balances.emplace_back( std::move( address ), balance );
        }

        std::sort(
            balances.begin(),
            balances.end(),
            []( const AddressBalance &lhs, const AddressBalance &rhs ) { return lhs.first < rhs.first; } );

        logger_->info( "Reconstructed {} legacy balances from {} transactions and {} produced outputs",
                       balances.size(),
                       scanned_transactions,
                       produced_outputs.size() );

        return balances;
    }

    outcome::result<std::shared_ptr<crdt::GlobalDB>> Migration3_6_0To3_7_0::InitLegacyDb() const
    {
        static constexpr std::string_view GNUS_NETWORK_PATH_3_6_0 = "SuperGNUSNode.Node";

        auto full_path = writeBasePath_ + std::string( GNUS_NETWORK_PATH_3_6_0 ) +
                         version::GetNetAndVersionAppendix( 3, 6, version::GetNetworkID() ) + base58key_;

        if ( !std::filesystem::exists( full_path ) )
        {
            logger_->info( "Legacy {} DB not found at {}; skipping initialization", FromVersion(), full_path );
            return std::shared_ptr<crdt::GlobalDB>{};
        }

        logger_->debug( "Initializing legacy {} DB at path {}", FromVersion(), full_path );

        auto maybe_db_3_6_0 = crdt::GlobalDB::New( ioContext_,
                                                   full_path,
                                                   pubSub_,
                                                   crdt::CrdtOptions::DefaultOptions(),
                                                   graphsync_,
                                                   scheduler_,
                                                   generator_ );

        if ( !maybe_db_3_6_0.has_value() )
        {
            logger_->error( "Legacy {} DB error at path {}", FromVersion(), full_path );
            return outcome::failure( boost::system::error_code{} );
        }

        logger_->debug( "Started legacy {} DB at path {}", FromVersion(), full_path );
        return std::move( maybe_db_3_6_0.value() );
    }

    outcome::result<std::shared_ptr<crdt::GlobalDB>> Migration3_6_0To3_7_0::InitTargetDb() const
    {
        static constexpr std::string_view GNUS_NETWORK_PATH_3_7_0 = "SuperGNUSNode.Node";

        auto full_path = writeBasePath_ + std::string( GNUS_NETWORK_PATH_3_7_0 ) +
                         version::GetNetAndVersionAppendix( 3, 7, version::GetNetworkID() ) + base58key_;

        logger_->debug( "Initializing target {} DB at path {}", ToVersion(), full_path );

        auto maybe_db_3_7_0 = crdt::GlobalDB::New( ioContext_,
                                                   full_path,
                                                   pubSub_,
                                                   crdt::CrdtOptions::DefaultOptions(),
                                                   graphsync_,
                                                   scheduler_,
                                                   generator_ );

        if ( !maybe_db_3_7_0.has_value() )
        {
            logger_->error( "Target {} DB error at path {}", ToVersion(), full_path );
            return outcome::failure( boost::system::error_code{} );
        }

        logger_->debug( "Started target {} DB at path {}", ToVersion(), full_path );
        return std::move( maybe_db_3_7_0.value() );
    }
}

Updated on 2026-06-05 at 17:22:19 -0700