Skip to content

src/account/TransactionManager.hpp

More...

Namespaces

Name
sgns

Classes

Name
class sgns::TransactionManager
struct fmt::formatter< sgns::TransactionManager::State >

Types

Name
using std::pair< std::string, base::Buffer > EscrowDataPair

Detailed Description

Date: 2024-03-13 Henrique A. Klein ([email protected])

Types Documentation

using EscrowDataPair

using sgns::EscrowDataPair = std::pair<std::string, base::Buffer>;

Source code

#ifndef _TRANSACTION_MANAGER_HPP_
#define _TRANSACTION_MANAGER_HPP_

#include "UTXOManager.hpp"

#include <memory>
#include <deque>
#include <cstdint>
#include <unordered_map>
#include <unordered_set>
#include <optional>

#include <boost/format.hpp>

#include "crdt/globaldb/globaldb.hpp"
#include "crdt/atomic_transaction.hpp"
#include "account/proto/SGTransaction.pb.h"
#include "account/IGeniusTransactions.hpp"
#include "account/GeniusAccount.hpp"
#include "base/logger.hpp"
#include "base/buffer.hpp"
#include "crypto/hasher.hpp"

#include "proof/proto/SGProof.pb.h"

#include "processing/proto/SGProcessing.pb.h"
#include "outcome/outcome.hpp"

namespace sgns
{
    using namespace boost::multiprecision;
    using EscrowDataPair = std::pair<std::string, base::Buffer>;

    class TransactionManager : public std::enable_shared_from_this<TransactionManager>
    {
    public:
        static constexpr std::string_view GNUS_FULL_NODES_TOPIC        = "SuperGNUSNode.TestNet.FullNode";
        static constexpr std::string_view GNUS_FULL_NODES_TOPIC_LEGACY = "SuperGNUSNode.TestNet.FullNode.963";
        static constexpr uint64_t         NONCE_REQUEST_TIMEOUT_MS =
            10000; 

        enum class State : uint8_t
        {
            CREATING = 0, 
            INITIALIZING, 
            SYNCING,      
            READY,        
        };

        using TransactionPair  = std::pair<std::shared_ptr<IGeniusTransactions>, std::optional<std::vector<uint8_t>>>;
        using TransactionBatch = std::vector<TransactionPair>;
        using TransactionItem  = std::pair<TransactionBatch, std::optional<std::shared_ptr<crdt::AtomicTransaction>>>;
        using StateChangeCallback = std::function<void( const State &previous, const State &current )>;

        enum class TransactionStatus : uint8_t
        {
            CREATED,   
            SENDING,   
            CONFIRMED, 
            VERIFYING, 
            FAILED,    
            INVALID    
        };

        static std::shared_ptr<TransactionManager> New(
            std::shared_ptr<crdt::GlobalDB>          processing_db,
            std::shared_ptr<boost::asio::io_context> ctx,
            UTXOManager                             &utxo_manager,
            std::shared_ptr<GeniusAccount>           account,
            std::shared_ptr<crypto::Hasher>          hasher,
            bool                                     full_node           = false,
            std::chrono::milliseconds                timestamp_tolerance = std::chrono::milliseconds( 0 ),
            std::chrono::milliseconds                mutability_window   = std::chrono::milliseconds( 0 ) );

        ~TransactionManager();

        void Start();
        void PrintAccountInfo() const;

        const GeniusAccount &GetAccount() const;

        std::vector<std::vector<uint8_t>> GetOutTransactions() const;
        std::vector<std::vector<uint8_t>> GetInTransactions() const;

