Skip to content

src/account/GeniusNode.cpp

More...

Namespaces

Name
sgns

Functions

Name
OUTCOME_CPP_DEFINE_CATEGORY_3(sgns , GeniusNode::Error , e )
base::Logger GeniusNodeLogger()
std::string generate_uuid_with_ipfs_id(const std::string & ipfs_id)

Detailed Description

Date: 2024-04-18 Henrique A. Klein ([email protected])

Functions Documentation

function OUTCOME_CPP_DEFINE_CATEGORY_3

OUTCOME_CPP_DEFINE_CATEGORY_3(
    sgns ,
    GeniusNode::Error ,
    e 
)

function GeniusNodeLogger

base::Logger GeniusNodeLogger()

function generate_uuid_with_ipfs_id

std::string generate_uuid_with_ipfs_id(
    const std::string & ipfs_id
)

Source code

#include <mutex>
#include <stdexcept>
#include <thread>
#include <memory>
#include <exception>

#include <boost/format.hpp>
#include <boost/multiprecision/cpp_int.hpp>
#include <boost/uuid/uuid.hpp>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <ipfs_lite/ipfs/graphsync/impl/network/network.hpp>
#include <ipfs_lite/ipfs/graphsync/impl/local_requests.hpp>
#include <libp2p/protocol/common/asio/asio_scheduler.hpp>

#include "base/sgns_version.hpp"
#include "account/TokenAmount.hpp"
#include "account/GeniusNode.hpp"
#include "account/MigrationManager.hpp"
#include "crdt/globaldb/keypair_file_storage.hpp"
#include "upnp.hpp"
#include "processing/processing_tasksplit.hpp"
#include "processing/processing_subtask_enqueuer_impl.hpp"
#include "local_secure_storage/SecureStorage.hpp"
#include <Generators.hpp>

namespace
{
    uint16_t GenerateRandomPort( uint16_t base, const std::string &seed )
    {
        uint32_t seed_hash = 0;
        for ( char c : seed )
        {
            seed_hash = seed_hash * 31 + static_cast<uint8_t>( c );
        }

        std::mt19937                            rng( seed_hash );
        std::uniform_int_distribution<uint16_t> dist( 0, 300 );

        return base + dist( rng );
    }

    const char *NodeStateToString( sgns::GeniusNode::NodeState state )
    {
        using State = sgns::GeniusNode::NodeState;
        switch ( state )
        {
            case State::CREATING:
                return "CREATING";
            case State::MIGRATING_DATABASE:
                return "MIGRATING_DATABASE";
            case State::INITIALIZING_DATABASE:
                return "INITIALIZING_DATABASE";
            case State::INITIALIZING_PROCESSING:
                return "INITIALIZING_PROCESSING";
            case State::INITIALIZING_BLOCKCHAIN:
                return "INITIALIZING_BLOCKCHAIN";
            case State::INITIALIZING_TRANSACTIONS:
                return "INITIALIZING_TRANSACTIONS";
            case State::INITIALIZING_DHT:
                return "INITIALIZING_DHT";
            case State::READY:
                return "READY";
        }
        return "UNKNOWN";
    }
}

OUTCOME_CPP_DEFINE_CATEGORY_3( sgns, GeniusNode::Error, e )
{
    switch ( e )
    {
        case sgns::GeniusNode::Error::INSUFFICIENT_FUNDS:
            return "Insufficient funds for the transaction";
        case sgns::GeniusNode::Error::DATABASE_WRITE_ERROR:
            return "Error writing data into the database";
        case sgns::GeniusNode::Error::INVALID_TRANSACTION_HASH:
            return "Input transaction hash is invalid";
        case sgns::GeniusNode::Error::INVALID_CHAIN_ID:
            return "Chain ID is invalid";
        case sgns::GeniusNode::Error::INVALID_TOKEN_ID:
            return "Token ID is invalid";
        case sgns::GeniusNode::Error::TOKEN_ID_MISMATCH:
            return "Informed Token ID doesn't match initialized ID";
        case sgns::GeniusNode::Error::PROCESS_COST_ERROR:
            return "The calculated Processing cost was negative";
        case sgns::GeniusNode::Error::PROCESS_INFO_MISSING:
            return "Processing information missing on JSON file";
        case sgns::GeniusNode::Error::INVALID_JSON:
            return "Json cannot be parsed";
        case sgns::GeniusNode::Error::INVALID_BLOCK_PARAMETERS:
            return "Json missing block params";
        case sgns::GeniusNode::Error::NO_PROCESSOR:
            return "Json missing processor";
        case sgns::GeniusNode::Error::NO_PRICE:
            return "Could not get a price";
        case sgns::GeniusNode::Error::TRANSACTIONS_NOT_READY:
            return "Transaction manager is not ready";
    }
    return "Unknown error";
}

using namespace boost::multiprecision;

namespace sgns
{
    base::Logger GeniusNodeLogger()
    {
        return base::createLogger( "GeniusNode" );
    }

    std::shared_ptr<GeniusNode> GeniusNode::New( const DevConfig_st &dev_config,
                                                 bool                autodht,
                                                 bool                isprocessor,
                                                 uint16_t            base_port,
                                                 bool                is_full_node,
                                                 bool                use_upnp )
    {
        auto instance = std::shared_ptr<GeniusNode>(
            new GeniusNode( dev_config,
                            GeniusAccount::New( dev_config.TokenID, dev_config.BaseWritePath, is_full_node ),
                            autodht,
                            isprocessor,
                            base_port,
                            is_full_node,
                            use_upnp ) );

        if ( instance )
        {
            instance->BeginDBInitialization();
        }

        return instance;
    }

    std::shared_ptr<GeniusNode> GeniusNode::New( const DevConfig_st &dev_config,
                                                 const char         *eth_private_key,
                                                 bool                autodht,
                                                 bool                isprocessor,
                                                 uint16_t            base_port,
                                                 bool                is_full_node,
                                                 bool                use_upnp )
    {
        auto instance = std::shared_ptr<GeniusNode>( new GeniusNode(
            dev_config,
            GeniusAccount::New( dev_config.TokenID, eth_private_key, dev_config.BaseWritePath, is_full_node ),
            autodht,
            isprocessor,
            base_port,
            is_full_node,
            use_upnp ) );

        if ( instance )
        {
            instance->BeginDBInitialization();
        }

        return instance;
    }

    std::shared_ptr<GeniusNode> GeniusNode::New( const DevConfig_st               &dev_config,
                                                 const GeniusAccount::Credentials &credentials,
                                                 bool                              autodht,
                                                 bool                              isprocessor,
                                                 uint16_t                          base_port,
                                                 bool                              is_full_node,
                                                 bool                              use_upnp )
    {
        auto instance = std::shared_ptr<GeniusNode>( new GeniusNode(
            dev_config,
            GeniusAccount::New( dev_config.TokenID, credentials, dev_config.BaseWritePath, is_full_node ),
            autodht,
            isprocessor,
            base_port,
            is_full_node,
            use_upnp ) );

        if ( instance )
        {
            instance->BeginDBInitialization();
        }

        return instance;
    }