        outcome::result<std::string> TransferFunds( uint64_t amount, const std::string &destination, TokenID token_id );
        outcome::result<std::string> MintFunds( uint64_t    amount,
                                                std::string transaction_hash,
                                                std::string chainid,
                                                TokenID     tokenid );
        outcome::result<std::pair<std::string, EscrowDataPair>> HoldEscrow( uint64_t           amount,
                                                                            const std::string &dev_addr,
                                                                            uint64_t           peers_cut,
                                                                            const std::string &job_id );
        outcome::result<std::string>                            PayEscrow( const std::string                       &escrow_path,
                                                                           const SGProcessing::TaskResult          &task_result,
                                                                           std::shared_ptr<crdt::AtomicTransaction> crdt_transaction );

        // Wait for an incoming transaction to be processed with a timeout
        TransactionStatus WaitForTransactionIncoming( const std::string        &txId,
                                                      std::chrono::milliseconds timeout ) const;
        // Wait for an outgoing transaction to be processed with a timeout
        TransactionStatus WaitForTransactionOutgoing( const std::string        &txId,
                                                      std::chrono::milliseconds timeout ) const;
        TransactionStatus WaitForEscrowRelease( const std::string        &originalEscrowId,
                                                std::chrono::milliseconds timeout ) const;

        static std::string GetTransactionPath( uint16_t base, const std::string &tx_hash );
        static std::string GetTransactionPath( const IGeniusTransactions &element );
        static std::string GetTransactionPath( const std::string &tx_hash );

        static std::string GetTransactionProofPath( const IGeniusTransactions &element );
        static outcome::result<std::shared_ptr<IGeniusTransactions>> FetchTransaction(
            const std::shared_ptr<crdt::GlobalDB> &db,
            std::string_view                       transaction_key );
        static outcome::result<std::shared_ptr<IGeniusTransactions>> DeSerializeTransaction(
            const base::Buffer &tx_data );

        State GetState() const
        {
            return state_m;
        }

        TransactionStatus GetOutgoingStatusByTxId( const std::string &txId ) const;
        TransactionStatus GetIncomingStatusByTxId( const std::string &txId ) const;

        outcome::result<std::shared_ptr<IGeniusTransactions>> GetConflictingTransaction(
            const IGeniusTransactions &element ) const;

        void Stop();

        void RegisterStateChangeCallback( StateChangeCallback callback );
        void UnregisterStateChangeCallback();

        static std::string StateToString( State state )
        {
            switch ( state )
            {
                case State::CREATING:
                    return "CREATING";
                case State::INITIALIZING:
                    return "INITIALIZING";
                case State::SYNCING:
                    return "SYNCING";
                case State::READY:
                    return "READY";
                default:
                    return "UNKNOWN";
            }
        }

        static std::string GetBlockChainBase( uint16_t network_id );

        outcome::result<void> QueryTransactions();
        outcome::result<void> FetchAndProcessTransaction( const std::string          &tx_key,
                                                          std::optional<base::Buffer> tx_data = std::nullopt );

    protected:
        friend class GeniusNode;
        void EnqueueTransaction( TransactionPair element );
        void EnqueueTransaction( TransactionItem element );

        void SetTimeFrameToleranceMs( uint64_t timeframe_tolerance );
        void SetMutabilityWindowMs( uint64_t mutability_window );

    private:
        static constexpr std::string_view TRANSACTION_BASE_FORMAT = "/bc-%hu/";

        TransactionManager( std::shared_ptr<crdt::GlobalDB>          processing_db,
                            std::shared_ptr<boost::asio::io_context> ctx,
                            UTXOManager                             &utxo_manager,
                            std::shared_ptr<GeniusAccount>           account,
                            std::shared_ptr<crypto::Hasher>          hasher,
                            bool                                     full_node,
                            std::chrono::milliseconds                timestamp_tolerance,
                            std::chrono::milliseconds                mutability_window );

        // Parser function pointer alias: returns a set of topic strings or an error
        using TransactionParserFn =
            outcome::result<void> ( TransactionManager::* )( const std::shared_ptr<IGeniusTransactions> & );