    GeniusNode::GeniusNode( const DevConfig_st            &dev_config,
                            std::shared_ptr<GeniusAccount> account,
                            bool                           autodht,
                            bool                           isprocessor,
                            uint16_t                       base_port,
                            bool                           is_full_node,
                            bool                           use_upnp ) :
        write_base_path_( dev_config.BaseWritePath ),
        account_( std::move( account ) ),
        utxo_manager_(
            is_full_node,
            account_->GetAddress(),
            [this]( const std::vector<uint8_t> data ) { return this->account_->Sign( data ); },
            []( const std::string &address, const std::vector<uint8_t> &signature, const std::vector<uint8_t> &data )
            {
                return GeniusAccount::VerifySignature( address,
                                                       std::string( signature.begin(), signature.end() ),
                                                       data );
            } ),
        io_( std::make_shared<boost::asio::io_context>() ),
        autodht_( autodht ),
        isprocessor_( isprocessor ),
        is_full_node_( is_full_node ),
        dev_config_( dev_config ),
        processing_channel_topic_( std::string( PROCESSING_CHANNEL ) ),
        processing_grid_chanel_topic_( std::string( PROCESSING_GRID_CHANNEL ) ),
        m_lastApiCall( std::chrono::system_clock::now() - m_minApiCallInterval ),
        scheduler_( std::make_shared<libp2p::protocol::AsioScheduler>( io_, libp2p::protocol::SchedulerConfig{} ) ),
        generator_( std::make_shared<ipfs_lite::ipfs::graphsync::RequestIdGenerator>() ),
        processing_callback_pool_( std::make_unique<boost::asio::thread_pool>( 1 ) ),
        use_upnp_( use_upnp )
    {
        // Rotate log files before initializing logging system
        RotateLogFiles( write_base_path_ );
        InitOpenSSL();

        if ( !InitLoggers( write_base_path_ ) )
        {
            throw std::runtime_error( "Could not configure loggers" );
        }

        node_logger_->info( sgns::version::SuperGeniusVersionText() );

        if ( !InitNetwork( base_port, is_full_node_ ) )
        {
            throw std::runtime_error( "Network initialization error" );
        }
        node_logger_->debug( "Account Address {}", account_->GetAddress() );
    }

    void GeniusNode::BeginDBInitialization()
    {
        StateTransition( NodeState::MIGRATING_DATABASE );
    }

    void GeniusNode::StateTransition( NodeState next_state )
    {
        state_.store( next_state );
        node_logger_->debug( "Transitioning to state {}", NodeStateToString( next_state ) );

        switch ( next_state )
        {
            case NodeState::MIGRATING_DATABASE:
            {
                account_->InitMessenger( pubsub_ );
                MigrateDatabase(
                    [weak_self( weak_from_this() )]( outcome::result<void> result )
                    {
                        if ( auto strong = weak_self.lock() )
                        {
                            if ( result.has_error() )
                            {
                                strong->node_logger_->error( "Database migration error: {}", result.error().message() );
                                if ( result.error() == MigrationManager::Error::BLOCKCHAIN_INIT_FAILED )
                                {
                                    strong->node_logger_->info( "Scheduling blockchain retry after failure" );
                                    strong->ScheduleMigrationRetry();
                                }
                                return;
                            }
                            strong->StateTransition( NodeState::INITIALIZING_DATABASE );
                        }
                    } );
                break;
            }
            case NodeState::INITIALIZING_DATABASE:
            {
                if ( !InitDatabase() )
                {
                    node_logger_->error( "GlobalDB initialization error" );
                    return;
                }
                account_->ConfigureMessengerHandlers( tx_globaldb_ );
                tx_globaldb_->AddListenTopic( processing_channel_topic_ );
                StateTransition( NodeState::INITIALIZING_PROCESSING );
                break;
            }
            case NodeState::INITIALIZING_PROCESSING:
            {
                if ( !InitProcessingModules() )
                {
                    node_logger_->error( "Processing modules initialization error" );
                    return;
                }

                processing_service_ = std::make_shared<processing::ProcessingServiceImpl>(
                    pubsub_,
                    MAX_NODES_COUNT,
                    std::make_shared<processing::SubTaskEnqueuerImpl>( task_queue_ ),
                    task_result_storage_,
                    processing_core_,
                    [weak_self = weak_from_this()]( const std::string &var, const SGProcessing::TaskResult &taskresult )
                    {
                        if ( auto strong = weak_self.lock() )
                        {
                            strong->ProcessingDone( var, taskresult );
                        }
                    },
                    [weak_self = weak_from_this()]( const std::string &var )
                    {
                        if ( auto strong = weak_self.lock() )
                        {
                            strong->ProcessingError( var );
                        }
                    },
                    account_->GetAddress() );

                processing_service_->SetChannelListRequestTimeout( boost::posix_time::milliseconds( 3000 ) );
                StateTransition( NodeState::INITIALIZING_DHT );
                break;
            }
            case NodeState::INITIALIZING_DHT:
            {
                if ( use_upnp_ )
                {
                    RefreshUPNP( pubsubport_ );
                }
                io_work_guard_.emplace( io_->get_executor() );
                unsigned desired_threads = io_thread_count_;
                if ( desired_threads == 0 )
                {
                    desired_threads = GeniusNode::DEFAULT_IO_THREADS;
                }
                io_threads_.reserve( desired_threads );
                for ( unsigned i = 0; i < desired_threads; ++i )
                {
                    io_threads_.emplace_back( [ctx = io_]() { ctx->run(); } );
                }
                StateTransition( NodeState::INITIALIZING_BLOCKCHAIN );
                break;
            }

            case NodeState::INITIALIZING_BLOCKCHAIN:
            {
                if ( !blockchain_ )
                {
                    auto weak_self = weak_from_this();
                    blockchain_    = Blockchain::New(
                        tx_globaldb_,
                        account_,
                        [weak_self]( outcome::result<void> result )
                        {
                            if ( auto strong = weak_self.lock() )
                            {
                                if ( result.has_error() )
                                {
                                    strong->node_logger_->error( "Error starting blockchain: {}",
                                                                 result.error().message() );
                                    strong->node_logger_->info( "Scheduling blockchain retry after failure" );
                                    strong->ScheduleBlockchainRetry();
                                    return;
                                }
                                auto current_state = strong->state_.load();
                                if ( current_state != NodeState::INITIALIZING_BLOCKCHAIN )
                                {
                                    strong->node_logger_->debug(
                                        "Skipping transaction initialization, unexpected state: {}",
                                        NodeStateToString( current_state ) );
                                    return;
                                }
                                strong->node_logger_->debug(
                                    "Blockchain started successfully, starting transaction manager" );
                                if ( strong->is_full_node_ )
                                {
                                    strong->node_logger_->debug(
                                        "Full node: Setting blockchain to grab other account creation blocks" );
                                    strong->blockchain_->SetFullNodeMode();
                                }

                                strong->StateTransition( NodeState::INITIALIZING_TRANSACTIONS );
                            }
                        } );
                }
                blockchain_->Start();

                break;
            }

            case NodeState::INITIALIZING_TRANSACTIONS:
            {
                transaction_manager_ = TransactionManager::New( tx_globaldb_,
                                                                io_,
                                                                utxo_manager_,
                                                                account_,
                                                                std::make_shared<crypto::HasherImpl>(),
                                                                is_full_node_ );

                transaction_manager_->RegisterStateChangeCallback(
                    [weak_self = weak_from_this()]( TransactionManager::State old_state,
                                                    TransactionManager::State new_state )
                    {
                        if ( auto strong = weak_self.lock() )
                        {
                            strong->TransactionStateChanged( old_state, new_state );
                        }
                    } );
                transaction_manager_->Start();
                StateTransition( NodeState::READY );
                break;
            }

            case NodeState::READY:
            {
                node_logger_->info( "GeniusNode READY" );
                break;
            }
            case NodeState::CREATING:
            default:
                break;
        }
    }

    void GeniusNode::InitOpenSSL()
    {
        SSL_library_init();
        SSL_load_error_strings();
        OpenSSL_add_all_algorithms();
    }

    bool GeniusNode::InitLoggers( const std::string &base_path )
    {
        logging_system_ = std::make_shared<soralog::LoggingSystem>( std::make_shared<soralog::ConfiguratorFromYAML>(
            // Original LibP2P logging config
            std::make_shared<libp2p::log::Configurator>(),
            // Additional logging config for application
            GetLoggingSystem( base_path ) ) );
        auto result     = logging_system_->configure();
        if ( result.has_error )
        {
            std::cerr << "Logger Error" << std::endl;
            return false;
        }

        libp2p::log::setLoggingSystem( logging_system_ );
        libp2p::log::setLevelOfGroup( "SuperGeniusDemo", soralog::Level::ERROR_ );

        std::string logdir = "";
#ifndef SGNS_DEBUGLOGS
        logdir = base_path + "/sgnslog2.log";
#endif
#ifdef SGNS_DEBUGLOGS
        // Debug mode
        node_logger_              = ConfigureLogger( "SuperGeniusNode", logdir, spdlog::level::debug );
        auto loggerGeniusNode     = ConfigureLogger( "GeniusNode", logdir, spdlog::level::debug );
        auto loggerGlobalDB       = ConfigureLogger( "GlobalDB", logdir, spdlog::level::err );
        auto loggerDAGSyncer      = ConfigureLogger( "GraphsyncDAGSyncer", logdir, spdlog::level::err );
        auto loggerGraphsync      = ConfigureLogger( "graphsync", logdir, spdlog::level::err );
        auto loggerBroadcaster    = ConfigureLogger( "PubSubBroadcasterExt", logdir, spdlog::level::err );
        auto loggerDataStore      = ConfigureLogger( "CrdtDatastore", logdir, spdlog::level::debug );
        auto loggerCRDTHeads      = ConfigureLogger( "CrdtHeads", logdir, spdlog::level::trace );
        auto loggerTransactions   = ConfigureLogger( "TransactionManager", logdir, spdlog::level::debug );
        auto loggerMigration      = ConfigureLogger( "MigrationManager", logdir, spdlog::level::trace );
        auto loggerMigrationStep  = ConfigureLogger( "MigrationStep", logdir, spdlog::level::trace );
        auto loggerQueue          = ConfigureLogger( "ProcessingTaskQueueImpl", logdir, spdlog::level::err );
        auto loggerRocksDB        = ConfigureLogger( "rocksdb", logdir, spdlog::level::err );
        auto logkad               = ConfigureLogger( "Kademlia", logdir, spdlog::level::err );
        auto logNoise             = ConfigureLogger( "Noise", logdir, spdlog::level::err );
        auto logProcessingEngine  = ConfigureLogger( "ProcessingEngine", logdir, spdlog::level::err );
        auto loggerSubQueue       = ConfigureLogger( "ProcessingSubTaskQueueAccessorImpl", logdir, spdlog::level::err );
        auto loggerProcServ       = ConfigureLogger( "ProcessingService", logdir, spdlog::level::err );
        auto loggerProcqm         = ConfigureLogger( "ProcessingSubTaskQueueManager", logdir, spdlog::level::err );
        auto loggerUPNP           = ConfigureLogger( "UPNP", logdir, spdlog::level::err );
        auto loggerProcessingNode = ConfigureLogger( "ProcessingNode", logdir, spdlog::level::err );
        auto loggerGossipPubsub   = ConfigureLogger( "GossipPubSub", logdir, spdlog::level::err );
        auto loggerAccountMessenger = ConfigureLogger( "AccountMessenger", logdir, spdlog::level::err );
        auto loggerGeniusAccount    = ConfigureLogger( "GeniusAccount", logdir, spdlog::level::err );
        auto loggerKeyPair          = ConfigureLogger( "KeyPairFileStorage", logdir, spdlog::level::err );
        auto loggerBlockchain       = ConfigureLogger( "Blockchain", logdir, spdlog::level::trace );
        auto loggerValidator        = ConfigureLogger( "ValidatorRegistry", logdir, spdlog::level::debug );
        auto loggerProcMgr          = ConfigureLogger( "SGProcessingManager", logdir, spdlog::level::err );
        auto loggerProcessor        = ConfigureLogger( "SGProcessor", logdir, spdlog::level::err );
        auto loggerCrdtCallback     = ConfigureLogger( "CRDTCallbackManager", logdir, spdlog::level::err );
        auto loggerCoinPrices       = ConfigureLogger( "CoinPrices", logdir, spdlog::level::err );
        auto loggerUTXOManager      = ConfigureLogger( "UTXOManager", logdir, spdlog::level::err );
        // AsyncIOManager loggers
        auto asioFileCommon  = ConfigureLogger( "FILECommon", logdir, spdlog::level::err );
        auto asioFileManager = ConfigureLogger( "FileManager", logdir, spdlog::level::err );
        auto asioHttpCommon  = ConfigureLogger( "HTTPCommon", logdir, spdlog::level::err );
        auto asioIpfsCommon  = ConfigureLogger( "IPFSCommon", logdir, spdlog::level::err );
        auto asioIpfsLoader  = ConfigureLogger( "IPFSLoader", logdir, spdlog::level::err );
        auto asioFileLoader  = ConfigureLogger( "MNNLoader", logdir, spdlog::level::err );
        auto asioWSCommon    = ConfigureLogger( "WSCommon", logdir, spdlog::level::err );
        // libp2p loggers
        libp2p::log::setLevelOfGroup( "*", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "Gossip", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "crypto", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "identify", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "kademlia", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "libp2p", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "mplex", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "muxer", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "plaintext", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "protocols", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "secio", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "security", soralog::Level::DEBUG );
        libp2p::log::setLevelOfGroup( "yamux", soralog::Level::DEBUG );
#else
        // Release mode
        node_logger_              = ConfigureLogger( "SuperGeniusNode", logdir, spdlog::level::trace );
        auto loggerGeniusNode     = ConfigureLogger( "GeniusNode", logdir, spdlog::level::err );
        auto loggerGlobalDB       = ConfigureLogger( "GlobalDB", logdir, spdlog::level::err );
        auto loggerDAGSyncer      = ConfigureLogger( "GraphsyncDAGSyncer", logdir, spdlog::level::err );
        auto loggerGraphsync      = ConfigureLogger( "graphsync", logdir, spdlog::level::err );
        auto loggerBroadcaster    = ConfigureLogger( "PubSubBroadcasterExt", logdir, spdlog::level::err );
        auto loggerDataStore      = ConfigureLogger( "CrdtDatastore", logdir, spdlog::level::err );
        auto loggerCRDTHeads      = ConfigureLogger( "CrdtHeads", logdir, spdlog::level::err );
        auto loggerTransactions   = ConfigureLogger( "TransactionManager", logdir, spdlog::level::err );
        auto loggerMigration      = ConfigureLogger( "MigrationManager", logdir, spdlog::level::err );
        auto loggerMigrationStep  = ConfigureLogger( "MigrationStep", logdir, spdlog::level::err );
        auto loggerQueue          = ConfigureLogger( "ProcessingTaskQueueImpl", logdir, spdlog::level::err );
        auto loggerRocksDB        = ConfigureLogger( "rocksdb", logdir, spdlog::level::err );
        auto logkad               = ConfigureLogger( "Kademlia", logdir, spdlog::level::err );
        auto logNoise             = ConfigureLogger( "Noise", logdir, spdlog::level::err );
        auto logProcessingEngine  = ConfigureLogger( "ProcessingEngine", logdir, spdlog::level::err );
        auto loggerSubQueue       = ConfigureLogger( "ProcessingSubTaskQueueAccessorImpl", logdir, spdlog::level::err );
        auto loggerProcServ       = ConfigureLogger( "ProcessingService", logdir, spdlog::level::err );
        auto loggerProcqm         = ConfigureLogger( "ProcessingSubTaskQueueManager", logdir, spdlog::level::err );
        auto loggerUPNP           = ConfigureLogger( "UPNP", logdir, spdlog::level::err );
        auto loggerProcessingNode = ConfigureLogger( "ProcessingNode", logdir, spdlog::level::err );
        auto loggerGossipPubsub   = ConfigureLogger( "GossipPubSub", logdir, spdlog::level::err );
        auto loggerAccountMessenger = ConfigureLogger( "AccountMessenger", logdir, spdlog::level::err );
        auto loggerGeniusAccount    = ConfigureLogger( "GeniusAccount", logdir, spdlog::level::err );
        auto loggerKeyPair          = ConfigureLogger( "KeyPairFileStorage", logdir, spdlog::level::err );
        auto loggerBlockchain       = ConfigureLogger( "Blockchain", logdir, spdlog::level::err );
        auto loggerValidator        = ConfigureLogger( "ValidatorRegistry", logdir, spdlog::level::err );
        auto loggerProcMgr          = ConfigureLogger( "SGProcessingManager", logdir, spdlog::level::err );
        auto loggerProcessor        = ConfigureLogger( "SGProcessor", logdir, spdlog::level::err );
        auto loggerCrdtCallback     = ConfigureLogger( "CRDTCallbackManager", logdir, spdlog::level::err );
        auto loggerCoinPrices       = ConfigureLogger( "CoinPrices", logdir, spdlog::level::err );
        auto loggerUTXOManager      = ConfigureLogger( "UTXOManager", logdir, spdlog::level::err );
        //AsyncIOManager Loggers
        auto asioFileCommon  = ConfigureLogger( "FILECommon", logdir, spdlog::level::err );
        auto asioFileManager = ConfigureLogger( "FileManager", logdir, spdlog::level::err );
        auto asioHttpCommon  = ConfigureLogger( "HTTPCommon", logdir, spdlog::level::err );
        auto asioIpfsCommon  = ConfigureLogger( "IPFSCommon", logdir, spdlog::level::err );
        auto asioIpfsLoader  = ConfigureLogger( "IPFSLoader", logdir, spdlog::level::err );
        auto asioFileLoader  = ConfigureLogger( "MNNLoader", logdir, spdlog::level::err );
        auto asioWSCommon    = ConfigureLogger( "WSCommon", logdir, spdlog::level::err );
#endif

        // Logger initialization complete
        node_logger_->info( "Logger initialized successfully" );

        return true;
    }