        SGTransaction::DAGStruct                      FillDAGStruct( std::string transaction_hash = "" ) const;
        outcome::result<std::unordered_set<uint64_t>> SendTransactionItem( TransactionItem &item );
        outcome::result<void>                         ConfirmTransactions();
        outcome::result<void>                         RollbackTransactions( TransactionItem &item_to_rollback );

        static std::vector<uint16_t>                                 GetMonitoredNetworkIDs();
        static std::string                                           GetBlockChainBase();
        static outcome::result<std::shared_ptr<IGeniusTransactions>> DeSerializeTransaction( std::string tx_data );

        static outcome::result<std::string> GetExpectedProofKey( const std::string                          &tx_key,
                                                                 const std::shared_ptr<IGeniusTransactions> &tx );
        static outcome::result<std::string> GetExpectedTxKey( const std::string &proof_key );

        outcome::result<bool> CheckProof( const std::shared_ptr<IGeniusTransactions> &tx );
        outcome::result<void> ParseTransaction( const std::shared_ptr<IGeniusTransactions> &tx );
        outcome::result<void> RevertTransaction( const std::shared_ptr<IGeniusTransactions> &tx );

        void InitNonce( uint64_t timeout_ms );
        void SyncNonce();

        void RequestRelevantHeads();

        outcome::result<bool> CheckTransactionValidity( const std::set<uint64_t> &nonces_to_check );

        outcome::result<void> DeleteTransaction( std::string tx_key, const std::unordered_set<std::string> &topics );
        std::shared_ptr<IGeniusTransactions> GetTransactionByHash( const std::string &tx_hash ) const;
        std::shared_ptr<IGeniusTransactions> GetTransactionByHashNoLock( const std::string &tx_hash ) const;
        std::shared_ptr<IGeniusTransactions> GetTransactionByNonceAndAddress( uint64_t           nonce,
                                                                              const std::string &address ) const;

        bool SetOutgoingStatusByNonce( uint64_t nonce, TransactionStatus s );

        std::shared_ptr<crdt::GlobalDB> globaldb_m;

        std::shared_ptr<boost::asio::io_context> ctx_m;
        std::shared_ptr<GeniusAccount>           account_m;
        UTXOManager                             &utxo_manager_;
        std::shared_ptr<crypto::Hasher>          hasher_m;
        bool                                     full_node_m;
        std::string                              full_node_topic_m; 
        void                                     TickOnce();
        State                                    state_m;
        std::mutex                               state_change_callback_mutex_;
        StateChangeCallback                      state_change_callback_;

        // Head request rate limiting (for reactive requests due to nonce gaps)
        std::optional<std::chrono::steady_clock::time_point> last_head_request_time_;

        // Periodic sync - request heads every 10 minutes to stay in sync across devices/instances
        std::chrono::steady_clock::time_point last_periodic_sync_time_;
        std::atomic<bool>                     received_first_periodic_sync_response_{
            false }; // Track if we've gotten at least one response

        static constexpr std::chrono::minutes PERIODIC_SYNC_INTERVAL         = std::chrono::minutes( 10 );
        static constexpr std::chrono::seconds INITIAL_PERIODIC_SYNC_INTERVAL = std::chrono::seconds( 30 );

        // for the SendTransactionItem thread support
        mutable std::mutex          mutex_m;
        std::deque<TransactionItem> tx_queue_m;

        struct TrackedTx
        {
            std::shared_ptr<IGeniusTransactions> tx;
            TransactionStatus                    status;
            uint64_t                             cached_nonce; // Cache nonce to avoid dereferencing tx
        };

        mutable std::shared_mutex                  tx_mutex_m;
        std::unordered_map<std::string, TrackedTx> tx_processed_m;
        std::atomic<size_t>                        verifying_count_{ 0 }; // Count of VERIFYING transactions
        std::function<void()>                      task_m;
        std::atomic<bool>                          stopped_{ false };
        std::chrono::milliseconds                  timestamp_tolerance_m;
        std::chrono::milliseconds                  mutability_window_m;