    bool GeniusNode::InitNetwork( uint16_t base_port, bool is_full_node )
    {
        bool ret    = true;
        pubsubport_ = GenerateRandomPort( base_port, account_->GetAddress() );

        std::string old_lanip;
        do
        {
            if ( use_upnp_ )
            {
                //ret = InitUPNP();
                (void)InitUPNP(); // Ignore UPNP init result for now
            }

            // Make a base58 out of our address
            std::string                tempaddress = account_->GetAddress();
            std::vector<unsigned char> inputBytes( tempaddress.begin(), tempaddress.end() );
            std::vector<unsigned char> hash( SHA256_DIGEST_LENGTH );
            SHA256( inputBytes.data(), inputBytes.size(), hash.data() );

            libp2p::protocol::kademlia::ContentId key( hash );
            auto                                  acc_cid = libp2p::multi::ContentIdentifierCodec::decode( key.data );
            auto maybe_base58 = libp2p::multi::ContentIdentifierCodec::toString( acc_cid.value() );
            if ( !maybe_base58 )
            {
                ret = false;
                node_logger_->error( "We couldn't convert the account {} to base58", account_->GetAddress() );
                break;
            }
            base58key_ = maybe_base58.value();

            gnus_network_full_path_ = std::string( GNUS_NETWORK_PATH ) + version::GetNetAndVersionAppendix() +
                                      base58key_;

            auto pubsubKeyPath = gnus_network_full_path_ + "/pubs_processor";

            //Set a pubsub config, use no signing because we can verify with proof and dag structure
            libp2p::protocol::gossip::Config config;
            config.echo_forward_mode       = false;
            config.sign_messages           = false;
            config.seen_cache_limit        = 10;
            config.heartbeat_interval_msec = std::chrono::milliseconds{ 500 };
            config.rw_timeout_msec         = std::chrono::seconds{ 30 };

            pubsub_ = std::make_shared<ipfs_pubsub::GossipPubSub>(
                crdt::KeyPairFileStorage( write_base_path_ + pubsubKeyPath ).GetKeyPair().value(),
                config );
            auto pubs = pubsub_->Start( pubsubport_, {}, old_lanip, {} );
            pubs.wait();
            node_logger_->info( "PubSub started at address: {}", pubsub_->GetLocalAddress() );

            if ( !is_full_node )
            {
                pubsub_->GetHost()->getConnectionManagerConfig().high_water = 300;
                pubsub_->GetHost()->getConnectionManagerConfig().low_water  = 150;
            }
            else
            {
                pubsub_->GetHost()->getConnectionManagerConfig().high_water = 400;
                pubsub_->GetHost()->getConnectionManagerConfig().low_water  = 200;
            }
            graphsyncnetwork_ = std::make_shared<ipfs_lite::ipfs::graphsync::Network>( pubsub_->GetHost(), scheduler_ );

            // Initialize DHT early so peer discovery works during database migration
            if ( autodht_ )
            {
                DHTInit();
            }
        } while ( 0 );
        return ret;
    }

    bool GeniusNode::InitUPNP()
    {
        bool ret  = false;
        auto upnp = std::make_shared<upnp::UPNP>();
        do
        {
            if ( !upnp->GetIGD() )
            {
                ret = true;
            }
            else
            {
                std::string wanip = upnp->GetWanIP();
                std::string lanip = upnp->GetLocalIP();
                node_logger_->info( "Wan IP: {}", wanip );
                node_logger_->info( "Lan IP: {}", lanip );

                std::string owner;

                constexpr uint16_t MAX_ATTEMPTS = 10;
                for ( uint16_t i = 0; i < MAX_ATTEMPTS; ++i )
                {
                    uint16_t candidate_port = pubsubport_ + i;
                    if ( upnp->CheckIfPortInUse( candidate_port, "TCP", owner ) )
                    {
                        if ( owner == lanip )
                        {
                            node_logger_->info( "Port {} is already mapped by this device. Try using it.",
                                                candidate_port );
                            if ( upnp->OpenPort( candidate_port, candidate_port, "TCP", 3600 ) )
                            {
                                ret         = true;
                                pubsubport_ = candidate_port;
                                break;
                            }

                            node_logger_->error(
                                "Port {} is already mapped by this device. We tried using it, but could not. Will try other ports.",
                                candidate_port );
                            continue;
                        }
                        node_logger_->error( "Port {} already in use by {}", candidate_port, owner );
                        continue;
                    }

                    if ( upnp->OpenPort( candidate_port, candidate_port, "TCP", 3600 ) )
                    {
                        node_logger_->info( "Successfully opened port {}", candidate_port );
                        ret         = true;
                        pubsubport_ = candidate_port;
                        break;
                    }
                    node_logger_->warn( "Failed to open port {}", candidate_port );
                }
                if ( !ret )
                {
                    node_logger_->error( "Unable to open a usable UPnP port after {} attempts", MAX_ATTEMPTS );
                    break;
                }
            }
        } while ( 0 );

        return ret;
    }

    bool GeniusNode::InitDatabase()
    {
        bool ret = false;
        do
        {
            auto global_db_ret = crdt::GlobalDB::New( io_,
                                                      write_base_path_ + gnus_network_full_path_,
                                                      pubsub_,
                                                      crdt::CrdtOptions::DefaultOptions(),
                                                      graphsyncnetwork_,
                                                      scheduler_,
                                                      generator_ );
            if ( global_db_ret.has_error() )
            {
                node_logger_->error( "Error creating GlobalDB: {}", global_db_ret.error().message() );
                break;
            }
            tx_globaldb_ = std::move( global_db_ret.value() );

            tx_globaldb_->Start();

            ret = true;
        } while ( 0 );
        return ret;
    }

    bool GeniusNode::InitProcessingModules()
    {
        bool ret = true;

        task_queue_ = std::make_shared<processing::ProcessingTaskQueueImpl>( tx_globaldb_, processing_channel_topic_ );
        processing_core_ = std::make_shared<processing::ProcessingCoreImpl>( tx_globaldb_, 1, dev_config_.TokenID );

        task_result_storage_ = std::make_shared<processing::SubTaskResultStorageImpl>( tx_globaldb_,
                                                                                       processing_channel_topic_ );

        return ret;
    }