        static constexpr std::chrono::milliseconds TIMESTAMP_TOLERANCE = std::chrono::seconds( 10 );
        static constexpr std::chrono::milliseconds MUTABILITY_WINDOW   = std::chrono::minutes( 15 );

        std::mutex                                         cv_mutex_;
        std::condition_variable                            cv_;
        std::queue<crdt::CRDTCallbackManager::NewDataPair> new_data_queue_;
        std::queue<std::string>                            deleted_data_queue_;
        std::mutex                                         new_data_queue_mutex_;     // Separate mutex for the queue
        std::mutex                                         deleted_data_queue_mutex_; // Separate mutex for the queue

        std::chrono::steady_clock::time_point last_loop_time_;

        outcome::result<void> ParseTransferTransaction( const std::shared_ptr<IGeniusTransactions> &tx );
        outcome::result<void> ParseMintTransaction( const std::shared_ptr<IGeniusTransactions> &tx );
        outcome::result<void> ParseEscrowTransaction( const std::shared_ptr<IGeniusTransactions> &tx );
        outcome::result<void> ParseEscrowReleaseTransaction( const std::shared_ptr<IGeniusTransactions> &tx );

        outcome::result<void> RevertTransferTransaction( const std::shared_ptr<IGeniusTransactions> &tx );
        outcome::result<void> RevertMintTransaction( const std::shared_ptr<IGeniusTransactions> &tx );
        outcome::result<void> RevertEscrowTransaction( const std::shared_ptr<IGeniusTransactions> &tx );
        outcome::result<void> RevertEscrowReleaseTransaction( const std::shared_ptr<IGeniusTransactions> &tx );

        static inline const std::unordered_map<std::string, std::pair<TransactionParserFn, TransactionParserFn>>
            transaction_parsers = {
                { "transfer",
                  { &TransactionManager::ParseTransferTransaction, &TransactionManager::RevertTransferTransaction } },
                { "mint", { &TransactionManager::ParseMintTransaction, &TransactionManager::RevertMintTransaction } },
                { "escrow-hold",
                  { &TransactionManager::ParseEscrowTransaction, &TransactionManager::RevertEscrowTransaction } },
                { "escrow-release",
                  { &TransactionManager::ParseEscrowReleaseTransaction,
                    &TransactionManager::RevertEscrowReleaseTransaction } } };

        base::Logger m_logger = base::createLogger( "TransactionManager" );

        std::optional<std::vector<crdt::pb::Element>> FilterTransaction( const crdt::pb::Element &element );
        std::optional<std::vector<crdt::pb::Element>> FilterProof( const crdt::pb::Element &element );

        bool ShouldReplaceTransaction( const IGeniusTransactions &existing_tx,
                                       const IGeniusTransactions &new_tx ) const;

        static uint64_t GetCurrentTimestamp();
        int64_t         GetElapsedTime( uint64_t timestamp, uint64_t current_timestamp ) const;
        int64_t         GetElapsedTime( uint64_t timestamp ) const;

        bool IsTransactionImmutable( const IGeniusTransactions &tx ) const;

        outcome::result<void> RemoveTransactionFromProcessedMaps( const std::string &transaction_key,
                                                                  bool               delete_from_crdt = false );
        outcome::result<void> AddTransactionToProcessedMaps( crdt::CRDTCallbackManager::NewDataPair new_data );

        void ProcessDeletion( std::string deleted_key );
        void ProcessNewData( crdt::CRDTCallbackManager::NewDataPair new_data );

        void NewElementCallback( crdt::CRDTCallbackManager::NewDataPair new_data );
        void DeleteElementCallback( std::string deleted_key );

        void ChangeState( State new_state );
    };
}

template <>
struct fmt::formatter<sgns::TransactionManager::State> : formatter<std::string_view>
{
    format_context::iterator format( sgns::TransactionManager::State s, format_context &ctx ) const;
};

#endif

Updated on 2026-03-04 at 13:10:44 -0800