    void GeniusNode::MigrateDatabase( std::function<void( outcome::result<void> )> callback )
    {
        auto migrationManager = sgns::MigrationManager::New( io_,               // ioContext
                                                             pubsub_,           // pubSub
                                                             graphsyncnetwork_, // graphsync
                                                             scheduler_,        // scheduler
                                                             generator_,        // generator
                                                             write_base_path_,  // writeBasePath
                                                             base58key_,        // base58key
                                                             account_ );

        std::thread migration_thread(
            [manager = std::move( migrationManager ), cb = std::move( callback )]()
            {
                auto migrationResult = manager->Migrate();
                if ( cb )
                {
                    cb( migrationResult );
                }
            } );
        migration_thread.detach();
    }

    void GeniusNode::ScheduleMigrationRetry()
    {
        std::thread(
            [weak_self = weak_from_this()]
            {
                std::this_thread::sleep_for( std::chrono::seconds( 5 ) );
                if ( auto strong = weak_self.lock() )
                {
                    strong->StateTransition( NodeState::MIGRATING_DATABASE );
                }
            } )
            .detach();
    }

    void GeniusNode::ScheduleBlockchainRetry()
    {
        std::thread(
            [weak_self = weak_from_this()]
            {
                std::this_thread::sleep_for( std::chrono::seconds( 5 ) );
                if ( auto strong = weak_self.lock() )
                {
                    auto current_state = strong->state_.load();
                    if ( current_state != NodeState::INITIALIZING_BLOCKCHAIN )
                    {
                        strong->node_logger_->debug( "Skipping blockchain retry, unexpected state: {}",
                                                     NodeStateToString( current_state ) );
                        return;
                    }
                    strong->StateTransition( NodeState::INITIALIZING_BLOCKCHAIN );
                }
            } )
            .detach();
    }

    base::Logger GeniusNode::ConfigureLogger( const std::string        &tag,
                                              const std::string        &logdir,
                                              spdlog::level::level_enum level )
    {
        auto logger = base::createLogger( tag, logdir );
        logger->set_level( level );
        if ( level != spdlog::level::off )
        {
            logger->flush_on( level );
        }
        return logger;
    }

    GeniusNode::~GeniusNode()
    {
        node_logger_->debug( "~GeniusNode CALLED" );

        if ( pubsub_ )
        {
            pubsub_->Stop(); // Stop activities of OtherClass
        }
        if ( io_ )
        {
            io_->stop(); // Stop our io_context
        }
        if ( io_work_guard_ )
        {
            io_work_guard_->reset();
        }
        for ( auto &t : io_threads_ )
        {
            if ( t.joinable() )
            {
                t.join();
            }
        }
        io_threads_.clear();
        stop_upnp = true;
        if ( upnp_thread.joinable() )
        {
            upnp_thread.join();
        }
        if ( processing_service_ )
        {
            processing_service_->StopProcessing();
        }
        if ( processing_callback_pool_ )
        {
            processing_callback_pool_->join();
            processing_callback_pool_.reset();
        }
        std::this_thread::sleep_for( std::chrono::milliseconds( 50 ) );
        node_logger_->debug( "~GeniusNode FINISHED" );
    }

    void GeniusNode::RefreshUPNP( uint16_t pubsubport )
    {
        if ( upnp_thread.joinable() )
        {
            stop_upnp = true;   // Signal the existing thread to stop
            upnp_thread.join(); // Wait for it to finish
        }

        stop_upnp = false; // Reset the stop flag for the new thread

        upnp_thread = std::thread(
            [this, pubsubport]()
            {
                auto next_refresh_time = std::chrono::steady_clock::now() + std::chrono::minutes( 60 );
                auto upnp_shared       = std::make_shared<upnp::UPNP>();

                while ( !stop_upnp )
                {
                    if ( std::chrono::steady_clock::now() >= next_refresh_time )
                    {
                        std::weak_ptr<upnp::UPNP> upnp_weak = upnp_shared;

                        if ( auto upnp = upnp_weak.lock() )
                        {
                            if ( upnp->GetIGD() )
                            {
                                auto openedPort = upnp->OpenPort( pubsubport, pubsubport, "TCP", 3600 );
                                if ( !openedPort )
                                {
                                    GeniusNodeLogger()->error( "Failed to open port" );
                                }
                                else
                                {
                                    GeniusNodeLogger()->info( "Open Ports Success pubsub: {} ", pubsubport );
                                }
                            }
                            else
                            {
                                GeniusNodeLogger()->info( "No IGD" );
                            }
                        }
                        else
                        {
                            GeniusNodeLogger()->info( "UPNP weak_ptr expired" );
                            stop_upnp = true; // Signal thread to stop gracefully
                        }

                        next_refresh_time = std::chrono::steady_clock::now() + std::chrono::minutes( 60 );
                    }

                    std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
                }
            } );
    }

    void GeniusNode::AddPeer( const std::string &peer )
    {
        pubsub_->AddPeers( { peer } );
    }

    void GeniusNode::DHTInit()
    {
        // Encode the string to UTF-8 bytes
        std::string                temp = processing_grid_chanel_topic_ + sgns::version::GetNetAndVersionAppendix();
        std::vector<unsigned char> inputBytes( temp.begin(), temp.end() );

        // Compute the SHA-256 hash of the input bytes
        std::vector<unsigned char> hash( SHA256_DIGEST_LENGTH );
        SHA256( inputBytes.data(), inputBytes.size(), hash.data() );
        // Provide CID
        libp2p::protocol::kademlia::ContentId key( hash );
        pubsub_->GetDHT()->Start();
        pubsub_->ProvideCID( key );

        auto cidtest = libp2p::multi::ContentIdentifierCodec::decode( key.data );

        auto cidstring = libp2p::multi::ContentIdentifierCodec::toString( cidtest.value() );
        node_logger_->info( "CID Test:: {}", cidstring.value() );

        // Also Find providers
        pubsub_->StartFindingPeers( key );
    }

    std::string generate_uuid_with_ipfs_id( const std::string &ipfs_id )
    {
        // Hash the IPFS ID
        std::hash<std::string> hasher;
        uint64_t               id_hash = hasher( ipfs_id );

        // Get a high-resolution timestamp
        auto now       = std::chrono::high_resolution_clock::now();
        auto timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>( now.time_since_epoch() ).count();

        // Combine the IPFS ID hash and timestamp to create a seed
        uint64_t seed = id_hash ^ static_cast<uint64_t>( timestamp );

        // Seed the PRNG
        std::mt19937                                       gen( seed );
        boost::uuids::basic_random_generator<std::mt19937> uuid_gen( gen );

        // Generate UUID
        boost::uuids::uuid uuid = uuid_gen();
        return boost::uuids::to_string( uuid );
    }

    outcome::result<std::string> GeniusNode::ProcessImage( const std::string &jsondata )
    {
        if ( GetTransactionManagerState() != TransactionManager::State::READY )
        {
            return outcome::failure( boost::system::error_code{} );
        }
        OUTCOME_TRY( auto procmgr, sgns::sgprocessing::ProcessingManager::Create( jsondata ) );

        auto funds = GetProcessCost( procmgr );
        if ( funds <= 0 )
        {
            return outcome::failure( Error::PROCESS_COST_ERROR );
        }

        if ( utxo_manager_.GetBalance() < funds )
        {
            return outcome::failure( Error::INSUFFICIENT_FUNDS );
        }

        SGProcessing::Task task;
        auto               uuidstring = generate_uuid_with_ipfs_id( pubsub_->GetHost()->getId().toBase58() );

        //Make a small json to insert without extra indentation and spacing.
        json smalljson;
        sgns::to_json( smalljson, procmgr->GetProcessingData() );
        task.set_ipfs_block_id( uuidstring );
        task.set_json_data( smalljson.dump( -1 ) );
        task.set_random_seed( 0 );
        task.set_results_channel( ( boost::format( "RESULT_CHANNEL_ID_%1%" ) % ( 1 ) ).str() );
        //Get Processing Data
        auto procdata = procmgr->GetProcessingData();

        //Split into subtasks
        processing::ProcessTaskSplitter  taskSplitter;
        std::list<SGProcessing::SubTask> subTasks;
        //Make Copies, trying to use references for passes/input nodes may cause problems.
        auto passes = procdata.get_passes();
        for ( const auto &pass : passes )
        {
            auto input_nodes = pass.get_model().value().get_input_nodes();
            for ( auto &model : input_nodes )
            {
                json modeljson;
                sgns::to_json( modeljson, model );
                auto   index = procmgr->GetInputIndex( model.get_source().value() );
                size_t nChunks =
                    procdata.get_inputs()[index.value()].get_dimensions().value().get_chunk_count().value();
                rapidjson::StringBuffer                    buffer;
                rapidjson::Writer<rapidjson::StringBuffer> writer( buffer );

                taskSplitter.SplitTask( task,
                                        subTasks,
                                        modeljson.dump( -1 ),
                                        nChunks,
                                        false,
                                        pubsub_->GetHost()->getId().toBase58() );
            }
        }
        if ( subTasks.size() <= 0 )
        {
            return outcome::failure( Error::INVALID_JSON );
        }
        auto cut = sgns::TokenAmount::ParseMinions( dev_config_.Cut );
        if ( !cut )
        {
            return outcome::failure( cut.error() );
        }

        OUTCOME_TRY( auto &&manager, GetTransactionManager() );
        OUTCOME_TRY( ( auto &&, result_pair ),
                     manager->HoldEscrow( funds, std::string( dev_config_.Addr ), cut.value(), uuidstring ) );

        auto [tx_id, escrow_data_pair] = result_pair;

        auto [escrow_path, escrow_data] = escrow_data_pair;

        task.set_escrow_path( escrow_path );

        auto enqueue_task_return = task_queue_->EnqueueTask( task, subTasks );
        if ( enqueue_task_return.has_failure() )
        {
            return outcome::failure( Error::DATABASE_WRITE_ERROR );
        }
        auto send_escrow_return = task_queue_->SendEscrow( escrow_path, std::move( escrow_data ) );
        if ( send_escrow_return.has_failure() )
        {
            task_queue_->ResetAtomicTransaction();
            return outcome::failure( Error::DATABASE_WRITE_ERROR );
        }

        return tx_id;
    }

    uint64_t GeniusNode::GetProcessCost( std::shared_ptr<sgns::sgprocessing::ProcessingManager> &procmgr )
    {
        auto blockLen = procmgr->ParseBlockSize();
        if ( !blockLen )
        {
            node_logger_->error( "ParseBlockSize failed" );
            return 0;
        }
        node_logger_->trace( "Parsed totalBytes: {}", blockLen.value() );

        auto maybeGnusPrice = GetGNUSPrice();
        if ( !maybeGnusPrice )
        {
            node_logger_->error( "GetGNUSPrice failed" );
            return 0;
        }
        double gnusPrice = maybeGnusPrice.value();
        node_logger_->trace( "Retrieved GNUS price (USD/genius): {}", gnusPrice );

        auto rawMinionsRes = TokenAmount::CalculateCostMinions( blockLen.value(), gnusPrice );
        if ( !rawMinionsRes )
        {
            node_logger_->error( "TokenAmount::CalculateCostMinions failed" );
            return 0;
        }
        uint64_t rawMinions = rawMinionsRes.value();
        node_logger_->trace( "Raw cost in minions: {}", rawMinions );

        return rawMinions;
    }

    outcome::result<double> GeniusNode::GetGNUSPrice()
    {
        auto price = GetCoinprice( { "genius-ai" } );
        if ( !price )
        {
            return outcome::failure( Error::NO_PRICE );
        }
        return price.value()["genius-ai"];
    }

    std::string GeniusNode::GetVersion()
    {
        return sgns::version::SuperGeniusVersionFullString();
    }

    outcome::result<std::pair<std::string, uint64_t>> GeniusNode::MintTokens( uint64_t           amount,
                                                                              const std::string &transaction_hash,
                                                                              const std::string &chainid,
                                                                              TokenID            tokenid,
                                                                              std::chrono::milliseconds timeout )
    {
        if ( GetTransactionManagerState() != TransactionManager::State::READY )
        {
            node_logger_->error( "{}: Transaction manager not ready", __func__ );
            return outcome::failure( boost::system::error_code{} );
        }
        auto start_time = std::chrono::steady_clock::now();

        OUTCOME_TRY( auto &&manager, GetTransactionManager() );
        OUTCOME_TRY( auto &&tx_id, manager->MintFunds( amount, transaction_hash, chainid, tokenid ) );

        auto mint_result = manager->WaitForTransactionOutgoing( tx_id, timeout );

        auto end_time = std::chrono::steady_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>( end_time - start_time ).count();

        if ( mint_result != TransactionManager::TransactionStatus::CONFIRMED )
        {
            node_logger_->error( "Mint transaction {} failed after {} ms", tx_id, duration );
            return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::timed_out ) );
        }

        node_logger_->debug( "Mint transaction {} completed in {} ms", tx_id, duration );
        return std::make_pair( tx_id, duration );
    }

    outcome::result<std::pair<std::string, uint64_t>> GeniusNode::TransferFunds( uint64_t                  amount,
                                                                                 const std::string        &destination,
                                                                                 TokenID                   token_id,
                                                                                 std::chrono::milliseconds timeout )
    {
        if ( GetTransactionManagerState() != TransactionManager::State::READY )
        {
            return outcome::failure( boost::system::error_code{} );
        }
        auto start_time = std::chrono::steady_clock::now();

        OUTCOME_TRY( auto &&manager, GetTransactionManager() );
        OUTCOME_TRY( auto &&tx_id, manager->TransferFunds( amount, destination, token_id ) );

        auto transfer_result = manager->WaitForTransactionOutgoing( tx_id, timeout );

        auto end_time = std::chrono::steady_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>( end_time - start_time ).count();

        if ( transfer_result != TransactionManager::TransactionStatus::CONFIRMED )
        {
            node_logger_->error( "TransferFunds transaction {} failed after {} ms", tx_id, duration );
            return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::timed_out ) );
        }

        node_logger_->debug( "TransferFunds transaction {} completed in {} ms", tx_id, duration );
        return std::make_pair( tx_id, duration );
    }

    outcome::result<std::string> GeniusNode::TransferFunds( uint64_t           amount,
                                                            const std::string &destination,
                                                            TokenID            token_id )
    {
        if ( GetTransactionManagerState() != TransactionManager::State::READY )
        {
            return outcome::failure( boost::system::error_code{} );
        }

        OUTCOME_TRY( auto &&manager, GetTransactionManager() );
        OUTCOME_TRY( auto &&tx_id, manager->TransferFunds( amount, destination, token_id ) );

        node_logger_->debug( "TransferFunds transaction {} sent", tx_id );
        return tx_id;
    }

    outcome::result<std::pair<std::string, uint64_t>> GeniusNode::PayDev( uint64_t                  amount,
                                                                          TokenID                   token_id,
                                                                          std::chrono::milliseconds timeout )
    {
        if ( GetTransactionManagerState() != TransactionManager::State::READY )
        {
            return outcome::failure( boost::system::error_code{} );
        }
        auto start_time = std::chrono::steady_clock::now();
        OUTCOME_TRY( auto &&manager, GetTransactionManager() );
        OUTCOME_TRY( auto &&tx_id, manager->TransferFunds( amount, dev_config_.Addr, token_id ) );

        auto paydev_result = manager->WaitForTransactionOutgoing( tx_id, timeout );

        auto end_time = std::chrono::steady_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>( end_time - start_time ).count();

        if ( paydev_result != TransactionManager::TransactionStatus::CONFIRMED )
        {
            node_logger_->error( "TransferFunds transaction {} failed after {} ms", tx_id, duration );
            return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::timed_out ) );
        }

        node_logger_->debug( "TransferFunds transaction {} completed in {} ms", tx_id, duration );
        return std::make_pair( tx_id, duration );
    }

    outcome::result<std::pair<std::string, uint64_t>> GeniusNode::PayEscrow(
        const std::string                       &escrow_path,
        const SGProcessing::TaskResult          &taskresult,
        std::shared_ptr<crdt::AtomicTransaction> crdt_transaction,
        std::chrono::milliseconds                timeout )
    {
        if ( GetTransactionManagerState() != TransactionManager::State::READY )
        {
            return outcome::failure( boost::system::error_code{} );
        }
        auto start_time = std::chrono::steady_clock::now();

        OUTCOME_TRY( auto &&manager, GetTransactionManager() );
        OUTCOME_TRY( auto &&tx_id, manager->PayEscrow( escrow_path, taskresult, std::move( crdt_transaction ) ) );

        auto payescrow_result = manager->WaitForTransactionOutgoing( tx_id, timeout );

        auto end_time = std::chrono::steady_clock::now();
        auto duration = std::chrono::duration_cast<std::chrono::milliseconds>( end_time - start_time ).count();

        if ( payescrow_result != TransactionManager::TransactionStatus::CONFIRMED )
        {
            node_logger_->error( "Pay escrow transaction {} failed after {} ms", tx_id, duration );
            return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::timed_out ) );
        }

        node_logger_->debug( "Pay escrow transaction {} completed in {} ms", tx_id, duration );
        return std::make_pair( tx_id, duration );
    }

    uint64_t GeniusNode::GetBalance()
    {
        return utxo_manager_.GetBalance();
    }

    uint64_t GeniusNode::GetBalance( const TokenID token_id )
    {
        return utxo_manager_.GetBalance( token_id );
    }

    uint64_t GeniusNode::GetBalance( const std::string &address )
    {
        return utxo_manager_.GetBalance( address );
    }

    uint64_t GeniusNode::GetBalance( const TokenID token_id, const std::string &address )
    {
        return utxo_manager_.GetBalance( token_id, address );
    }

    void GeniusNode::ProcessingDone( const std::string &task_id, const SGProcessing::TaskResult &taskresult )
    {
        boost::asio::post(
            *processing_callback_pool_,
            [weak_self( weak_from_this() ), task_id, taskresult]()
            {
                if ( auto strong = weak_self.lock() )
                {
                    strong->node_logger_->info( "[{}]{}: SUCCESS PROCESSING TASK {}",
                                                strong->account_->GetAddress().substr( 0, 8 ),
                                                __func__,
                                                task_id );
                    do
                    {
                        if ( strong->task_queue_->IsTaskCompleted( task_id ) )
                        {
                            strong->node_logger_->info( "[{}]{}: Task Already completed!",
                                                        strong->account_->GetAddress().substr( 0, 8 ),
                                                        __func__ );
                            break;
                        }
                        if ( strong->GetTransactionManagerState() != TransactionManager::State::READY )
                        {
                            strong->node_logger_->info( "[{}]{}: Transactions are not ready",
                                                        strong->account_->GetAddress().substr( 0, 8 ),
                                                        __func__ );
                            break;
                        }
                        strong->node_logger_->info( "[{}]{}: Transactions READY",
                                                    strong->account_->GetAddress().substr( 0, 8 ),
                                                    __func__ );
                        auto maybe_escrow_path = strong->task_queue_->GetTaskEscrow( task_id );
                        if ( maybe_escrow_path.has_failure() )
                        {
                            strong->node_logger_->info( "[{}]{}: No associated Escrow with the task id: {} ",
                                                        strong->account_->GetAddress().substr( 0, 8 ),
                                                        __func__,
                                                        task_id );
                            break;
                        }
                        auto complete_task_result = strong->task_queue_->CompleteTask( task_id, taskresult );
                        if ( complete_task_result.has_failure() )
                        {
                            strong->node_logger_->error( "[{}]{}: Unable to complete task: {} ",
                                                         strong->account_->GetAddress().substr( 0, 8 ),
                                                         __func__,
                                                         task_id );
                            break;
                        }
                        strong->node_logger_->info( "[{}]{}: Creating the payout transactions",
                                                    strong->account_->GetAddress().substr( 0, 8 ),
                                                    __func__ );
                        auto pay_result = strong->PayEscrow( maybe_escrow_path.value(),
                                                             taskresult,
                                                             std::move( complete_task_result.value() ) );
                        if ( pay_result.has_failure() )
                        {
                            strong->node_logger_->error( "[{}]{}: Escrow not paid for task: {} ",
                                                         strong->account_->GetAddress().substr( 0, 8 ),
                                                         __func__,
                                                         task_id );
                            break;
                        }
                        strong->node_logger_->info( "[{}]{}: Paid for task: {}",
                                                    strong->account_->GetAddress().substr( 0, 8 ),
                                                    __func__,
                                                    task_id );

                    } while ( 0 );
                }
            } );
    }

    void GeniusNode::ProcessingError( const std::string &task_id )
    {
        boost::asio::post( *processing_callback_pool_,
                           [weak_self( weak_from_this() ), task_id]()
                           {
                               if ( auto strong = weak_self.lock() )
                               {
                                   strong->node_logger_->error( "[ {} ] ERROR PROCESSING SUBTASK ",
                                                                strong->account_->GetAddress().substr( 0, 8 ),
                                                                task_id );
                               }
                           } );
    }

    void GeniusNode::PrintDataStore() const
    {
        if ( tx_globaldb_ )
        {
            tx_globaldb_->PrintDataStore();
        }
        else
        {
            node_logger_->error( "{}: GlobalDB is not initialized", __func__ );
        }
    }

    void GeniusNode::StopProcessing()
    {
        if ( processing_service_ )
        {
            processing_service_->StopProcessing();
        }
        else
        {
            node_logger_->error( "{}: Processing service is not initialized", __func__ );
        }
    }

    void GeniusNode::StartProcessing()
    {
        if ( processing_service_ )
        {
            processing_service_->StartProcessing( processing_grid_chanel_topic_ );
        }
        else
        {
            node_logger_->error( "{}: Processing service is not initialized", __func__ );
        }
    }

    outcome::result<std::map<std::string, double>> GeniusNode::GetCoinprice( const std::vector<std::string> &tokenIds )
    {
        auto                          currentTime = std::chrono::system_clock::now();
        std::map<std::string, double> result;
        std::vector<std::string>      tokensToFetch;
        // Determine which tokens need to be fetched
        for ( const auto &tokenId : tokenIds )
        {
            auto it = m_tokenPriceCache.find( tokenId );

            if ( it != m_tokenPriceCache.end() && ( currentTime - it->second.lastUpdate ) < m_cacheValidityDuration )
            {
                // Use cached price if it's still valid
                result[tokenId] = it->second.price;
            }
            else
            {
                // Add to the list of tokens that need fresh data
                tokensToFetch.push_back( tokenId );
            }
        }

        // If we have tokens to fetch and we're not rate limited
        if ( !tokensToFetch.empty() && ( currentTime - m_lastApiCall ) >= m_minApiCallInterval )
        {
            sgns::CoinGeckoPriceRetriever retriever;
            auto                          newPricesResult = retriever.getCurrentPrices( tokensToFetch );

            if ( newPricesResult )
            {
                auto &newPrices = newPricesResult.value();
                m_lastApiCall   = currentTime;

                // Update the cache and result with new prices
                for ( const auto &[token, price] : newPrices )
                {
                    m_tokenPriceCache[token] = { price, currentTime };
                    result[token]            = price;
                }
            }
            else
            {
                // Handle the error case
                // If we have some cached data, continue with what we have
                if ( result.empty() )
                {
                    // Only return error if we have no data at all
                    return newPricesResult.error();
                }
                // Otherwise, continue with partial data and log the error
                // log("Failed to fetch prices for some tokens: " + newPricesResult.error().message());
            }
        }

        return result;
    }

    outcome::result<std::map<std::string, std::map<int64_t, double>>> GeniusNode::GetCoinPriceByDate(
        const std::vector<std::string> &tokenIds,
        const std::vector<int64_t>     &timestamps )
    {
        sgns::CoinGeckoPriceRetriever retriever;
        return retriever.getHistoricalPrices( tokenIds, timestamps );
    }

    outcome::result<std::map<std::string, std::map<int64_t, double>>> GeniusNode::GetCoinPricesByDateRange(
        const std::vector<std::string> &tokenIds,
        int64_t                         from,
        int64_t                         to )
    {
        sgns::CoinGeckoPriceRetriever retriever;
        return retriever.getHistoricalPriceRange( tokenIds, from, to );
    }

    outcome::result<std::string> GeniusNode::FormatTokens( uint64_t amount, TokenID tokenId )
    {
        if ( tokenId.IsGNUS() )
        {
            return TokenAmount::FormatMinions( amount );
        }
        if ( tokenId.Equals( dev_config_.TokenID ) )
        {
            auto child = TokenAmount::ConvertToChildToken( amount, dev_config_.TokenValueInGNUS );
            if ( !child )
            {
                return outcome::failure( child.error() );
            }
            return child.value();
        }
        return outcome::failure( make_error_code( GeniusNode::Error::TOKEN_ID_MISMATCH ) );
    }

    outcome::result<uint64_t> GeniusNode::ParseTokens( const std::string &str, TokenID tokenId )
    {
        if ( tokenId.IsGNUS() )
        {
            return TokenAmount::ParseMinions( str );
        }
        if ( tokenId.Equals( dev_config_.TokenID ) )
        {
            return TokenAmount::ConvertFromChildToken( str, dev_config_.TokenValueInGNUS );
        }
        return outcome::failure( make_error_code( GeniusNode::Error::TOKEN_ID_MISMATCH ) );
    }

    // Wait for a transaction to be processed with a timeout
    TransactionManager::TransactionStatus GeniusNode::WaitForTransactionOutgoing( const std::string        &txId,
                                                                                  std::chrono::milliseconds timeout )
    {
        auto manager_result = GetTransactionManager();
        if ( !manager_result.has_value() )
        {
            return TransactionManager::TransactionStatus::INVALID;
        }
        return manager_result.value()->WaitForTransactionOutgoing( txId, timeout );
    }

    // Wait for a transaction to be processed with a timeout
    TransactionManager::TransactionStatus GeniusNode::WaitForTransactionIncoming( const std::string        &txId,
                                                                                  std::chrono::milliseconds timeout )
    {
        auto manager_result = GetTransactionManager();
        if ( !manager_result.has_value() )
        {
            return TransactionManager::TransactionStatus::INVALID;
        }
        return manager_result.value()->WaitForTransactionIncoming( txId, timeout );
    }

    TransactionManager::TransactionStatus GeniusNode::WaitForEscrowRelease( const std::string        &originalEscrowId,
                                                                            std::chrono::milliseconds timeout )
    {
        auto manager_result = GetTransactionManager();
        if ( !manager_result.has_value() )
        {
            return TransactionManager::TransactionStatus::INVALID;
        }
        return manager_result.value()->WaitForEscrowRelease( originalEscrowId, timeout );
    }

    outcome::result<std::shared_ptr<TransactionManager>> GeniusNode::GetTransactionManager() const
    {
        if ( !transaction_manager_ )
        {
            return outcome::failure( Error::TRANSACTIONS_NOT_READY );
        }
        return transaction_manager_;
    }

    TransactionManager::State GeniusNode::GetTransactionManagerState() const
    {
        auto manager_result = GetTransactionManager();
        if ( !manager_result.has_value() )
        {
            return TransactionManager::State::CREATING;
        }
        return manager_result.value()->GetState();
    }

    void GeniusNode::SendTransactionAndProof( std::shared_ptr<IGeniusTransactions> tx, std::vector<uint8_t> proof )
    {
        auto manager_result = GetTransactionManager();
        if ( manager_result.has_value() )
        {
            manager_result.value()->EnqueueTransaction( std::make_pair( tx, proof ) );
        }
        else
        {
            node_logger_->error( "{}: Transactions not ready", __func__ );
        }
    }

    void GeniusNode::ConfigureTransactionFilterTimeoutsMs( uint64_t timeframe_limit_ms, uint64_t mutability_window_ms )
    {
        auto manager_result = GetTransactionManager();
        if ( !manager_result.has_value() )
        {
            node_logger_->error( "{}: Transactions not ready", __func__ );
            return;
        }
        auto manager = manager_result.value();
        manager->SetTimeFrameToleranceMs( timeframe_limit_ms );
        manager->SetMutabilityWindowMs( mutability_window_ms );
    }

    void GeniusNode::RotateLogFiles( const std::string &base_path )
    {
        std::filesystem::path basePath( base_path );

        // Define log file paths
        std::filesystem::path sgnslog_path      = basePath / "sgnslog.log";
        std::filesystem::path sgnslog2_path     = basePath / "sgnslog2.log";
        std::filesystem::path sgnslog_old_path  = basePath / "sgnslog.old.log";
        std::filesystem::path sgnslog2_old_path = basePath / "sgnslog2.old.log";

        try
        {
            // Handle sgnslog.log rotation
            if ( std::filesystem::exists( sgnslog_path ) )
            {
                // Delete old backup if it exists
                if ( std::filesystem::exists( sgnslog_old_path ) )
                {
                    std::filesystem::remove( sgnslog_old_path );
                    std::cout << "Deleted old backup: " << sgnslog_old_path << std::endl;
                }

                // Rename current log to backup
                std::filesystem::rename( sgnslog_path, sgnslog_old_path );
                std::cout << "Rotated log: " << sgnslog_path << " -> " << sgnslog_old_path << std::endl;
            }

            // Handle sgnslog2.log rotation
            if ( std::filesystem::exists( sgnslog2_path ) )
            {
                // Delete old backup if it exists
                if ( std::filesystem::exists( sgnslog2_old_path ) )
                {
                    std::filesystem::remove( sgnslog2_old_path );
                    std::cout << "Deleted old backup: " << sgnslog2_old_path << std::endl;
                }

                // Rename current log to backup
                std::filesystem::rename( sgnslog2_path, sgnslog2_old_path );
                std::cout << "Rotated log: " << sgnslog2_path << " -> " << sgnslog2_old_path << std::endl;
            }
        }
        catch ( const std::filesystem::filesystem_error &e )
        {
            std::cerr << "Log rotation error: " << e.what() << std::endl;
            // Continue execution - don't let log rotation failure stop the application
        }
    }

    TransactionManager::TransactionStatus GeniusNode::GetTransactionStatus( const std::string &txId ) const
    {
        auto manager_result = GetTransactionManager();
        if ( !manager_result.has_value() )
        {
            node_logger_->error( "{}: Transactions not ready", __func__ );
            return TransactionManager::TransactionStatus::INVALID;
        }
        auto manager = manager_result.value();
        auto retval  = manager->GetOutgoingStatusByTxId( txId );
        if ( retval == TransactionManager::TransactionStatus::INVALID )
        {
            retval = manager->GetIncomingStatusByTxId( txId );
        }
        return retval;
    }

    void GeniusNode::TransactionStateChanged( TransactionManager::State old_state, TransactionManager::State new_state )
    {
        node_logger_->info( "Transaction Manager state changed from {} to {}", old_state, new_state );

        switch ( new_state )
        {
            case TransactionManager::State::READY:
                if ( isprocessor_ )
                {
                    StartProcessing();
                }
                break;
            case TransactionManager::State::INITIALIZING:
            case TransactionManager::State::SYNCING:
                if ( isprocessor_ )
                {
                    StopProcessing();
                }
                break;
            case TransactionManager::State::CREATING:
            default:
                break;
        }
    }

    void GeniusNode::SetAuthorizedFullNodeAddress( const std::string &pub_address )
    {
        Blockchain::SetAuthorizedFullNodeAddress( pub_address );
        if ( blockchain_ )
        {
            blockchain_->Start();
        }
    }

    const std::string &GeniusNode::GetAuthorizedFullNodeAddress() const
    {
        return Blockchain::GetAuthorizedFullNodeAddress();
    }
}

Updated on 2026-03-04 at 13:10:44 -0800