src/account/TransactionManager.cpp¶
Namespaces¶
| Name |
|---|
| sgns |
Detailed Description¶
Date: 2024-04-12 Henrique A. Klein ([email protected])
Source code¶
#include "account/TransactionManager.hpp"
#include <utility>
#include <thread>
#include <boost/asio/post.hpp>
#include <openssl/err.h>
#include <ProofSystem/EthereumKeyPairParams.hpp>
#include "TransferTransaction.hpp"
#include "MintTransaction.hpp"
#include "EscrowTransaction.hpp"
#include "EscrowReleaseTransaction.hpp"
#include "account/TokenAmount.hpp"
#include "account/AccountMessenger.hpp"
#include "account/proto/SGTransaction.pb.h"
#include "crdt/proto/delta.pb.h"
#include "base/sgns_version.hpp"
#include "proof/ProcessingProof.hpp"
namespace sgns
{
/**
 * @brief Factory that builds a TransactionManager and hooks it into the CRDT database.
 *
 * For every network returned by GetMonitoredNetworkIDs() this registers, under that network's
 * blockchain base path:
 *   - an element filter for "tx/" keys    (forwarded to FilterTransaction),
 *   - an element filter for "proof/" keys (forwarded to FilterProof),
 *   - a new-element callback for "tx/" keys     (forwarded to NewElementCallback),
 *   - a deleted-element callback for "tx/" keys (forwarded to DeleteElementCallback).
 *
 * Every handler captures only a weak pointer to the manager, so a handler invoked after the
 * manager is gone does nothing (the filters return std::nullopt in that case).
 *
 * @param processing_db       CRDT database the handlers are registered on
 * @param ctx                 io_context handed to the constructor
 * @param utxo_manager        UTXO manager, held by reference
 * @param account             account whose transactions are managed
 * @param hasher              hasher handed to the constructor
 * @param full_node           whether this instance runs as a full node
 * @param timestamp_tolerance timestamp tolerance handed to the constructor
 * @param mutability_window   mutability window handed to the constructor
 * @return shared pointer owning the newly created manager
 */
std::shared_ptr<TransactionManager> TransactionManager::New( std::shared_ptr<crdt::GlobalDB>          processing_db,
                                                             std::shared_ptr<boost::asio::io_context> ctx,
                                                             UTXOManager                             &utxo_manager,
                                                             std::shared_ptr<GeniusAccount>           account,
                                                             std::shared_ptr<crypto::Hasher>          hasher,
                                                             bool                                     full_node,
                                                             std::chrono::milliseconds                timestamp_tolerance,
                                                             std::chrono::milliseconds                mutability_window )
{
    auto instance = std::shared_ptr<TransactionManager>( new TransactionManager( std::move( processing_db ),
                                                                                 std::move( ctx ),
                                                                                 utxo_manager,
                                                                                 std::move( account ),
                                                                                 std::move( hasher ),
                                                                                 full_node,
                                                                                 timestamp_tolerance,
                                                                                 mutability_window ) );

    // One weak handle shared (by copy) by all handlers below, so the database callbacks never
    // extend the manager's lifetime.
    std::weak_ptr<TransactionManager> weak_self( instance );

    for ( auto network_id : GetMonitoredNetworkIDs() )
    {
        const std::string blockchain_base = GetBlockChainBase( network_id );

        const bool crdt_tx_filter_initialized = instance->globaldb_m->RegisterElementFilter(
            "^/?" + blockchain_base + "tx/[^/]+",
            [weak_self]( const crdt::pb::Element &element ) -> std::optional<std::vector<crdt::pb::Element>>
            {
                if ( auto strong = weak_self.lock() )
                {
                    return strong->FilterTransaction( element );
                }
                return std::nullopt;
            } );
        if ( !crdt_tx_filter_initialized )
        {
            // NOTE(review): assumes a false return means the filter was not installed - confirm
            // against crdt::GlobalDB::RegisterElementFilter.
            instance->m_logger->error( "Failed to register transaction filter on {}", blockchain_base );
        }

        const bool crdt_proof_filter_initialized = instance->globaldb_m->RegisterElementFilter(
            "^/?" + blockchain_base + "proof/[^/]+",
            [weak_self]( const crdt::pb::Element &element ) -> std::optional<std::vector<crdt::pb::Element>>
            {
                if ( auto strong = weak_self.lock() )
                {
                    return strong->FilterProof( element );
                }
                return std::nullopt;
            } );
        if ( !crdt_proof_filter_initialized )
        {
            instance->m_logger->error( "Failed to register proof filter on {}", blockchain_base );
        }

        (void)instance->globaldb_m->RegisterNewElementCallback(
            "^/?" + blockchain_base + "tx/[^/]+",
            [weak_self]( crdt::CRDTCallbackManager::NewDataPair new_data, const std::string &cid )
            {
                if ( auto strong = weak_self.lock() )
                {
                    strong->NewElementCallback( std::move( new_data ) );
                }
            } );

        (void)instance->globaldb_m->RegisterDeletedElementCallback(
            "^/?" + blockchain_base + "tx/[^/]+",
            [weak_self]( std::string deleted_key, const std::string &cid )
            {
                if ( auto strong = weak_self.lock() )
                {
                    strong->DeleteElementCallback( std::move( deleted_key ) );
                }
            } );
    }
    return instance;
}
// Constructor (invoked by New()): stores the collaborators and configuration values it is given.
// It performs no work beyond member initialization - the body is empty. The manager starts in
// State::CREATING, and both the periodic-sync timestamp and the loop timestamp are seeded with
// the current steady-clock time.
TransactionManager::TransactionManager( std::shared_ptr<crdt::GlobalDB> processing_db,
std::shared_ptr<boost::asio::io_context> ctx,
UTXOManager &utxo_manager,
std::shared_ptr<GeniusAccount> account,
std::shared_ptr<crypto::Hasher> hasher,
bool full_node,
std::chrono::milliseconds timestamp_tolerance,
std::chrono::milliseconds mutability_window ) :
globaldb_m( std::move( processing_db ) ),
ctx_m( std::move( ctx ) ),
account_m( std::move( account ) ),
// Held by reference: the caller's UTXOManager must outlive this object.
utxo_manager_( utxo_manager ),
hasher_m( std::move( hasher ) ),
full_node_m( full_node ),
state_m( State::CREATING ),
last_periodic_sync_time_( std::chrono::steady_clock::now() ),
timestamp_tolerance_m( timestamp_tolerance ),
mutability_window_m( mutability_window ),
last_loop_time_( std::chrono::steady_clock::now() )
{
}
/// Destructor: logs that it ran, then makes sure the manager is stopped.
TransactionManager::~TransactionManager()
{
    const auto short_address = account_m->GetAddress().substr( 0, 8 );
    m_logger->debug( "[{} - full: {}] ~TransactionManager CALLED", short_address, full_node_m );
    Stop();
}
/// Marks the manager as stopped and wakes every waiter on cv_. Calling it again is a no-op.
void TransactionManager::Stop()
{
    const bool was_already_stopped = stopped_.exchange( true );
    if ( !was_already_stopped )
    {
        // Only the first caller needs to wake the thread waiting on the condition variable.
        cv_.notify_all();
    }
}
// Starts the manager. Does nothing unless the state is CREATING and Stop() has not been called.
// Steps, in order:
//   1. Subscribe to the account's own topic; full nodes also subscribe to the full-node topic.
//   2. Move to State::INITIALIZING.
//   3. Load persisted UTXOs. If loading fails or reports `false`, rebuild from the stored
//      transactions via QueryTransactions(); otherwise re-process the transaction behind every
//      loaded UTXO that is in UTXO_READY state.
//   4. Post the first TickOnce() onto the io_context.
void TransactionManager::Start()
{
if ( GetState() != State::CREATING || stopped_.load() )
{
return;
}
m_logger->info( "[{} - full: {}] Initializing values by reading whole blockchain",
account_m->GetAddress().substr( 0, 8 ),
full_node_m );
full_node_topic_m = std::string( GNUS_FULL_NODES_TOPIC );
globaldb_m->AddListenTopic( account_m->GetAddress() );
m_logger->info( "[{} - full: {}] Adding broadcast to full node on {}",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
full_node_topic_m );
if ( full_node_m )
{
m_logger->debug( "[{} - full: {}] Listening full node on {}",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
full_node_topic_m );
globaldb_m->AddListenTopic( full_node_topic_m );
}
ChangeState( State::INITIALIZING );
// Stop() may have been called while the topics were being set up.
if ( stopped_.load() )
{
return;
}
auto utxo_result = utxo_manager_.LoadUTXOs( globaldb_m->GetDataStore() );
if ( utxo_result.has_error() )
{
m_logger->error( "Failed to load UTXOs from storage" );
}
if ( utxo_result.has_error() || !utxo_result.value() )
{
// Nothing usable was loaded: rebuild from the stored transactions.
// The result of QueryTransactions() is not inspected here.
m_logger->info( "Loading transactions to mount UTXOs" );
QueryTransactions();
}
else
{
auto utxo_map = utxo_manager_.GetAllUTXOs();
auto monitored_networks = GetMonitoredNetworkIDs();
for ( const auto &[address, utxo_data_vector] : utxo_map )
{
m_logger->debug( "[{} - full: {}] Loaded {} UTXOs for address {}",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
utxo_data_vector.size(),
address.substr( 0, 8 ) );
for ( auto &utxo_data : utxo_data_vector )
{
auto &[utxo_state, utxo] = utxo_data;
m_logger->debug( "[{} - full: {}] UTXO - state: {}, tx_hash: {}, index: {}, amount: {}",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
static_cast<uint8_t>( utxo_state ),
utxo.GetTxID().toReadableString(),
utxo.GetOutputIdx(),
utxo.GetAmount() );
if ( utxo_state != UTXOManager::UTXOState::UTXO_READY )
{
m_logger->debug( "[{} - full: {}] Skipping UTXO in state {} for tx {}",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
static_cast<uint8_t>( utxo_state ),
utxo.GetTxID().toReadableString() );
continue;
}
// The UTXO does not record which network its transaction lives on, so each monitored
// network's path is tried until one is processed successfully.
for ( auto network_id : monitored_networks )
{
auto tx_path = GetTransactionPath( network_id, utxo.GetTxID().toReadableString() );
auto process_result = FetchAndProcessTransaction( tx_path );
if ( !process_result.has_error() )
{
m_logger->debug( "[{} - full: {}] Processed transaction in {}",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
tx_path );
break;
}
}
}
}
}
// First kick: keep self alive during the first dispatch only
boost::asio::post( *ctx_m, [self = shared_from_this()]() { self->TickOnce(); } );
}
// One "tick" of the manager loop: do the work, then schedule the next tick via a weak capture.
// Each tick, in order:
//   1. drains the deleted-key and new-data queues (each under its own mutex) and processes
//      all deletions before all additions;
//   2. runs the step for the current state (INITIALIZING / SYNCING / READY);
//   3. calls ConfirmTransactions();
//   4. requests heads from peers when the periodic-sync interval has elapsed;
//   5. waits on cv_ for up to 300 ms (or until stopped / new queue data), then re-posts itself.
// Returns immediately, without rescheduling, once the manager has been stopped.
void TransactionManager::TickOnce()
{
if ( stopped_.load() )
{
return;
}
auto now = std::chrono::steady_clock::now();
auto time_since_last_loop = std::chrono::duration_cast<std::chrono::milliseconds>( now - last_loop_time_ )
.count();
last_loop_time_ = now;
// Move queued items into locals so the queue mutexes are held only while draining.
std::vector<std::string> elements_to_delete;
{
std::lock_guard queue_lock( deleted_data_queue_mutex_ );
while ( !deleted_data_queue_.empty() )
{
elements_to_delete.push_back( std::move( deleted_data_queue_.front() ) );
deleted_data_queue_.pop();
}
}
std::vector<crdt::CRDTCallbackManager::NewDataPair> elements_to_process;
{
std::lock_guard queue_lock( new_data_queue_mutex_ );
while ( !new_data_queue_.empty() )
{
elements_to_process.push_back( std::move( new_data_queue_.front() ) );
new_data_queue_.pop();
}
}
for ( auto &deletion_key : elements_to_delete )
{
m_logger->debug( "[{} - full: {}] Deleting key: {} ",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
deletion_key );
ProcessDeletion( deletion_key );
}
for ( auto &new_data : elements_to_process )
{
m_logger->debug( "[{} - full: {}] Adding key: {} ",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
new_data.first );
ProcessNewData( new_data );
}
m_logger->trace( "[{} - full: {}] Loop iteration - time since last: {}ms",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
time_since_last_loop );
switch ( GetState() )
{
case State::INITIALIZING:
// NOTE(review): 8000 is presumably a timeout in milliseconds - confirm against InitNonce.
InitNonce( 8000 );
if ( GetState() == State::READY )
{
m_logger->debug( "[{} - full: {}] Transaction Manager is now READY - starting regular updates",
account_m->GetAddress().substr( 0, 8 ),
full_node_m );
}
break;
case State::CREATING: // Should not happen, but handle gracefully
break;
case State::SYNCING:
this->SyncNonce();
break;
case State::READY:
{
// At most one queued item is sent per tick; mutex_m is held for the whole send attempt.
std::unique_lock lock( mutex_m );
if ( tx_queue_m.empty() )
{
break;
}
auto send_result = SendTransactionItem( tx_queue_m.front() );
if ( send_result.has_error() )
{
// Immediately switch to SYNCING so no new transactions are created while we roll back.
ChangeState( State::SYNCING );
m_logger->error( "[{} - full: {}] Error in SendTransactionItem: {}",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
send_result.error().message() );
auto rollback_result = RollbackTransactions( tx_queue_m.front() );
if ( rollback_result.has_error() )
{
// The item stays at the front of the queue in this case.
m_logger->error( "[{} - full: {}] RollbackTransactions error, couldn't fetch nonce",
account_m->GetAddress().substr( 0, 8 ),
full_node_m );
break;
}
// Check if error was due to network timeout - if so, keep transaction in queue for retry
// when full node becomes available
if ( send_result.error() == boost::system::errc::make_error_code( boost::system::errc::timed_out ) )
{
m_logger->info( "[{} - full: {}] Network timeout - keeping transaction in queue for retry",
account_m->GetAddress().substr( 0, 8 ),
full_node_m );
// Don't pop - transaction stays in queue for retry when we return to READY
}
else
{
// Other errors (like invalid_argument from nonce mismatch) - remove from queue
tx_queue_m.pop_front();
}
break;
}
// Success: record every nonce that was sent as locally confirmed, then drop the item.
auto nonces_sent = send_result.value();
for ( auto nonce : nonces_sent )
{
m_logger->debug( "[{} - full: {}] Confirming local nonce to {}",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
nonce );
account_m->SetLocalConfirmedNonce( nonce );
}
tx_queue_m.pop_front();
lock.unlock();
}
break;
}
auto confirm_result = ConfirmTransactions();
if ( confirm_result.has_error() )
{
m_logger->trace( "[{} - full: {}] Unknown ConfirmTransactions error",
account_m->GetAddress().substr( 0, 8 ),
full_node_m );
}
// Periodic sync - request heads every 10 minutes to stay synchronized across devices/instances
// Use 30 second interval until we get first response, then switch to 10 minutes
// (the actual intervals are INITIAL_PERIODIC_SYNC_INTERVAL and PERIODIC_SYNC_INTERVAL, defined elsewhere)
bool should_sync = false;
if ( !received_first_periodic_sync_response_.load() )
{
auto time_since_last_sync = std::chrono::duration_cast<std::chrono::seconds>( now -
last_periodic_sync_time_ );
should_sync = time_since_last_sync >= INITIAL_PERIODIC_SYNC_INTERVAL;
}
else
{
auto time_since_last_sync = std::chrono::duration_cast<std::chrono::minutes>( now -
last_periodic_sync_time_ );
should_sync = time_since_last_sync >= PERIODIC_SYNC_INTERVAL;
}
if ( should_sync )
{
auto interval_desc = received_first_periodic_sync_response_.load() ? "10 minutes" : "30 seconds";
m_logger->debug( "[{} - full: {}] Periodic sync - requesting heads (interval: {})",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
interval_desc );
auto topics_result = globaldb_m->GetMonitoredTopics();
if ( topics_result.has_value() )
{
if ( account_m->RequestHeads( topics_result.value() ) )
{
// The sync timestamp advances only when the request was sent, so a failed request
// is retried on the next tick.
last_periodic_sync_time_ = now;
m_logger->debug( "[{} - full: {}] Periodic sync head request sent for {} topics",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
topics_result.value().size() );
}
else
{
m_logger->warn( "[{} - full: {}] Periodic sync head request failed",
account_m->GetAddress().substr( 0, 8 ),
full_node_m );
}
}
else
{
m_logger->warn( "[{} - full: {}] Could not get monitored topics for head request",
account_m->GetAddress().substr( 0, 8 ),
full_node_m );
}
}
// Wait with a condition variable instead of a timer: wake up on notification, when the
// predicate below becomes true (stopped, or either queue non-empty), or after 300 ms.
std::unique_lock lock( cv_mutex_ );
cv_.wait_for( lock,
std::chrono::milliseconds( 300 ),
[this]
{
bool new_data = false;
bool deleted_data = false;
{
std::lock_guard new_data_queue_lock( new_data_queue_mutex_ );
new_data = !new_data_queue_.empty();
}
{
std::lock_guard delete_data_queue_lock( deleted_data_queue_mutex_ );
deleted_data = !deleted_data_queue_.empty();
}
return stopped_.load() || new_data || deleted_data;
} );
// Schedule next tick if not stopped
if ( !stopped_.load() )
{
// Weak capture: a pending tick does not keep the manager alive.
boost::asio::post( *ctx_m,
[weak_instance = weak_from_this()]()
{
if ( auto instance = weak_instance.lock() )
{
if ( !instance->stopped_.load() )
{
instance->TickOnce();
}
}
} );
}
}
void TransactionManager::PrintAccountInfo() const
{
std::cout << "Account Address: " << account_m->GetAddress() << std::endl;
std::cout << "Balance: " << std::to_string( utxo_manager_.GetBalance() ) << std::endl;
std::cout << "Token Type: " << account_m->GetToken() << std::endl;
std::cout << "Nonce: " << account_m->GetNonce() << std::endl;
}
/// @return read-only reference to the account held by this manager.
const GeniusAccount &TransactionManager::GetAccount() const
{
    const GeniusAccount &managed_account = *account_m;
    return managed_account;
}
/**
 * @brief Creates, signs and enqueues a transfer transaction.
 *
 * The inputs selected for the transfer are reserved in the UTXO manager before the
 * transaction is queued.
 *
 * @param amount      amount to transfer
 * @param destination destination address
 * @param token_id    token to transfer
 * @return hash of the queued transaction; a failure carrying a default-constructed
 *         error_code when the manager is not in State::READY; or the error reported by
 *         UTXOManager::CreateTxParameter.
 */
outcome::result<std::string> TransactionManager::TransferFunds( uint64_t           amount,
                                                                const std::string &destination,
                                                                TokenID            token_id )
{
    if ( GetState() != State::READY )
    {
        return outcome::failure( boost::system::error_code{} );
    }
    OUTCOME_TRY( auto &&params, utxo_manager_.CreateTxParameter( amount, destination, token_id ) );
    auto [inputs, outputs]    = params;
    auto transfer_transaction = std::make_shared<TransferTransaction>(
        TransferTransaction::New( inputs, outputs, FillDAGStruct() ) );
    transfer_transaction->MakeSignature( *account_m );
    utxo_manager_.ReserveUTXOs( inputs );
    EnqueueTransaction( std::make_pair( transfer_transaction, std::nullopt ) );
    return transfer_transaction->GetHash();
}
/**
 * @brief Creates, signs and enqueues a mint transaction.
 *
 * @param amount           amount to mint
 * @param transaction_hash value stored as the DAG struct's previous hash
 * @param chainid          chain identifier passed to the mint transaction
 * @param tokenid          token to mint
 * @return hash of the queued transaction, or a failure carrying a default-constructed
 *         error_code when the manager is not in State::READY.
 */
outcome::result<std::string> TransactionManager::MintFunds( uint64_t    amount,
                                                            std::string transaction_hash,
                                                            std::string chainid,
                                                            TokenID     tokenid )
{
    const bool manager_ready = ( GetState() == State::READY );
    if ( !manager_ready )
    {
        return outcome::failure( boost::system::error_code{} );
    }

    auto dag     = FillDAGStruct( std::move( transaction_hash ) );
    auto mint_tx = std::make_shared<MintTransaction>(
        MintTransaction::New( amount, std::move( chainid ), std::move( tokenid ), std::move( dag ) ) );
    mint_tx->MakeSignature( *account_m );

    // The hash must be read before the pointer is moved into the queue.
    auto mint_hash = mint_tx->GetHash();
    EnqueueTransaction( std::make_pair( std::move( mint_tx ), std::nullopt ) );
    return mint_hash;
}
/**
 * @brief Creates, signs and enqueues an escrow transaction for a job.
 *
 * The escrow destination is "0x" followed by the readable blake2b-256 hash of @p job_id.
 * The selected inputs are reserved in the UTXO manager before the transaction is built.
 *
 * @param amount    amount to place in escrow
 * @param dev_addr  developer address stored in the escrow transaction
 * @param peers_cut peers' cut stored in the escrow transaction
 * @param job_id    job identifier the escrow address is derived from
 * @return pair of (transaction hash, (escrow address, serialized escrow transaction)), or a
 *         failure when the manager is not READY or the tx parameters cannot be created.
 */
outcome::result<std::pair<std::string, EscrowDataPair>> TransactionManager::HoldEscrow( uint64_t           amount,
                                                                                        const std::string &dev_addr,
                                                                                        uint64_t           peers_cut,
                                                                                        const std::string &job_id )
{
    if ( GetState() != State::READY )
    {
        return outcome::failure( boost::system::error_code{} );
    }

    const std::vector<uint8_t> job_bytes( job_id.begin(), job_id.end() );
    auto                       hash_data      = hasher_m->blake2b_256( job_bytes );
    const std::string          escrow_address = "0x" + hash_data.toReadableString();

    OUTCOME_TRY( ( auto &&, params ),
                 utxo_manager_.CreateTxParameter( amount, escrow_address, TokenID::FromBytes( { 0x00 } ) ) );
    auto [inputs, outputs] = params;
    utxo_manager_.ReserveUTXOs( inputs );

    auto escrow_tx = std::make_shared<EscrowTransaction>(
        EscrowTransaction::New( params, amount, dev_addr, peers_cut, FillDAGStruct() ) );
    escrow_tx->MakeSignature( *account_m );

    // Hash returned to the caller for tracking
    auto escrow_hash = escrow_tx->GetHash();
    EnqueueTransaction( std::make_pair( escrow_tx, std::nullopt ) );

    crdt::GlobalDB::Buffer serialized_escrow;
    serialized_escrow.put( escrow_tx->SerializeByteVector() );

    // Hand back both the transaction hash and the escrow data pair
    return std::make_pair( escrow_hash, std::make_pair( escrow_address, std::move( serialized_escrow ) ) );
}
/**
 * @brief Releases an escrow: pays every subtask's node an equal share and sends the rest to
 *        the developer address.
 *
 * The escrow transaction is fetched from the database at @p escrow_path. The peers' total is
 * the escrow amount multiplied by the peers' cut; it is split evenly (integer division) across
 * the subtask results, and whatever is left of the escrow amount goes to the developer. A
 * transfer transaction and an escrow-release transaction are signed and enqueued as one batch.
 *
 * @param escrow_path      database key of the escrow transaction
 * @param task_result      task result whose subtask results identify the nodes to pay
 * @param crdt_transaction atomic CRDT transaction forwarded with the queued batch
 * @return hash of the transfer transaction, or a failure when there are no subtask results,
 *         the path is empty, the stored transaction cannot be fetched or is not an escrow
 *         transaction, the escrow has no outputs, or an amount computation fails.
 */
outcome::result<std::string> TransactionManager::PayEscrow(
    const std::string                       &escrow_path,
    const SGProcessing::TaskResult          &task_result,
    std::shared_ptr<crdt::AtomicTransaction> crdt_transaction )
{
    if ( task_result.subtask_results().size() == 0 )
    {
        m_logger->error( "[{} - full: {}] No result found on escrow {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         escrow_path );
        return outcome::failure( boost::system::error_code{} );
    }
    if ( escrow_path.empty() )
    {
        m_logger->error( "[{} - full: {}] Escrow path empty", account_m->GetAddress().substr( 0, 8 ), full_node_m );
        return outcome::failure( boost::system::error_code{} );
    }
    m_logger->debug( "[{} - full: {}] Fetching escrow from processing DB at {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     escrow_path );
    OUTCOME_TRY( ( auto &&, transaction ), FetchTransaction( globaldb_m, escrow_path ) );
    std::shared_ptr<EscrowTransaction> escrow_tx = std::dynamic_pointer_cast<EscrowTransaction>( transaction );
    if ( !escrow_tx )
    {
        // The key resolved to a transaction of another type; dereferencing the failed cast
        // below would be undefined behaviour.
        m_logger->error( "[{} - full: {}] Transaction at {} is not an escrow transaction",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         escrow_path );
        return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::invalid_argument ) );
    }
    const auto escrow_utxo_params = escrow_tx->GetUTXOParameters();
    if ( escrow_utxo_params.second.empty() )
    {
        // The token id is taken from the first output, so at least one output must exist.
        m_logger->error( "[{} - full: {}] Escrow {} has no outputs",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         escrow_path );
        return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::invalid_argument ) );
    }
    std::vector<std::string>    subtask_ids;
    std::vector<OutputDestInfo> payout_peers;
    OUTCOME_TRY( ( auto &&, escrow_amount_ptr ), TokenAmount::New( escrow_tx->GetAmount() ) );
    OUTCOME_TRY( ( auto &&, peers_cut_ptr ), TokenAmount::New( escrow_tx->GetPeersCut() ) );
    OUTCOME_TRY( ( auto &&, peer_total ), escrow_amount_ptr->Multiply( *peers_cut_ptr ) );
    const auto escrowTokenId = escrow_utxo_params.second[0].token_id;
    // The subtask count is non-zero here (checked at the top), so the division is safe.
    uint64_t peers_amount = peer_total.Value() / static_cast<uint64_t>( task_result.subtask_results().size() );
    auto     remainder    = escrow_tx->GetAmount();
    for ( auto &subtask : task_result.subtask_results() )
    {
        std::cout << "Subtask Result " << subtask.subtaskid() << "from " << subtask.node_address() << std::endl;
        m_logger->debug( "[{} - full: {}] Paying out {} in {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         peers_amount,
                         subtask.token_id() );
        subtask_ids.push_back( subtask.subtaskid() );
        payout_peers.push_back( { peers_amount,
                                  subtask.node_address(),
                                  TokenID::FromBytes( subtask.token_id().data(), subtask.token_id().size() ) } );
        remainder -= peers_amount;
    }
    //TODO: see what do with token_id here
    m_logger->debug( "[{} - full: {}] Sending to dev {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     remainder );
    payout_peers.push_back( { remainder, escrow_tx->GetDevAddress(), escrowTokenId } );
    // The escrow's output 0 is the single input of the payout transfer.
    InputUTXOInfo escrow_utxo_input;
    escrow_utxo_input.txid_hash_  = base::Hash256::fromReadableString( escrow_tx->GetHash() ).value();
    escrow_utxo_input.output_idx_ = 0;
    escrow_utxo_input.signature_  = account_m->Sign( escrow_utxo_input.SerializeForSigning() );
    auto transfer_transaction     = std::make_shared<TransferTransaction>(
        TransferTransaction::New( std::vector{ escrow_utxo_input }, payout_peers, FillDAGStruct() ) );
    auto escrow_release_tx = std::make_shared<EscrowReleaseTransaction>(
        EscrowReleaseTransaction::New( escrow_tx->GetUTXOParameters(),
                                       escrow_tx->GetAmount(),
                                       escrow_tx->GetDevAddress(),
                                       escrow_tx->dag_st.source_addr(),
                                       escrow_tx->GetHash(),
                                       FillDAGStruct() ) );
    TransactionBatch tx_batch;
    transfer_transaction->MakeSignature( *account_m );
    escrow_release_tx->MakeSignature( *account_m );
    tx_batch.push_back( std::make_pair( transfer_transaction, std::nullopt ) );
    tx_batch.push_back( std::make_pair( escrow_release_tx, std::nullopt ) );
    EnqueueTransaction( std::make_pair( tx_batch, std::move( crdt_transaction ) ) );
    return transfer_transaction->GetHash();
}
/**
 * @brief Queues a batch of transactions for sending.
 *
 * Every transaction in the batch is first recorded in tx_processed_m with status CREATED, so
 * it is visible to status queries before it is sent. The item is then appended to the send
 * queue.
 *
 * @param element batch of transactions plus the optional CRDT transaction to send them with
 */
void TransactionManager::EnqueueTransaction( TransactionItem element )
{
    m_logger->debug( "[{} - full: {}] Transaction enqueuing", account_m->GetAddress().substr( 0, 8 ), full_node_m );

    {
        // Tracking map is updated under its own lock, released before the queue lock is taken.
        std::unique_lock tracking_lock( tx_mutex_m );
        for ( auto &&[queued_tx, unused_proof] : element.first )
        {
            const auto tx_path  = GetTransactionPath( *queued_tx );
            const auto tx_nonce = queued_tx->dag_st.nonce();

            tx_processed_m[tx_path] = TrackedTx{ queued_tx, TransactionStatus::CREATED, tx_nonce };
            m_logger->debug( "[{} - full: {}] Setting {} to CREATED",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m,
                             queued_tx->GetHash() );
        }
    }

    std::lock_guard queue_lock( mutex_m );
    tx_queue_m.emplace_back( std::move( element ) );
}
/// Convenience overload: wraps a single transaction in a one-element batch with no CRDT
/// transaction attached and queues it.
void TransactionManager::EnqueueTransaction( TransactionPair element )
{
    TransactionBatch single_entry_batch;
    single_entry_batch.push_back( std::move( element ) );
    EnqueueTransaction( TransactionItem{ std::move( single_entry_batch ), std::nullopt } );
}
//TODO - Fill hash stuff on DAGStruct
/**
 * @brief Builds the DAG header for a new outgoing transaction.
 *
 * Reserves the account's next nonce and stamps the struct with the current system-clock time
 * in milliseconds since the epoch. The uncle hash and data hash are left empty.
 *
 * @param transaction_hash value stored as the previous hash
 * @return the populated DAG struct
 */
SGTransaction::DAGStruct TransactionManager::FillDAGStruct( std::string transaction_hash ) const
{
    const auto since_epoch  = std::chrono::system_clock::now().time_since_epoch();
    const auto timestamp_ms = std::chrono::duration_cast<std::chrono::milliseconds>( since_epoch ).count();

    SGTransaction::DAGStruct dag_header;
    dag_header.set_previous_hash( transaction_hash );
    dag_header.set_nonce( account_m->ReserveNextNonce() );
    dag_header.set_source_addr( account_m->GetAddress() );
    dag_header.set_timestamp( timestamp_ms );
    dag_header.set_uncle_hash( "" );
    dag_header.set_data_hash( "" ); // filled by transaction class
    return dag_header;
}
/**
 * @brief Writes one queued batch of transactions (and their proofs, if any) to the CRDT
 *        database and commits it.
 *
 * Steps:
 *   1. Use the CRDT transaction attached to the item, or begin a new one.
 *   2. Determine the next expected nonce from the confirmed nonce. If no response was received,
 *      a non-full node fails with `timed_out`; a full node falls back to its locally confirmed
 *      nonce when one exists.
 *   3. Put every transaction (and proof) in the batch, requiring consecutive nonces starting
 *      at the expected one; a mismatch fails with `invalid_argument`.
 *   4. Parse each transaction locally and update its tracked status (CONFIRMED on a full node,
 *      VERIFYING otherwise).
 *   5. Commit to the full-node topic, the account topic and each transaction's own topics.
 *
 * @param item batch to send; it is copied, so the queued item is left intact for a retry
 * @return the set of nonces that were sent, or the first error encountered
 */
outcome::result<std::unordered_set<uint64_t>> TransactionManager::SendTransactionItem( TransactionItem &item )
{
    std::unordered_set<uint64_t> nonces_set;
    // Deliberate copy: the CRDT transaction is moved out of the copy below, which must not
    // empty the queued item in case it has to be retried.
    auto [transaction_batch, maybe_crdt_transaction]          = item;
    std::shared_ptr<crdt::AtomicTransaction> crdt_transaction = nullptr;
    m_logger->trace( "{} called", __func__ );
    if ( maybe_crdt_transaction.has_value() && maybe_crdt_transaction.value() )
    {
        crdt_transaction = std::move( maybe_crdt_transaction.value() );
    }
    else
    {
        crdt_transaction = globaldb_m->BeginTransaction();
    }
    auto     nonce_result        = account_m->GetConfirmedNonce( NONCE_REQUEST_TIMEOUT_MS );
    uint64_t expected_next_nonce = 0;
    int64_t  confirmed_nonce     = -1;
    if ( nonce_result.has_value() )
    {
        confirmed_nonce = static_cast<int64_t>( nonce_result.value() );
        m_logger->debug( "[{} - full: {}] Set nonce to {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         confirmed_nonce );
        expected_next_nonce = static_cast<uint64_t>( confirmed_nonce ) + 1;
    }
    else if ( nonce_result.has_error() && nonce_result.error() == AccountMessenger::Error::NO_RESPONSE_RECEIVED )
    {
        if ( !full_node_m )
        {
            // Arguments follow the placeholder order: address, full-node flag, function name.
            m_logger->error( "[{} - full: {}] {}: Network unreachable when fetching nonce",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m,
                             __func__ );
            return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::timed_out ) );
        }
        m_logger->warn( "[{} - full: {}] Could not fetch nonce, but proceeding since full node",
                        account_m->GetAddress().substr( 0, 8 ),
                        full_node_m );
        if ( auto local_confirmed = account_m->GetLocalConfirmedNonce(); local_confirmed.has_value() )
        {
            confirmed_nonce = static_cast<int64_t>( local_confirmed.value() );
            m_logger->debug( "[{} - full: {}] Using local confirmed nonce {}",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m,
                             local_confirmed.value() );
            expected_next_nonce = static_cast<uint64_t>( confirmed_nonce ) + 1;
        }
    }
    // Any other nonce error leaves expected_next_nonce at 0.
    for ( auto &[transaction, maybe_proof] : transaction_batch )
    {
        if ( transaction->dag_st.nonce() != expected_next_nonce )
        {
            m_logger->error( "[{} - full: {}] Transaction with unexpected nonce - Expected: {}, Tried to send: {}",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m,
                             expected_next_nonce,
                             transaction->dag_st.nonce() );
            return outcome::failure(
                boost::system::errc::make_error_code( boost::system::errc::invalid_argument ) );
        }
        auto                   transaction_path = GetTransactionPath( *transaction );
        crdt::HierarchicalKey  tx_key( transaction_path );
        crdt::GlobalDB::Buffer data_transaction;
        m_logger->debug( "[{} - full: {}] Recording the transaction on {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         tx_key.GetKey() );
        data_transaction.put( transaction->SerializeByteVector() );
        BOOST_OUTCOME_TRYV2( auto &&, crdt_transaction->Put( std::move( tx_key ), std::move( data_transaction ) ) );
        if ( maybe_proof )
        {
            crdt::HierarchicalKey  proof_key( GetTransactionProofPath( *transaction ) );
            crdt::GlobalDB::Buffer proof_transaction;
            auto                  &proof = maybe_proof.value();
            m_logger->debug( "[{} - full: {}] Recording the proof on {}",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m,
                             proof_key.GetKey() );
            proof_transaction.put( proof );
            BOOST_OUTCOME_TRYV2( auto &&,
                                 crdt_transaction->Put( std::move( proof_key ), std::move( proof_transaction ) ) );
        }
        nonces_set.insert( transaction->dag_st.nonce() );
        expected_next_nonce++;
    }
    std::unordered_set<std::string> topicSet;
    if ( !transaction_batch.empty() )
    {
        topicSet.emplace( full_node_topic_m );
        topicSet.emplace( account_m->GetAddress() );
    }
    for ( auto &[tx, _] : transaction_batch )
    {
        OUTCOME_TRY( ParseTransaction( tx ) );
        topicSet.merge( tx->GetTopics() );
        std::unique_lock tx_lock( tx_mutex_m );
        const auto       key      = GetTransactionPath( *tx );
        const auto       nonce    = tx->dag_st.nonce();
        auto             it       = tx_processed_m.find( key );
        auto             tx_state = TransactionStatus::VERIFYING;
        if ( full_node_m )
        {
            tx_state = TransactionStatus::CONFIRMED;
        }
        if ( it != tx_processed_m.end() )
        {
            // Count the transaction as verifying only when it actually enters that status.
            if ( it->second.status != tx_state && tx_state == TransactionStatus::VERIFYING )
            {
                verifying_count_.fetch_add( 1, std::memory_order_relaxed );
            }
            it->second.status = tx_state;
        }
        else
        {
            tx_processed_m[key] = TrackedTx{ tx, tx_state, nonce };
            if ( tx_state == TransactionStatus::VERIFYING )
            {
                verifying_count_.fetch_add( 1, std::memory_order_relaxed );
            }
        }
    }
    BOOST_OUTCOME_TRYV2( auto &&, crdt_transaction->Commit( topicSet ) );
    return nonces_set;
}
/**
 * @brief Rolls back a batch whose send failed.
 *
 * The confirmed nonce is fetched from the network, falling back to the locally confirmed
 * nonce and finally to "none" (-1). Then, for each transaction in the batch:
 *   - every outgoing transaction with a nonce between the confirmed nonce (exclusive) and this
 *     transaction's nonce (exclusive) is put back to VERIFYING;
 *   - the transaction itself is marked FAILED in tx_processed_m (decrementing the verifying
 *     counter if it was VERIFYING);
 *   - RemoveTransactionFromProcessedMaps() is called for its path and its nonce is released.
 *
 * @param item_to_rollback batch to roll back; it is only read
 * @return always success in the code visible here
 */
outcome::result<void> TransactionManager::RollbackTransactions( TransactionItem &item_to_rollback )
{
    int64_t confirmed_nonce = -1;
    if ( auto nonce_result = account_m->GetConfirmedNonce( NONCE_REQUEST_TIMEOUT_MS ); nonce_result.has_value() )
    {
        confirmed_nonce = static_cast<int64_t>( nonce_result.value() );
        m_logger->debug( "[{} - full: {}] Set nonce to {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         confirmed_nonce );
    }
    else
    {
        m_logger->error( "[{} - full: {}] {}: Could not fetch confirmed nonce ({}). Attempting rollback with "
                         "local state",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         __func__,
                         nonce_result.error().message() );
        auto local_nonce_result = account_m->GetLocalConfirmedNonce();
        if ( local_nonce_result.has_value() )
        {
            confirmed_nonce = static_cast<int64_t>( local_nonce_result.value() );
            m_logger->debug( "[{} - full: {}] Falling back to local confirmed nonce {}",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m,
                             confirmed_nonce );
        }
        else
        {
            m_logger->error( "[{} - full: {}] No local confirmed nonce available, rolling back assuming none",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m );
            confirmed_nonce = -1;
        }
    }
    // Bound by reference: nothing below modifies the item, so no copy of the batch is needed.
    auto &[transaction_batch, unused_crdt_transaction] = item_to_rollback;
    for ( auto &[transaction, unused_proof] : transaction_batch )
    {
        // Signed arithmetic so that nonce 0 yields -1 rather than wrapping around.
        auto signed_previous_nonce = static_cast<int64_t>( transaction->dag_st.nonce() ) - 1;
        for ( auto tx_nonce = signed_previous_nonce; tx_nonce > confirmed_nonce; --tx_nonce )
        {
            //let's verify if we didn't mistakenly confirm any bad transactions.
            m_logger->debug( "[{} - full: {}] Setting \"VERIFYING\" status to transaction with nonce {}",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m,
                             tx_nonce );
            (void)SetOutgoingStatusByNonce( static_cast<uint64_t>( tx_nonce ), TransactionStatus::VERIFYING );
        }
        {
            std::unique_lock tx_lock( tx_mutex_m );
            const auto       key   = GetTransactionPath( *transaction );
            const auto       nonce = transaction->dag_st.nonce();
            if ( auto it = tx_processed_m.find( key ); it != tx_processed_m.end() )
            {
                // Keep verifying_count_ in step when the status leaves VERIFYING
                if ( it->second.status == TransactionStatus::VERIFYING )
                {
                    verifying_count_.fetch_sub( 1, std::memory_order_relaxed );
                }
                it->second.tx           = transaction;
                it->second.status       = TransactionStatus::FAILED;
                it->second.cached_nonce = nonce;
            }
            else
            {
                // New entry rolled back: start directly as FAILED
                tx_processed_m.emplace( key, TrackedTx{ transaction, TransactionStatus::FAILED, nonce } );
            }
        }
        RemoveTransactionFromProcessedMaps( GetTransactionPath( *transaction ) );
        account_m->ReleaseNonce( transaction->dag_st.nonce() );
    }
    return outcome::success();
}
/// Builds the database path of the transaction with hash @p tx_hash on network @p base.
std::string TransactionManager::GetTransactionPath( uint16_t base, const std::string &tx_hash )
{
    std::string full_path = GetBlockChainBase( base );
    full_path += IGeniusTransactions::GetTransactionFullPath( tx_hash );
    return full_path;
}
/// Builds the database path of @p element on the current network.
std::string TransactionManager::GetTransactionPath( const IGeniusTransactions &element )
{
    std::string full_path = GetBlockChainBase();
    full_path += element.GetTransactionFullPath();
    return full_path;
}
/// Builds the database path of the transaction with hash @p tx_hash on the current network.
std::string TransactionManager::GetTransactionPath( const std::string &tx_hash )
{
    std::string full_path = GetBlockChainBase();
    full_path += IGeniusTransactions::GetTransactionFullPath( tx_hash );
    return full_path;
}
/// Builds the database path of the proof belonging to @p element on the current network.
std::string TransactionManager::GetTransactionProofPath( const IGeniusTransactions &element )
{
    return GetBlockChainBase() + element.GetProofFullPath();
}
std::vector<uint16_t> TransactionManager::GetMonitoredNetworkIDs()
{
std::vector monitored_networks{ version::GetNetworkID() };
if ( version::GetNetworkID() == version::DEV_NET_ID ) // DEV network
{
monitored_networks.push_back( version::TEST_NET_ID );
monitored_networks.push_back( version::MAIN_NET_ID );
}
return monitored_networks;
}
std::string TransactionManager::GetBlockChainBase( uint16_t network_id )
{
boost::format tx_key{ std::string( TRANSACTION_BASE_FORMAT ) };
tx_key % network_id;
return tx_key.str();
}
/// Base path of the network this build runs on.
std::string TransactionManager::GetBlockChainBase()
{
    const auto current_network = version::GetNetworkID();
    return GetBlockChainBase( current_network );
}
/**
 * @brief Derives the proof key that belongs to a transaction key.
 *
 * When @p tx is available its proof path is returned directly. Otherwise the first "/tx/"
 * segment of @p tx_key is replaced by "/proof/".
 *
 * @param tx_key transaction key, used only when @p tx is null
 * @param tx     transaction, may be null
 * @return the proof key, or `invalid_argument` when @p tx_key has no "/tx/" segment or nothing
 *         follows that segment.
 */
outcome::result<std::string> TransactionManager::GetExpectedProofKey(
    const std::string                          &tx_key,
    const std::shared_ptr<IGeniusTransactions> &tx )
{
    if ( tx != nullptr )
    {
        return GetTransactionProofPath( *tx );
    }

    const std::string tx_segment    = "/tx/";
    const std::string proof_segment = "/proof/";
    const auto        segment_start = tx_key.find( tx_segment );

    // Reject keys without the segment, and keys that end right after it (no hash to carry over).
    if ( segment_start == std::string::npos || tx_key.size() <= segment_start + tx_segment.size() )
    {
        return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::invalid_argument ) );
    }

    return tx_key.substr( 0, segment_start ) + proof_segment +
           tx_key.substr( segment_start + tx_segment.size() );
}
/**
 * @brief Derives the transaction key that belongs to a proof key by replacing the first
 *        "/proof/" segment with "/tx/".
 *
 * @param proof_key proof key
 * @return the transaction key, or `invalid_argument` when @p proof_key has no "/proof/"
 *         segment or nothing follows that segment.
 */
outcome::result<std::string> TransactionManager::GetExpectedTxKey( const std::string &proof_key )
{
    const std::string proof_segment = "/proof/";
    const std::string tx_segment    = "/tx/";
    const auto        segment_start = proof_key.find( proof_segment );

    // Reject keys without the segment, and keys that end right after it (no hash to carry over).
    if ( segment_start == std::string::npos || proof_key.size() <= segment_start + proof_segment.size() )
    {
        return outcome::failure( boost::system::errc::make_error_code( boost::system::errc::invalid_argument ) );
    }

    return proof_key.substr( 0, segment_start ) + tx_segment +
           proof_key.substr( segment_start + proof_segment.size() );
}
/**
 * @brief Rebuilds a transaction object from its serialized form held in a string.
 *
 * The DAG struct is decoded first; its type selects the deserializer that is applied to the
 * full byte sequence.
 *
 * @param tx_data serialized transaction bytes
 * @return the transaction, the DAG decoding error, or `invalid_argument` when no deserializer
 *         is registered for the type.
 */
outcome::result<std::shared_ptr<IGeniusTransactions>> TransactionManager::DeSerializeTransaction(
    std::string tx_data )
{
    OUTCOME_TRY( ( auto &&, dag ), IGeniusTransactions::DeSerializeDAGStruct( tx_data ) );

    auto     &&deserializers = IGeniusTransactions::GetDeSerializers();
    const auto entry         = deserializers.find( dag.type() );
    if ( entry == deserializers.end() )
    {
        return std::errc::invalid_argument;
    }

    const std::vector<uint8_t> raw_bytes( tx_data.begin(), tx_data.end() );
    return entry->second( raw_bytes );
}
/**
 * @brief Applies a transaction by dispatching to the parser registered for its type.
 * @param tx transaction to apply
 * @return the parser's result, or `invalid_argument` when no parser is registered.
 */
outcome::result<void> TransactionManager::ParseTransaction( const std::shared_ptr<IGeniusTransactions> &tx )
{
    const auto handlers = transaction_parsers.find( tx->GetType() );
    if ( handlers != transaction_parsers.end() )
    {
        // The first member of each entry is the parser, the second the reverter.
        const auto parse_fn = handlers->second.first;
        return ( this->*parse_fn )( tx );
    }

    m_logger->info( "[{} - full: {}] No Parser Available", account_m->GetAddress().substr( 0, 8 ), full_node_m );
    return std::errc::invalid_argument;
}
/**
 * @brief Undoes a transaction by dispatching to the reverter registered for its type.
 * @param tx transaction to revert
 * @return the reverter's result, or `invalid_argument` when no entry is registered.
 */
outcome::result<void> TransactionManager::RevertTransaction( const std::shared_ptr<IGeniusTransactions> &tx )
{
    const auto handlers = transaction_parsers.find( tx->GetType() );
    if ( handlers != transaction_parsers.end() )
    {
        // The second member of each entry is the reverter, the first the parser.
        const auto revert_fn = handlers->second.second;
        return ( this->*revert_fn )( tx );
    }

    m_logger->info( "[{} - full: {}] No Reverter Available", account_m->GetAddress().substr( 0, 8 ), full_node_m );
    return std::errc::invalid_argument;
}
/**
 * @brief Reads a transaction from the database and deserializes it.
 * @param db              database to read from
 * @param transaction_key key of the transaction
 * @return the transaction, or the error from the lookup or the deserialization.
 */
outcome::result<std::shared_ptr<IGeniusTransactions>> TransactionManager::FetchTransaction(
    const std::shared_ptr<crdt::GlobalDB> &db,
    std::string_view                       transaction_key )
{
    std::string key_text( transaction_key );
    OUTCOME_TRY( auto stored_bytes, db->Get( { std::move( key_text ) } ) );
    return DeSerializeTransaction( stored_bytes );
}
/**
 * @brief Rebuilds a transaction object from its serialized form held in a buffer.
 *
 * The DAG struct is decoded first; its type selects the deserializer that is applied to the
 * buffer's bytes.
 *
 * @param tx_data serialized transaction bytes
 * @return the transaction, the DAG decoding error, or `invalid_argument` when no deserializer
 *         is registered for the type.
 */
outcome::result<std::shared_ptr<IGeniusTransactions>> TransactionManager::DeSerializeTransaction(
    const base::Buffer &tx_data )
{
    const auto &raw_bytes = tx_data.toVector();
    OUTCOME_TRY( ( auto &&, dag ), IGeniusTransactions::DeSerializeDAGStruct( raw_bytes ) );

    auto     &&deserializers = IGeniusTransactions::GetDeSerializers();
    const auto entry         = deserializers.find( dag.type() );
    if ( entry == deserializers.end() )
    {
        return std::errc::invalid_argument;
    }
    return entry->second( raw_bytes );
}
/**
 * @brief Fetches the proof stored for a transaction and verifies it.
 * @param tx transaction whose proof is checked
 * @return the result of IBasicProof::VerifyFullProof, or the database error when the proof
 *         cannot be read.
 */
outcome::result<bool> TransactionManager::CheckProof( const std::shared_ptr<IGeniusTransactions> &tx )
{
    const auto short_address = account_m->GetAddress().substr( 0, 8 );
    auto       proof_key     = GetTransactionProofPath( *tx );

    m_logger->debug( "[{} - full: {}] Checking the proof in {}", short_address, full_node_m, proof_key );
    OUTCOME_TRY( ( auto &&, stored_proof ), globaldb_m->Get( { proof_key } ) );

    auto proof_bytes = stored_proof.toVector();
    m_logger->debug( "[{} - full: {}] Proof data acquired. Verifying...", short_address, full_node_m );
    return IBasicProof::VerifyFullProof( proof_bytes );
}
/**
 * @brief Scans the stored transactions of every monitored network and processes each one.
 *
 * For each network id returned by GetMonitoredNetworkIDs(), all key/value pairs under
 * "<blockchain base>tx" are queried and handed to FetchAndProcessTransaction().
 * A key that cannot be converted to a string, or a transaction that fails to process,
 * is logged and skipped so the remaining entries are still handled.
 *
 * @return outcome::success() once every network was scanned, or the error propagated
 *         from QueryKeyValues() when a query itself fails.
 */
outcome::result<void> TransactionManager::QueryTransactions()
{
    auto monitored_networks = GetMonitoredNetworkIDs();
    for ( auto network_id : monitored_networks )
    {
        std::string blockchain_base = GetBlockChainBase( network_id );
        std::string query_path      = blockchain_base + "tx";
        m_logger->trace( "[{} - full: {}] Probing transactions on {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         query_path );
        OUTCOME_TRY( auto transaction_list, globaldb_m->QueryKeyValues( query_path ) );
        m_logger->trace( "[{} - full: {}] Transaction list grabbed from CRDT with Size {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         transaction_list.size() );
        for ( const auto &[key, value] : transaction_list )
        {
            auto transaction_key = globaldb_m->KeyToString( key );
            if ( !transaction_key.has_value() )
            {
                m_logger->error( "[{} - full: {}] Unable to convert a key to string",
                                 account_m->GetAddress().substr( 0, 8 ),
                                 full_node_m );
                continue;
            }
            auto process_result = FetchAndProcessTransaction( transaction_key.value(), value );
            // Inspect the processing outcome itself; the key was already validated above.
            if ( process_result.has_error() )
            {
                m_logger->error( "[{} - full: {}] Unable to fetch and process transaction {}",
                                 account_m->GetAddress().substr( 0, 8 ),
                                 full_node_m,
                                 transaction_key.value() );
            }
        }
    }
    return outcome::success();
}
/**
 * @brief Obtains a transaction, parses it and records it as processed.
 *
 * Keys already present in tx_processed_m are skipped. Otherwise the transaction is
 * deserialized from @p tx_data when supplied, or fetched from globaldb_m by key.
 * After a successful parse the sender's confirmed nonce is updated and the
 * transaction is stored in tx_processed_m with status CONFIRMED.
 *
 * @param[in] tx_key  Key identifying the transaction.
 * @param[in] tx_data Optional serialized transaction; when empty the data is read from the database.
 * @return outcome::success() when the transaction was already tracked or was processed,
 *         otherwise the fetch/parse error, or std::errc::invalid_argument for an empty hash.
 */
outcome::result<void> TransactionManager::FetchAndProcessTransaction( const std::string          &tx_key,
                                                                      std::optional<base::Buffer> tx_data )
{
    {
        std::shared_lock read_guard( tx_mutex_m );
        const bool       already_tracked = tx_processed_m.find( tx_key ) != tx_processed_m.end();
        if ( already_tracked )
        {
            m_logger->trace( "[{} - full: {}] Transaction already processed: {}",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m,
                             tx_key );
            return outcome::success();
        }
    }

    // Prefer the payload handed in by the caller; fall back to a database lookup.
    const bool payload_supplied = tx_data.has_value();
    if ( payload_supplied )
    {
        m_logger->debug( "[{} - full: {}] Deserializing transaction: {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         tx_key );
    }
    else
    {
        m_logger->debug( "[{} - full: {}] Finding transaction: {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         tx_key );
    }
    auto transaction_result = payload_supplied ? DeSerializeTransaction( tx_data.value() )
                                               : FetchTransaction( globaldb_m, tx_key );
    if ( transaction_result.has_error() )
    {
        m_logger->debug( "[{} - full: {}] Can't fetch transaction {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         tx_key );
        return outcome::failure( transaction_result.error() );
    }

    auto &transaction = transaction_result.value();
    if ( transaction->GetHash().empty() )
    {
        m_logger->error( "[{} - full: {}] Error, received transaction without hash: {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         tx_key );
        return outcome::failure( std::errc::invalid_argument );
    }

    auto parse_outcome = ParseTransaction( transaction );
    if ( parse_outcome.has_error() )
    {
        m_logger->debug( "[{} - full: {}] Can't parse the transaction {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         tx_key );
        return outcome::failure( parse_outcome.error() );
    }

    const auto nonce = transaction->dag_st.nonce();
    account_m->SetPeerConfirmedNonce( nonce, transaction->dag_st.source_addr() );

    // Exclusive access is only needed for the insertion itself.
    {
        std::unique_lock write_guard( tx_mutex_m );
        tx_processed_m[tx_key] = TrackedTx{ transaction, TransactionStatus::CONFIRMED, nonce };
    }
    return outcome::success();
}
/**
 * @brief Applies a transfer transaction to the UTXO set.
 *
 * One UTXO is stored per destination entry (output index = position in the list),
 * then the transaction's inputs are consumed on behalf of the source address.
 *
 * @param[in] tx Transaction expected to be a TransferTransaction.
 * @return outcome::success() after the UTXO set was updated, or
 *         std::errc::invalid_argument when @p tx is not a TransferTransaction.
 */
outcome::result<void> TransactionManager::ParseTransferTransaction( const std::shared_ptr<IGeniusTransactions> &tx )
{
    auto transfer_tx = std::dynamic_pointer_cast<TransferTransaction>( tx );
    if ( !transfer_tx )
    {
        // Guard against a mismatching concrete type instead of dereferencing a null pointer.
        m_logger->error( "[{} - full: {}] Failed to cast transaction to TransferTransaction",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m );
        return std::errc::invalid_argument;
    }

    auto dest_infos = transfer_tx->GetDstInfos();
    if ( !dest_infos.empty() )
    {
        // The hash is the same for every output, so it is parsed once instead of per iteration.
        auto hash = ( base::Hash256::fromReadableString( transfer_tx->GetHash() ) ).value();
        for ( std::uint32_t i = 0; i < dest_infos.size(); ++i )
        {
            GeniusUTXO new_utxo( hash, i, dest_infos[i].encrypted_amount, dest_infos[i].token_id );
            utxo_manager_.PutUTXO( new_utxo, dest_infos[i].dest_address );
            m_logger->debug( "[{} - full: {}] Notify {} of transfer of {} to it",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m,
                             dest_infos[i].dest_address,
                             dest_infos[i].encrypted_amount );
        }
    }

    for ( const auto &input : transfer_tx->GetInputInfos() )
    {
        m_logger->trace( "[{} - full: {}] UTXO to be updated {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         input.txid_hash_.toReadableString() );
        m_logger->trace( "[{} - full: {}] UTXO output {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         input.output_idx_ );
    }
    utxo_manager_.ConsumeUTXOs( transfer_tx->GetInputInfos(), transfer_tx->GetSrcAddress() );
    return outcome::success();
}
/**
 * @brief Applies a mint transaction by storing its single output as a UTXO.
 *
 * The UTXO uses output index 0 and is stored for the transaction's source address.
 *
 * @param[in] tx Transaction expected to be a MintTransaction.
 * @return outcome::success() after the UTXO was stored, or
 *         std::errc::invalid_argument when @p tx is not a MintTransaction.
 */
outcome::result<void> TransactionManager::ParseMintTransaction( const std::shared_ptr<IGeniusTransactions> &tx )
{
    auto mint_tx = std::dynamic_pointer_cast<MintTransaction>( tx );
    if ( !mint_tx )
    {
        // Guard against a mismatching concrete type instead of dereferencing a null pointer.
        m_logger->error( "[{} - full: {}] Failed to cast transaction to MintTransaction",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m );
        return std::errc::invalid_argument;
    }

    auto       hash = ( base::Hash256::fromReadableString( mint_tx->GetHash() ) ).value();
    GeniusUTXO new_utxo( hash, 0, mint_tx->GetAmount(), mint_tx->GetTokenID() );
    utxo_manager_.PutUTXO( new_utxo, mint_tx->GetSrcAddress() );
    m_logger->info( "[{} - full: {}] Created tokens, amount {} balance {}",
                    account_m->GetAddress().substr( 0, 8 ),
                    full_node_m,
                    std::to_string( mint_tx->GetAmount() ),
                    std::to_string( utxo_manager_.GetBalance() ) );
    return outcome::success();
}
/**
 * @brief Applies an escrow transaction created by this account to the UTXO set.
 *
 * Nothing is changed when the source address differs from this account's address or
 * when the transaction has no outputs. Otherwise the optional change output (index 1)
 * is stored as a UTXO and the transaction's inputs are consumed.
 *
 * @param[in] tx Transaction expected to be an EscrowTransaction.
 * @return outcome::success(), or std::errc::invalid_argument when @p tx is not an
 *         EscrowTransaction.
 */
outcome::result<void> TransactionManager::ParseEscrowTransaction( const std::shared_ptr<IGeniusTransactions> &tx )
{
    auto escrow_tx = std::dynamic_pointer_cast<EscrowTransaction>( tx );
    if ( !escrow_tx )
    {
        // Guard against a mismatching concrete type instead of dereferencing a null pointer.
        m_logger->error( "[{} - full: {}] Failed to cast transaction to EscrowTransaction",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m );
        return std::errc::invalid_argument;
    }

    if ( escrow_tx->GetSrcAddress() == account_m->GetAddress() )
    {
        auto [_, outputs] = escrow_tx->GetUTXOParameters();
        if ( !outputs.empty() )
        {
            //The first is the escrow, second is the change (might not happen)
            auto hash = ( base::Hash256::fromReadableString( escrow_tx->GetHash() ) ).value();
            if ( outputs.size() > 1 )
            {
                GeniusUTXO new_utxo( hash, 1, outputs[1].encrypted_amount, outputs[1].token_id );
                utxo_manager_.PutUTXO( new_utxo, outputs[1].dest_address );
            }
            utxo_manager_.ConsumeUTXOs( escrow_tx->GetUTXOParameters().first, escrow_tx->GetSrcAddress() );
        }
    }
    return outcome::success();
}
/**
 * @brief Handles an escrow-release transaction; only validates the type and logs.
 * @param[in] tx Transaction expected to be an EscrowReleaseTransaction.
 * @return outcome::success(), or std::errc::invalid_argument when @p tx is not an
 *         EscrowReleaseTransaction.
 */
outcome::result<void> TransactionManager::ParseEscrowReleaseTransaction(
    const std::shared_ptr<IGeniusTransactions> &tx )
{
    const auto release_tx = std::dynamic_pointer_cast<EscrowReleaseTransaction>( tx );
    if ( release_tx == nullptr )
    {
        m_logger->error( "[{} - full: {}] Failed to cast transaction to EscrowReleaseTransaction",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m );
        return std::errc::invalid_argument;
    }

    // No UTXO change happens here; the referenced escrow hash is only reported.
    const std::string escrow_hash = release_tx->GetOriginalEscrowHash();
    m_logger->debug( "[{} - full: {}] Successfully fetched release for escrow: {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     escrow_hash );
    return outcome::success();
}
/**
 * @brief Undoes the UTXO changes of a transfer transaction.
 *
 * The UTXOs created for each destination are deleted, every input's originating
 * transaction that can be found via GetTransactionByHashNoLock() is parsed again,
 * and finally the inputs are rolled back through the UTXO manager.
 *
 * @param[in] tx Transaction expected to be a TransferTransaction.
 * @return outcome::success(), the error of a failed re-parse, or
 *         std::errc::invalid_argument when @p tx is not a TransferTransaction.
 */
outcome::result<void> TransactionManager::RevertTransferTransaction(
    const std::shared_ptr<IGeniusTransactions> &tx )
{
    auto transfer_tx = std::dynamic_pointer_cast<TransferTransaction>( tx );
    if ( !transfer_tx )
    {
        // Guard against a mismatching concrete type instead of dereferencing a null pointer.
        m_logger->error( "[{} - full: {}] Failed to cast transaction to TransferTransaction",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m );
        return std::errc::invalid_argument;
    }

    auto dest_infos = transfer_tx->GetDstInfos();
    if ( !dest_infos.empty() )
    {
        // The hash is the same for every output, so it is parsed once instead of per iteration.
        auto hash = ( base::Hash256::fromReadableString( transfer_tx->GetHash() ) ).value();
        for ( const auto &dest_info : dest_infos )
        {
            utxo_manager_.DeleteUTXO( hash, dest_info.dest_address );
            m_logger->debug( "[{} - full: {}] Notify {} of deletion of {} to it",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m,
                             dest_info.dest_address,
                             dest_info.encrypted_amount );
        }
    }

    m_logger->debug( "[{} - full: {}] Adding origin address to Broadcast: {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     transfer_tx->GetSrcAddress() );
    m_logger->debug( "[{} - full: {}] Re-parsing inputs to be added as UTXOs",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m );
    for ( const auto &input : transfer_tx->GetInputInfos() )
    {
        m_logger->debug( "[{} - full: {}] Fetching transaction {} ",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         input.txid_hash_.toReadableString() );
        // Named differently from the parameter to avoid shadowing `tx`.
        auto input_tx = GetTransactionByHashNoLock( input.txid_hash_.toReadableString() );
        if ( input_tx )
        {
            m_logger->debug( "[{} - full: {}] Re-parsing {} transaction",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m,
                             input_tx->GetType() );
            OUTCOME_TRY( ParseTransaction( input_tx ) );
        }
    }
    utxo_manager_.RollbackUTXOs( transfer_tx->GetInputInfos() );
    return outcome::success();
}
/**
 * @brief Undoes a mint transaction by deleting the UTXO it created for the source address.
 * @param[in] tx Transaction expected to be a MintTransaction.
 * @return outcome::success() after the UTXO was deleted, or
 *         std::errc::invalid_argument when @p tx is not a MintTransaction.
 */
outcome::result<void> TransactionManager::RevertMintTransaction( const std::shared_ptr<IGeniusTransactions> &tx )
{
    auto mint_tx = std::dynamic_pointer_cast<MintTransaction>( tx );
    if ( !mint_tx )
    {
        // Guard against a mismatching concrete type instead of dereferencing a null pointer.
        m_logger->error( "[{} - full: {}] Failed to cast transaction to MintTransaction",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m );
        return std::errc::invalid_argument;
    }

    auto hash = ( base::Hash256::fromReadableString( mint_tx->GetHash() ) ).value();
    utxo_manager_.DeleteUTXO( hash, mint_tx->GetSrcAddress() );
    m_logger->info( "[{} - full: {}] Deleted {} tokens, from tx {}, final balance {}",
                    account_m->GetAddress().substr( 0, 8 ),
                    full_node_m,
                    mint_tx->GetAmount(),
                    mint_tx->GetHash(),
                    std::to_string( utxo_manager_.GetBalance() ) );
    return outcome::success();
}
/**
 * @brief Undoes the UTXO changes of an escrow transaction created by this account.
 *
 * Nothing is changed when the source address differs from this account's address or
 * when the transaction has no outputs. Otherwise the change UTXO (if any) is deleted,
 * every input's originating transaction that can be found is parsed again, and the
 * inputs are rolled back through the UTXO manager.
 *
 * @param[in] tx Transaction expected to be an EscrowTransaction.
 * @return outcome::success(), the error of a failed re-parse, or
 *         std::errc::invalid_argument when @p tx is not an EscrowTransaction.
 */
outcome::result<void> TransactionManager::RevertEscrowTransaction( const std::shared_ptr<IGeniusTransactions> &tx )
{
    auto escrow_tx = std::dynamic_pointer_cast<EscrowTransaction>( tx );
    if ( !escrow_tx )
    {
        // Guard against a mismatching concrete type instead of dereferencing a null pointer.
        m_logger->error( "[{} - full: {}] Failed to cast transaction to EscrowTransaction",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m );
        return std::errc::invalid_argument;
    }

    if ( escrow_tx->GetSrcAddress() == account_m->GetAddress() )
    {
        if ( auto [inputs, outputs] = escrow_tx->GetUTXOParameters(); !outputs.empty() )
        {
            //The first is the escrow, second is the change (might not happen)
            auto hash = ( base::Hash256::fromReadableString( escrow_tx->GetHash() ) ).value();
            if ( outputs.size() > 1 )
            {
                utxo_manager_.DeleteUTXO( hash, outputs[1].dest_address );
            }
            for ( auto &input : inputs )
            {
                // Named differently from the parameter to avoid shadowing `tx`.
                auto input_tx = GetTransactionByHashNoLock( input.txid_hash_.toReadableString() );
                if ( input_tx )
                {
                    m_logger->debug( "[{} - full: {}] Re-parsing {} transaction",
                                     account_m->GetAddress().substr( 0, 8 ),
                                     full_node_m,
                                     input_tx->GetType() );
                    OUTCOME_TRY( ParseTransaction( input_tx ) );
                }
            }
            utxo_manager_.RollbackUTXOs( inputs );
        }
    }
    return outcome::success();
}
/**
 * @brief Revert handler for escrow-release transactions; only validates the type and logs.
 * @param[in] tx Transaction expected to be an EscrowReleaseTransaction.
 * @return outcome::success(), or std::errc::invalid_argument when @p tx is not an
 *         EscrowReleaseTransaction.
 */
outcome::result<void> TransactionManager::RevertEscrowReleaseTransaction(
    const std::shared_ptr<IGeniusTransactions> &tx )
{
    const auto release_tx = std::dynamic_pointer_cast<EscrowReleaseTransaction>( tx );
    if ( release_tx == nullptr )
    {
        m_logger->error( "[{} - full: {}] Failed to cast transaction to EscrowReleaseTransaction",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m );
        return std::errc::invalid_argument;
    }

    // No state is rolled back here; the referenced escrow hash is only reported.
    const std::string escrow_hash = release_tx->GetOriginalEscrowHash();
    m_logger->debug( "[{} - full: {}] Successfully fetched release for escrow: {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     escrow_hash );
    return outcome::success();
}
/**
 * @brief Serializes every tracked transaction whose source address is this account.
 * @return One byte vector per matching transaction, taken from tx_processed_m under a shared lock.
 */
std::vector<std::vector<uint8_t>> TransactionManager::GetOutTransactions() const
{
    std::vector<std::vector<std::uint8_t>> serialized;
    {
        std::shared_lock read_guard( tx_mutex_m );
        serialized.reserve( tx_processed_m.size() );
        for ( const auto &entry : tx_processed_m )
        {
            const auto &tracked = entry.second;
            if ( !tracked.tx )
            {
                continue;
            }
            if ( tracked.tx->GetSrcAddress() == account_m->GetAddress() )
            {
                serialized.push_back( tracked.tx->SerializeByteVector() );
            }
        }
    }
    return serialized;
}
/**
 * @brief Serializes every tracked transaction whose source address is NOT this account.
 * @return One byte vector per matching transaction, taken from tx_processed_m under a shared lock.
 */
std::vector<std::vector<uint8_t>> TransactionManager::GetInTransactions() const
{
    std::vector<std::vector<std::uint8_t>> serialized;
    {
        std::shared_lock<std::shared_mutex> read_guard( tx_mutex_m );
        serialized.reserve( tx_processed_m.size() );
        for ( const auto &entry : tx_processed_m )
        {
            const auto &tracked = entry.second;
            if ( !tracked.tx )
            {
                continue;
            }
            if ( tracked.tx->GetSrcAddress() != account_m->GetAddress() )
            {
                serialized.push_back( tracked.tx->SerializeByteVector() );
            }
        }
    }
    return serialized;
}
/**
 * @brief Polls the tracked transactions until an incoming transaction is CONFIRMED or the timeout expires.
 *
 * A transaction is "incoming" when its source address differs from this account's address.
 * The tracked map is scanned at least once and then every 100 ms.
 *
 * @param[in] txId    Hash of the transaction to wait for.
 * @param[in] timeout Maximum time to keep polling.
 * @return The last status observed for the transaction, or TransactionStatus::FAILED
 *         when it was never seen.
 */
TransactionManager::TransactionStatus TransactionManager::WaitForTransactionIncoming(
    const std::string        &txId,
    std::chrono::milliseconds timeout ) const
{
    auto start  = std::chrono::steady_clock::now();
    auto retval = TransactionStatus::FAILED;
    do
    {
        {
            // The shared lock covers only the scan. Holding it across the sleep below would
            // block the writers that insert the very transaction this loop is waiting for.
            std::shared_lock tx_lock( tx_mutex_m );
            for ( const auto &[_, tracked] : tx_processed_m )
            {
                if ( tracked.tx && tracked.tx->GetHash() == txId &&
                     tracked.tx->GetSrcAddress() != account_m->GetAddress() )
                {
                    retval = tracked.status;
                    break;
                }
            }
        }
        if ( retval == TransactionStatus::CONFIRMED )
        {
            m_logger->debug( "[{} - full: {}] Transaction is FINALIZED",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m );
            break;
        }
        std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
    } while ( std::chrono::steady_clock::now() - start < timeout );
    return retval;
}
/**
 * @brief Polls the tracked transactions until an outgoing transaction reaches a final state or the timeout expires.
 *
 * A transaction is "outgoing" when its source address equals this account's address.
 * INVALID, CONFIRMED and FAILED end the wait; an untracked transaction is reported as
 * FAILED immediately. The tracked map is scanned at least once and then every 100 ms.
 *
 * @param[in] txId    Hash of the transaction to wait for.
 * @param[in] timeout Maximum time to keep polling.
 * @return The last status observed for the transaction.
 */
TransactionManager::TransactionStatus TransactionManager::WaitForTransactionOutgoing(
    const std::string        &txId,
    std::chrono::milliseconds timeout ) const
{
    auto start  = std::chrono::steady_clock::now();
    auto retval = TransactionStatus::CREATED;
    do
    {
        m_logger->trace( "[{} - full: {}] Searching for transaction {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         txId );
        bool found = false;
        {
            // The shared lock covers only the scan. Holding it across the sleep below would
            // block writers that need to update the status this loop is waiting on.
            std::shared_lock<std::shared_mutex> tx_lock( tx_mutex_m );
            for ( const auto &[_, tracked] : tx_processed_m )
            {
                if ( tracked.tx && tracked.tx->GetHash() == txId &&
                     tracked.tx->GetSrcAddress() == account_m->GetAddress() )
                {
                    retval = tracked.status;
                    m_logger->trace( "[{} - full: {}] Transaction status is {}",
                                     account_m->GetAddress().substr( 0, 8 ),
                                     full_node_m,
                                     static_cast<int>( retval ) );
                    found = true;
                    break;
                }
            }
        }
        if ( !found )
        {
            m_logger->trace( "[{} - full: {}] Transaction untracked",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m );
            retval = TransactionStatus::FAILED;
        }
        if ( retval == TransactionStatus::INVALID || retval == TransactionStatus::CONFIRMED ||
             retval == TransactionStatus::FAILED )
        {
            m_logger->trace( "[{} - full: {}] Transaction has finalized state {}",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m,
                             static_cast<int>( retval ) );
            break;
        }
        std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
    } while ( std::chrono::steady_clock::now() - start < timeout );
    return retval;
}
/**
 * @brief Polls the tracked transactions for an escrow release that references the given escrow.
 *
 * Only transactions whose type is "escrow-release" and whose original escrow hash equals
 * @p originalEscrowId are considered. A CONFIRMED, FAILED or INVALID status is returned
 * immediately; otherwise polling continues every 100 ms until the timeout expires.
 *
 * @param[in] originalEscrowId Hash of the escrow transaction whose release is awaited.
 * @param[in] timeout          Maximum time to keep polling.
 * @return The last status observed for a matching release, or TransactionStatus::INVALID
 *         when none was seen within the timeout.
 */
TransactionManager::TransactionStatus TransactionManager::WaitForEscrowRelease(
    const std::string        &originalEscrowId,
    std::chrono::milliseconds timeout ) const
{
    auto start  = std::chrono::steady_clock::now();
    auto retval = TransactionStatus::INVALID;
    while ( std::chrono::steady_clock::now() - start < timeout )
    {
        {
            // The shared lock covers only the scan. Holding it across the sleep below would
            // block the writers that insert the release transaction being awaited.
            std::shared_lock<std::shared_mutex> tx_lock( tx_mutex_m );
            for ( const auto &[_, tracked] : tx_processed_m )
            {
                if ( !tracked.tx )
                {
                    continue;
                }
                if ( tracked.tx->GetType() == "escrow-release" )
                {
                    auto escrowReleaseTx = std::dynamic_pointer_cast<EscrowReleaseTransaction>( tracked.tx );
                    if ( escrowReleaseTx && escrowReleaseTx->GetOriginalEscrowHash() == originalEscrowId )
                    {
                        m_logger->debug( "[{} - full: {}] Found matching escrow release transaction with tx id: {}",
                                         account_m->GetAddress().substr( 0, 8 ),
                                         full_node_m,
                                         tracked.tx->GetHash() );
                        retval = tracked.status;
                        // If finalized, return immediately; otherwise keep waiting.
                        if ( retval == TransactionStatus::CONFIRMED || retval == TransactionStatus::FAILED ||
                             retval == TransactionStatus::INVALID )
                        {
                            return retval;
                        }
                    }
                }
            }
        }
        std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) );
    }
    return retval; // Will be INVALID if not seen within timeout
}
/**
 * @brief Decides the initial state by comparing local transactions against the network's confirmed nonce.
 *
 * When no nonce is received the manager assumes it is up to date and becomes READY.
 * Otherwise every nonce from 1 to the confirmed nonce must have a local transaction
 * from this account; if one is missing, heads are requested instead of becoming READY.
 *
 * @param[in] timeout_ms Not read by this function; the request uses NONCE_REQUEST_TIMEOUT_MS.
 *                       NOTE(review): confirm whether this parameter was meant to be used.
 */
void TransactionManager::InitNonce( uint64_t timeout_ms )
{
    m_logger->debug( "[{} - full: {}] Trying to get confirmed nonce",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m );

    auto nonce_result = account_m->GetConfirmedNonce( NONCE_REQUEST_TIMEOUT_MS );
    if ( !nonce_result.has_value() )
    {
        //TODO - Have the full node respond it doesn't have it to check for connectivity
        m_logger->debug( "[{} - full: {}] No node from the network, assume we are updated",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m );
        ChangeState( State::READY );
        return;
    }

    auto network_confirmed_nonce = nonce_result.value();
    m_logger->debug( "[{} - full: {}] Nonce from the network received {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     network_confirmed_nonce );

    // Look for the first confirmed nonce that has no local transaction.
    bool all_present = true;
    for ( uint64_t nonce = 1; nonce <= network_confirmed_nonce; ++nonce )
    {
        if ( GetTransactionByNonceAndAddress( nonce, account_m->GetAddress() ) )
        {
            continue;
        }
        all_present = false;
        m_logger->debug( "[{} - full: {}] Missing transaction with nonce {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         nonce );
        break;
    }

    if ( all_present )
    {
        ChangeState( State::READY );
        return;
    }

    // We're missing transactions - request heads to help sync faster
    uint64_t gap = network_confirmed_nonce - account_m->GetProposedNonce() + 1;
    m_logger->info( "[{} - full: {}] Missing transactions during init, gap: {}",
                    account_m->GetAddress().substr( 0, 8 ),
                    full_node_m,
                    gap );
    RequestRelevantHeads();
}
/**
 * @brief Compares the locally proposed nonce with the confirmed nonce and reacts to any mismatch.
 *
 * The confirmed nonce comes from the network, or from the local value when the network
 * request yields nothing; if neither is available the function returns without action.
 * - proposed == confirmed + 1: state becomes READY.
 * - proposed  > confirmed + 1: the nonces in between are passed to CheckTransactionValidity().
 * - proposed  < confirmed + 1: heads are requested to catch up.
 */
void TransactionManager::SyncNonce()
{
    m_logger->debug( "[{} - full: {}] Checking if my nonce is updated",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m );

    uint64_t confirmed_nonce = 0;
    if ( auto network_nonce = account_m->GetConfirmedNonce( NONCE_REQUEST_TIMEOUT_MS ); network_nonce.has_value() )
    {
        confirmed_nonce = network_nonce.value();
    }
    else
    {
        auto local_nonce = account_m->GetLocalConfirmedNonce();
        if ( !local_nonce.has_value() )
        {
            return;
        }
        confirmed_nonce = local_nonce.value();
    }

    const uint64_t expected_next_nonce = confirmed_nonce + 1;
    const uint64_t proposed_nonce      = account_m->GetProposedNonce();

    if ( proposed_nonce == expected_next_nonce )
    {
        //Either my old txs are outdated or
        //The responder has not updated yet
        m_logger->debug( "[{} - full: {}] Network nonce updated: {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         expected_next_nonce );
        ChangeState( State::READY );
        return;
    }

    if ( proposed_nonce > expected_next_nonce )
    {
        m_logger->error( "[{} - full: {}] Local nonce ahead - Local: {}, Expected: {}. Checking for invalid tx",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         proposed_nonce,
                         expected_next_nonce );
        std::set<uint64_t> unconfirmed_nonces;
        for ( uint64_t candidate = expected_next_nonce; candidate < proposed_nonce; ++candidate )
        {
            unconfirmed_nonces.insert( candidate );
            m_logger->debug( "[{} - full: {}] Inserting nonce to check: {}",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m,
                             candidate );
        }
        (void)CheckTransactionValidity( unconfirmed_nonces );
        return;
    }

    // Remaining case: proposed_nonce < expected_next_nonce.
    const uint64_t nonce_gap = expected_next_nonce - proposed_nonce;
    m_logger->error( "[{} - full: {}] Local nonce behind - Local: {}, Expected: {}. Gap: {}. Waiting to sync",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     proposed_nonce,
                     expected_next_nonce,
                     nonce_gap );
    // If we're behind at all, we need to catch up - even a gap of 1 means
    // there's transaction data in CRDT that we don't have, and we cannot
    // safely propose new transactions until we're caught up
    constexpr uint64_t SIGNIFICANT_GAP_THRESHOLD = 1;
    if ( nonce_gap >= SIGNIFICANT_GAP_THRESHOLD )
    {
        RequestRelevantHeads();
    }
}
/**
 * @brief Asks the account to request heads for all monitored topics, at most once per 30 seconds.
 *
 * The timestamp of the last request is only updated when RequestHeads() reports success,
 * so a failed request does not start the rate-limit window.
 */
void TransactionManager::RequestRelevantHeads()
{
    // Rate limiting: don't request more than once per 30 seconds
    const auto now = std::chrono::steady_clock::now();
    if ( last_head_request_time_.has_value() )
    {
        const auto since_last =
            std::chrono::duration_cast<std::chrono::seconds>( now - last_head_request_time_.value() );
        if ( since_last.count() < 30 )
        {
            m_logger->trace( "[{} - full: {}] Skipping head request - too soon since last request ({}s ago)",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m,
                             since_last.count() );
            return;
        }
    }

    auto topics_result = globaldb_m->GetMonitoredTopics();
    if ( !topics_result.has_value() )
    {
        m_logger->warn( "[{} - full: {}] Could not get monitored topics for head request",
                        account_m->GetAddress().substr( 0, 8 ),
                        full_node_m );
        return;
    }

    auto &topics = topics_result.value();
    m_logger->info( "[{} - full: {}] Requesting heads for {} topics",
                    account_m->GetAddress().substr( 0, 8 ),
                    full_node_m,
                    topics.size() );

    const bool request_sent = account_m->RequestHeads( topics );
    if ( !request_sent )
    {
        m_logger->warn( "[{} - full: {}] Failed to request heads",
                        account_m->GetAddress().substr( 0, 8 ),
                        full_node_m );
        return;
    }

    last_head_request_time_ = now;
    m_logger->debug( "[{} - full: {}] Periodic sync head request sent for {} topics",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     topics.size() );
}
/**
 * @brief Re-validates this account's tracked transactions for the given nonces.
 *
 * For every tracked transaction sent by this account whose nonce is in @p nonces_to_check,
 * the signature is checked (current scheme first, then the legacy DAG signature).
 * Valid transactions are marked CONFIRMED; invalid ones are collected and, after the
 * lock is released, removed through RemoveTransactionFromProcessedMaps().
 *
 * @param[in] nonces_to_check Nonces whose transactions must be validated.
 * @return true when at least one invalid transaction was found, false otherwise.
 */
outcome::result<bool> TransactionManager::CheckTransactionValidity( const std::set<uint64_t> &nonces_to_check )
{
    bool                     changed = false;
    std::vector<std::string> invalid_transaction_keys;
    {
        std::unique_lock<std::shared_mutex> tx_lock( tx_mutex_m );
        // Arguments follow placeholder order: address, full-node flag, function name.
        m_logger->debug( "[{} - full: {}] {}: Checking transactions",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         __func__ );
        for ( const auto &nonce : nonces_to_check )
        {
            // Bind by reference: a by-value binding would update a copy and lose the status change.
            for ( auto &[key, tracked] : tx_processed_m )
            {
                if ( !tracked.tx || tracked.tx->GetSrcAddress() != account_m->GetAddress() )
                {
                    continue;
                }
                m_logger->debug( "[{} - full: {}] {}: Seeing if transaction {} is valid {}",
                                 account_m->GetAddress().substr( 0, 8 ),
                                 full_node_m,
                                 __func__,
                                 tracked.tx->dag_st.nonce(),
                                 nonce );
                if ( tracked.tx->dag_st.nonce() == nonce )
                {
                    bool valid_tx = true;
                    if ( !tracked.tx->CheckSignature() )
                    {
                        if ( !tracked.tx->CheckDAGSignatureLegacy() )
                        {
                            m_logger->error(
                                "[{} - full: {}] Could not validate signature of transaction with nonce {}",
                                account_m->GetAddress().substr( 0, 8 ),
                                full_node_m,
                                nonce );
                            valid_tx = false;
                        }
                        else
                        {
                            m_logger->debug( "[{} - full: {}] Legacy transaction validated with nonce: {}",
                                             account_m->GetAddress().substr( 0, 8 ),
                                             full_node_m,
                                             nonce );
                        }
                    }
                    else
                    {
                        m_logger->debug( "[{} - full: {}] {}: Transaction is valid with {}",
                                         account_m->GetAddress().substr( 0, 8 ),
                                         full_node_m,
                                         __func__,
                                         nonce );
                    }
                    if ( !valid_tx )
                    {
                        // Collect the key for later removal
                        invalid_transaction_keys.push_back( key );
                        changed = true;
                        m_logger->debug( "[{} - full: {}] {}: INVALID TX {}",
                                         account_m->GetAddress().substr( 0, 8 ),
                                         full_node_m,
                                         __func__,
                                         nonce );
                    }
                    else
                    {
                        // Keep the VERIFYING counter in step with the status, as the other
                        // status-changing paths in this class do.
                        if ( tracked.status == TransactionStatus::VERIFYING )
                        {
                            verifying_count_.fetch_sub( 1, std::memory_order_relaxed );
                        }
                        tracked.status = TransactionStatus::CONFIRMED;
                    }
                }
            }
        }
    }
    // Removal happens outside the locked scope above.
    for ( auto it = invalid_transaction_keys.rbegin(); it != invalid_transaction_keys.rend(); ++it )
    {
        RemoveTransactionFromProcessedMaps( *it, true );
    }
    return changed;
}
/**
 * @brief Removes a transaction key from the CRDT database in one atomic transaction.
 * @param[in] tx_key Key of the transaction to remove.
 * @param[in] topics Topics passed to Commit().
 * @return outcome::success() after the commit, or the error propagated from
 *         Remove() or Commit().
 */
outcome::result<void> TransactionManager::DeleteTransaction( std::string                             tx_key,
                                                             const std::unordered_set<std::string> &topics )
{
    std::shared_ptr<crdt::AtomicTransaction> crdt_transaction = globaldb_m->BeginTransaction();
    m_logger->debug( "[{} - full: {}] Deleting transaction on {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     tx_key );
    // Pass a copy: tx_key is still needed by the log statements below, so it must not be moved from.
    OUTCOME_TRY( crdt_transaction->Remove( { tx_key } ) );
    m_logger->debug( "[{} - full: {}] Removed key transaction on {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     tx_key );
    OUTCOME_TRY( crdt_transaction->Commit( topics ) );
    m_logger->debug( "[{} - full: {}] Commited tx on {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     tx_key );
    return outcome::success();
}
/**
 * @brief Locking wrapper around GetTransactionByHashNoLock().
 * @param[in] tx_hash Hash of the transaction to look up.
 * @return The result of GetTransactionByHashNoLock(), obtained while holding a shared lock on tx_mutex_m.
 */
std::shared_ptr<IGeniusTransactions> TransactionManager::GetTransactionByHash( const std::string &tx_hash ) const
{
    std::shared_lock<std::shared_mutex> read_guard( tx_mutex_m );
    auto                                found_tx = GetTransactionByHashNoLock( tx_hash );
    return found_tx;
}
/**
 * @brief Finds a tracked transaction sent by this account by its hash; takes no lock itself.
 * @param[in] tx_hash Hash of the transaction to look up.
 * @return The matching transaction, or nullptr when none of the tracked entries matches.
 */
std::shared_ptr<IGeniusTransactions> TransactionManager::GetTransactionByHashNoLock(
    const std::string &tx_hash ) const
{
    for ( const auto &entry : tx_processed_m )
    {
        const auto &tracked = entry.second;
        // NOTE(review): this message is emitted once per tracked entry, not once per lookup.
        m_logger->debug( "[{} - full: {}] Searching for hash {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         tx_hash );
        if ( !tracked.tx )
        {
            continue;
        }
        if ( tracked.tx->GetHash() != tx_hash )
        {
            continue;
        }
        if ( tracked.tx->GetSrcAddress() == account_m->GetAddress() )
        {
            return tracked.tx;
        }
    }
    return nullptr;
}
/**
 * @brief Finds a tracked transaction by sender address and nonce.
 * @param[in] nonce   Nonce stored in the transaction's DAG header.
 * @param[in] address Source address the transaction must have.
 * @return The first matching transaction found under a shared lock, or nullptr when none matches.
 */
std::shared_ptr<IGeniusTransactions> TransactionManager::GetTransactionByNonceAndAddress(
    uint64_t           nonce,
    const std::string &address ) const
{
    std::shared_lock<std::shared_mutex> read_guard( tx_mutex_m );
    for ( const auto &entry : tx_processed_m )
    {
        const auto &tracked = entry.second;
        if ( !tracked.tx )
        {
            continue;
        }
        if ( tracked.tx->dag_st.nonce() != nonce )
        {
            continue;
        }
        if ( tracked.tx->GetSrcAddress() == address )
        {
            return tracked.tx;
        }
    }
    return nullptr;
}
/**
 * @brief Reports the tracked status of a transaction sent by this account.
 * @param[in] txId Hash of the transaction.
 * @return The tracked status, or TransactionStatus::INVALID when no tracked transaction
 *         from this account has that hash.
 */
TransactionManager::TransactionStatus TransactionManager::GetOutgoingStatusByTxId( const std::string &txId ) const
{
    std::shared_lock<std::shared_mutex> read_guard( tx_mutex_m );
    for ( const auto &entry : tx_processed_m )
    {
        const auto &tracked = entry.second;
        if ( !tracked.tx )
        {
            continue;
        }
        if ( tracked.tx->GetHash() != txId )
        {
            continue;
        }
        if ( tracked.tx->GetSrcAddress() == account_m->GetAddress() )
        {
            return tracked.status;
        }
    }
    return TransactionStatus::INVALID;
}
/**
 * @brief Reports the tracked status of a transaction sent by another account.
 * @param[in] txId Hash of the transaction.
 * @return The tracked status, or TransactionStatus::INVALID when no tracked transaction
 *         from a different source address has that hash.
 */
TransactionManager::TransactionStatus TransactionManager::GetIncomingStatusByTxId( const std::string &txId ) const
{
    std::shared_lock<std::shared_mutex> read_guard( tx_mutex_m );
    for ( const auto &entry : tx_processed_m )
    {
        const auto &tracked = entry.second;
        if ( !tracked.tx )
        {
            continue;
        }
        if ( tracked.tx->GetHash() != txId )
        {
            continue;
        }
        if ( tracked.tx->GetSrcAddress() != account_m->GetAddress() )
        {
            return tracked.status;
        }
    }
    return TransactionStatus::INVALID;
}
/**
 * @brief Sets the status of the first tracked transaction from this account with the given nonce.
 *
 * verifying_count_ is adjusted whenever the transaction enters or leaves the VERIFYING status.
 *
 * @param[in] nonce Nonce of the outgoing transaction.
 * @param[in] s     New status.
 * @return true when a matching transaction was updated, false when none was found.
 */
bool TransactionManager::SetOutgoingStatusByNonce( uint64_t nonce, TransactionStatus s )
{
    std::unique_lock<std::shared_mutex> write_guard( tx_mutex_m );
    for ( auto &entry : tx_processed_m )
    {
        auto &tracked = entry.second;
        if ( !tracked.tx || tracked.tx->GetSrcAddress() != account_m->GetAddress() ||
             tracked.tx->dag_st.nonce() != nonce )
        {
            continue;
        }

        const bool was_verifying  = tracked.status == TransactionStatus::VERIFYING;
        const bool will_verifying = s == TransactionStatus::VERIFYING;
        tracked.status            = s;

        // Update verifying_count
        if ( was_verifying && !will_verifying )
        {
            verifying_count_.fetch_sub( 1, std::memory_order_relaxed );
        }
        else if ( !was_verifying && will_verifying )
        {
            verifying_count_.fetch_add( 1, std::memory_order_relaxed );
        }

        m_logger->debug( "[{} - full: {}] Set tx {} (nonce {}) to {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         tracked.tx->GetHash(),
                         nonce,
                         static_cast<int>( s ) );
        return true;
    }

    m_logger->debug( "[{} - full: {}] No outgoing tx found with nonce {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     nonce );
    return false;
}
/**
 * @brief Promotes this account's VERIFYING transactions to CONFIRMED once the network nonce covers them.
 *
 * Returns early when verifying_count_ is zero or no VERIFYING transaction is found.
 * Otherwise the confirmed nonce is requested, and every VERIFYING transaction from this
 * account whose nonce is not greater than it is set to CONFIRMED.
 *
 * @return outcome::success() when there was nothing to do or the update ran;
 *         a failure carrying a default-constructed boost::system::error_code when the
 *         confirmed nonce could not be obtained.
 */
outcome::result<void> TransactionManager::ConfirmTransactions()
{
    // Fast path: check if there are any VERIFYING transactions
    const bool nothing_verifying = verifying_count_.load( std::memory_order_relaxed ) == 0;
    if ( nothing_verifying )
    {
        m_logger->trace( "[{} - full: {}] No VERIFYING transactions, skipping nonce check in ConfirmTransactions",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m );
        return outcome::success();
    }

    // Gather the nonces of this account's VERIFYING transactions (linear scan under a shared lock).
    std::vector<uint64_t> pending_nonces;
    {
        std::shared_lock read_guard( tx_mutex_m );
        for ( const auto &entry : tx_processed_m )
        {
            const auto &tracked = entry.second;
            if ( !tracked.tx || tracked.tx->GetSrcAddress() != account_m->GetAddress() )
            {
                continue;
            }
            if ( tracked.status == TransactionStatus::VERIFYING )
            {
                pending_nonces.push_back( tracked.tx->dag_st.nonce() );
            }
        }
    }

    // If nothing to confirm after lock, skip
    if ( pending_nonces.empty() )
    {
        m_logger->trace( "[{} - full: {}] No VERIFYING transactions after lock check",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m );
        return outcome::success();
    }

    // Fetch confirmed nonce only if we have VERIFYING transactions
    auto nonce_result = account_m->GetConfirmedNonce( NONCE_REQUEST_TIMEOUT_MS );
    if ( !nonce_result.has_value() )
    {
        m_logger->debug( "[{} - full: {}] Can't fetch nonce from the network in ConfirmTransactions",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m );
        return outcome::failure( boost::system::error_code{} );
    }

    const uint64_t confirmed_nonce = nonce_result.value();
    m_logger->debug( "[{} - full: {}] Confirmed nonce from network: {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     confirmed_nonce );

    // For each covered nonce, scan the tracked map again under an exclusive lock and promote matches.
    {
        std::unique_lock<std::shared_mutex> write_guard( tx_mutex_m );
        for ( uint64_t nonce : pending_nonces )
        {
            if ( nonce > confirmed_nonce )
            {
                continue;
            }
            for ( auto &[key, tracked] : tx_processed_m )
            {
                if ( !tracked.tx || tracked.tx->GetSrcAddress() != account_m->GetAddress() ||
                     tracked.tx->dag_st.nonce() != nonce )
                {
                    continue;
                }
                // The status may have changed between the two locked sections, so re-check it.
                if ( tracked.status != TransactionStatus::VERIFYING )
                {
                    continue;
                }
                tracked.status = TransactionStatus::CONFIRMED;
                verifying_count_.fetch_sub( 1, std::memory_order_relaxed );
                m_logger->debug( "[{} - full: {}] Transaction {} (nonce {}) set to CONFIRMED",
                                 account_m->GetAddress().substr( 0, 8 ),
                                 full_node_m,
                                 key,
                                 nonce );
            }
        }
    }
    return outcome::success();
}
/**
 * @brief Element filter for incoming transaction entries.
 *
 * Decides whether the incoming element is rejected. It is rejected when it cannot be
 * deserialized, when neither signature check passes, or when a conflicting/existing
 * transaction is found and ShouldReplaceTransaction() says the new one must not replace it.
 *
 * @param[in] element Incoming element; element.value() holds the serialized transaction.
 * @return An empty optional when the element is not rejected. When it is rejected, a vector
 *         holding an element keyed with the expected proof key (the vector is empty if
 *         GetExpectedProofKey() yields no value).
 *         NOTE(review): how the caller interprets the returned vector is not visible here.
 */
std::optional<std::vector<crdt::pb::Element>> TransactionManager::FilterTransaction(
const crdt::pb::Element &element )
{
std::optional<std::vector<crdt::pb::Element>> maybe_tombstones;
bool should_delete = false;
std::shared_ptr<IGeniusTransactions> new_tx;
// Single-pass block: `break` leaves the checks early with should_delete already decided.
do
{
auto maybe_new_tx = DeSerializeTransaction( element.value() );
if ( maybe_new_tx.has_error() )
{
m_logger->error( "[{} - full: {}] Failed to deserialize incoming transaction {}",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
element.key() );
should_delete = true;
break;
}
new_tx = maybe_new_tx.value();
// Accept the current signature scheme first, then fall back to the legacy DAG signature.
if ( !new_tx->CheckSignature() )
{
if ( !new_tx->CheckDAGSignatureLegacy() )
{
m_logger->error( "[{} - full: {}] Could not validate signature of transaction {}",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
element.key() );
should_delete = true;
break;
}
m_logger->debug( "[{} - full: {}] Legacy transaction validated: {}",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
element.key() );
}
std::shared_ptr<IGeniusTransactions> conflicting_tx;
auto conflicting_tx_res = GetConflictingTransaction( *new_tx );
if ( !conflicting_tx_res.has_value() )
{
//maybe it's not been processed yet, but it's on CRDT
auto maybe_existing_value = globaldb_m->Get( element.key() );
if ( !maybe_existing_value.has_value() )
{
m_logger->trace( "[{} - full: {}] No existing transaction, accepting new transaction {}",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
element.key() );
break;
}
m_logger->debug(
"[{} - full: {}] Found transaction with the same key {}, checking mutability window and timestamps",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
element.key() );
// Reuse the result variable so the stored transaction is compared below like a conflict.
conflicting_tx_res = DeSerializeTransaction( maybe_existing_value.value() );
if ( conflicting_tx_res.has_error() )
{
m_logger->warn( "[{} - full: {}] Failed to deserialize existing transaction {}, accepting new one",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
element.key() );
break;
}
}
conflicting_tx = std::move( conflicting_tx_res.value() );
m_logger->debug( "[{} - full: {}] Checking if new tx {} is the correct one",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
new_tx->GetHash() );
// The new transaction is rejected unless it is allowed to replace the existing one.
should_delete = !ShouldReplaceTransaction( *conflicting_tx, *new_tx );
} while ( 0 );
if ( should_delete )
{
std::vector<crdt::pb::Element> additional_elements_to_delete;
// NOTE(review): new_tx is still null here when deserialization failed above;
// confirm GetExpectedProofKey() tolerates a null transaction.
auto maybe_proof_key = GetExpectedProofKey( element.key(), new_tx );
if ( maybe_proof_key.has_value() )
{
crdt::pb::Element proof_element;
proof_element.set_key( maybe_proof_key.value() );
additional_elements_to_delete.push_back( proof_element );
}
maybe_tombstones = additional_elements_to_delete;
}
return maybe_tombstones;
}
/**
 * @brief Element filter for incoming proof entries.
 *
 * As written, every proof is treated as valid: the block below sets valid_proof to true
 * and breaks before the call to IBasicProof::VerifyFullProof() is reached, so the
 * tombstone branch at the end is never taken.
 *
 * @param[in] element Incoming element; element.value() holds the serialized proof.
 * @return An empty optional while valid_proof is true (currently always). If valid_proof
 *         were false: a vector holding the proof element itself plus, when
 *         GetExpectedTxKey() yields a value, an element keyed with that transaction key.
 */
std::optional<std::vector<crdt::pb::Element>> TransactionManager::FilterProof( const crdt::pb::Element &element )
{
std::optional<std::vector<crdt::pb::Element>> maybe_tombstones;
bool valid_proof = false;
do
{
//TODO - This verification is only needed because CRDT resyncs every boot up
// Remove once we remove the in memory processed_cids on crdt_datastore and use dagsyncer again
auto maybe_has_value = globaldb_m->Get( element.key() );
if ( maybe_has_value.has_value() )
{
m_logger->debug( "[{} - full: {}] Already have the proof {}",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
element.key() );
valid_proof = true;
break;
}
// NOTE(review): this unconditional accept-and-break makes everything up to the end of
// the do-block unreachable, i.e. proof verification is disabled. Confirm this is intended.
valid_proof = true;
break;
std::vector<uint8_t> proof_data_vector( element.value().begin(), element.value().end() );
auto maybe_valid_proof = IBasicProof::VerifyFullProof( proof_data_vector );
if ( maybe_valid_proof.has_error() || ( !maybe_valid_proof.value() ) )
{
// TODO: kill reputation point of the node.
m_logger->error( "[{} - full: {}] Could not verify proof {}",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
element.key() );
break;
}
m_logger->trace( "[{} - full: {}] Valid proof of {}",
account_m->GetAddress().substr( 0, 8 ),
full_node_m,
element.key() );
valid_proof = true;
} while ( 0 );
if ( !valid_proof )
{
// Reject the proof itself and, when its key can be derived, the transaction it belongs to.
std::vector<crdt::pb::Element> tombstones;
tombstones.push_back( element );
auto maybe_tx_key = GetExpectedTxKey( element.key() );
if ( maybe_tx_key.has_value() )
{
crdt::pb::Element tx_tombstone;
tx_tombstone.set_key( maybe_tx_key.value() );
tombstones.push_back( tx_tombstone );
}
maybe_tombstones = tombstones;
}
return maybe_tombstones;
}
/**
 * @brief Decides whether @p new_tx should take the place of @p existing_tx.
 *
 * Replacement is refused when both transactions share the same hash or when the existing one is
 * already immutable. Otherwise the new transaction wins only if its timestamp is earlier than the
 * existing one and, when a non-zero timestamp tolerance is configured, the gap is below that tolerance.
 *
 * @param existing_tx Transaction currently tracked.
 * @param new_tx      Candidate replacement.
 * @return true when the existing transaction should be replaced.
 */
bool TransactionManager::ShouldReplaceTransaction( const IGeniusTransactions &existing_tx,
                                                   const IGeniusTransactions &new_tx ) const
{
    // Same hash => same transaction, nothing to replace.
    if ( existing_tx.GetHash() == new_tx.GetHash() )
    {
        m_logger->info( "[{} - full: {}] Already have the same transaction, rejecting replacement attempt",
                        account_m->GetAddress().substr( 0, 8 ),
                        full_node_m );
        return false;
    }

    // A transaction past its mutability window can no longer be displaced.
    if ( IsTransactionImmutable( existing_tx ) )
    {
        m_logger->info( "[{} - full: {}] Existing transaction is immutable, rejecting replacement attempt",
                        account_m->GetAddress().substr( 0, 8 ),
                        full_node_m );
        return false;
    }

    const auto ts_existing = existing_tx.GetTimestamp();
    const auto ts_new      = new_tx.GetTimestamp();
    // delta_ms = existing - new, so a positive value means the candidate is the earlier one.
    const auto delta_ms       = GetElapsedTime( ts_new, ts_existing );
    const bool new_is_earlier = delta_ms > 0;

    // Tolerance of zero disables the window: being earlier is enough.
    if ( new_is_earlier && timestamp_tolerance_m.count() == 0 )
    {
        m_logger->debug(
            "[{} - full: {}] Timestamp tolerance disabled — new tx earlier (diff {} ms): allowing replacement",
            account_m->GetAddress().substr( 0, 8 ),
            full_node_m,
            delta_ms );
        return true;
    }

    // Otherwise the candidate must be earlier AND strictly inside the tolerance window.
    if ( new_is_earlier && delta_ms < timestamp_tolerance_m.count() )
    {
        m_logger->debug( "[{} - full: {}] Timestamps within tolerance ({} ms). Existing: {} , New: {} , Diff: {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         timestamp_tolerance_m.count(),
                         ts_existing,
                         ts_new,
                         delta_ms );
        m_logger->info( "[{} - full: {}] New transaction is earlier (ts: {} vs {}), will replace existing",
                        account_m->GetAddress().substr( 0, 8 ),
                        full_node_m,
                        ts_new,
                        ts_existing );
        return true;
    }

    m_logger->warn(
        "[{} - full: {}] New transaction not eligible for replacement. Existing: {} , New: {} , Diff: {} ms, Tolerance: {} ms",
        account_m->GetAddress().substr( 0, 8 ),
        full_node_m,
        ts_existing,
        ts_new,
        delta_ms,
        timestamp_tolerance_m.count() );
    return false;
}
/**
 * @brief Reads the system clock.
 * @return Milliseconds elapsed since the clock's epoch.
 */
uint64_t TransactionManager::GetCurrentTimestamp()
{
    using Clock  = std::chrono::system_clock;
    using Millis = std::chrono::milliseconds;

    const auto since_epoch = Clock::now().time_since_epoch();
    return std::chrono::duration_cast<Millis>( since_epoch ).count();
}
/**
 * @brief Computes how many milliseconds separate @p timestamp from @p current_timestamp.
 *
 * @param timestamp         Timestamp being measured (ms).
 * @param current_timestamp Reference point (ms).
 * @return current_timestamp - timestamp; negative when @p timestamp lies after the reference point.
 */
int64_t TransactionManager::GetElapsedTime( uint64_t timestamp, uint64_t current_timestamp ) const
{
    // Subtract in unsigned arithmetic (wraps modulo 2^64, always well-defined) and convert once.
    // Casting each operand to int64_t first and subtracting afterwards can overflow a signed
    // integer - undefined behaviour - when a timestamp is >= 2^63. For every input where the
    // old expression was well-defined this yields the same value.
    const auto elapsed = static_cast<int64_t>( current_timestamp - timestamp );
    if ( elapsed < 0 )
    {
        m_logger->debug( "[{} - full: {}] Transaction timestamp {} is in the future (current: {}), elapsed: {} ms",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         timestamp,
                         current_timestamp,
                         elapsed );
    }
    else
    {
        m_logger->trace( "[{} - full: {}] Transaction timestamp {} elapsed: {} ms",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         timestamp,
                         elapsed );
    }
    return elapsed;
}
int64_t TransactionManager::GetElapsedTime( uint64_t timestamp ) const
{
return GetElapsedTime( timestamp, GetCurrentTimestamp() );
}
/**
 * @brief Tells whether @p tx is older than the configured mutability window.
 *
 * A window of zero means transactions never become immutable. A transaction whose timestamp is
 * ahead of the current time is also treated as mutable.
 *
 * @param tx Transaction to inspect.
 * @return true when the elapsed time since the transaction's timestamp exceeds the window.
 */
bool TransactionManager::IsTransactionImmutable( const IGeniusTransactions &tx ) const
{
    // Zero-length window => everything stays mutable.
    if ( mutability_window_m.count() == 0 )
    {
        return false;
    }

    const auto age_ms = GetElapsedTime( tx.GetTimestamp() );

    // Negative age: timestamp is in the future, so it cannot have aged out.
    if ( age_ms < 0 )
    {
        m_logger->debug( "[{} - full: {}] Transaction from future is not immutable (elapsed: {} ms)",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         age_ms );
        return false;
    }

    if ( age_ms > mutability_window_m.count() )
    {
        m_logger->debug( "[{} - full: {}] Transaction is immutable (elapsed: {} ms, window: {} ms)",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         age_ms,
                         mutability_window_m.count() );
        return true;
    }

    m_logger->trace( "[{} - full: {}] Transaction is still mutable (elapsed: {} ms, window: {} ms)",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     age_ms,
                     mutability_window_m.count() );
    return false;
}
/**
 * @brief Replaces the timestamp tolerance used when comparing competing transactions.
 * @param timeframe_tolerance New tolerance in milliseconds.
 */
void TransactionManager::SetTimeFrameToleranceMs( uint64_t timeframe_tolerance )
{
    const std::chrono::milliseconds updated_tolerance( timeframe_tolerance );
    timestamp_tolerance_m = updated_tolerance;

    m_logger->info( "[{} - full: {}] Updated timeframe tolerance to {} ms",
                    account_m->GetAddress().substr( 0, 8 ),
                    full_node_m,
                    timeframe_tolerance );
}
/**
 * @brief Replaces the mutability window consulted by IsTransactionImmutable().
 * @param mutability_window New window in milliseconds.
 */
void TransactionManager::SetMutabilityWindowMs( uint64_t mutability_window )
{
    const std::chrono::milliseconds updated_window( mutability_window );
    mutability_window_m = updated_window;

    m_logger->info( "[{} - full: {}] Updated mutability window to {} ms",
                    account_m->GetAddress().substr( 0, 8 ),
                    full_node_m,
                    mutability_window );
}
/**
 * @brief Drops a transaction from the processed map, undoing its effects first.
 *
 * When the entry holds a transaction object it is reverted, optionally deleted from the CRDT store,
 * the peer-confirmed nonce is rolled back and the verifying counter is decremented if the entry was
 * still in VERIFYING state. A key that is not tracked is not an error.
 *
 * @param transaction_key  Key of the entry in the processed map.
 * @param delete_from_crdt When true, also calls DeleteTransaction() for the key.
 * @return success, or the error propagated from RevertTransaction() / DeleteTransaction()
 *         (in which case the entry is left in the map).
 */
outcome::result<void> TransactionManager::RemoveTransactionFromProcessedMaps( const std::string &transaction_key,
                                                                              bool               delete_from_crdt )
{
    m_logger->debug( "[{} - full: {}] Removing transaction from processed maps: {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     transaction_key );

    std::unique_lock map_guard( tx_mutex_m );
    auto             entry = tx_processed_m.find( transaction_key );
    if ( entry == tx_processed_m.end() )
    {
        // Nothing tracked under this key: release the map before logging and report success.
        map_guard.unlock();
        m_logger->debug( "[{} - full: {}] Transaction not found in processed maps: {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         transaction_key );
        return outcome::success();
    }

    m_logger->debug( "[{} - full: {}] Removing from processed: {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     transaction_key );

    auto &tracked = entry->second;
    if ( tracked.tx )
    {
        // Undo the transaction's effects before forgetting about it.
        OUTCOME_TRY( RevertTransaction( tracked.tx ) );
        if ( delete_from_crdt )
        {
            auto topics = tracked.tx->GetTopics();
            OUTCOME_TRY( DeleteTransaction( transaction_key, topics ) );
        }
        account_m->RollBackPeerConfirmedNonce( tracked.tx->dag_st.nonce(), tracked.tx->dag_st.source_addr() );
        if ( tracked.status == TransactionStatus::VERIFYING )
        {
            verifying_count_.fetch_sub( 1, std::memory_order_relaxed );
        }
    }
    tx_processed_m.erase( entry );
    return outcome::success();
}
/**
 * @brief Deserializes a newly received entry and records it as a CONFIRMED transaction.
 *
 * Steps: deserialize the payload, reject transactions with an empty hash, return early when the
 * key is already tracked, remove any tracked transaction that conflicts with the new one, parse
 * the new transaction, update the peer-confirmed nonce and store the entry.
 *
 * @param new_data Pair of (key, serialized transaction).
 * @return success (also when the key was already tracked); otherwise the error from
 *         deserialization, conflict removal or parsing, or std::errc::invalid_argument when the
 *         transaction hash is empty.
 */
outcome::result<void> TransactionManager::AddTransactionToProcessedMaps(
    crdt::CRDTCallbackManager::NewDataPair new_data )
{
    // new_data is already a by-value copy owned by this call; bind to it instead of copying again.
    auto &[key, value] = new_data;

    m_logger->debug( "[{} - full: {}] Trying to deserialize {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     key );
    OUTCOME_TRY( auto &&new_tx, DeSerializeTransaction( value ) );
    m_logger->debug( "[{} - full: {}] Deserialized transaction {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     key );

    if ( new_tx->GetHash().empty() )
    {
        m_logger->error( "[{} - full: {}] Empty hash on {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         key );
        // A default-constructed error_code has value 0 ("success"), which made the caller log a
        // success-looking message for a failure. Report a real error condition instead.
        return outcome::failure( std::errc::invalid_argument );
    }

    m_logger->debug( "[{} - full: {}] Checking if we already have this transaction {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     key );
    {
        std::unique_lock tx_lock( tx_mutex_m );
        if ( tx_processed_m.find( key ) != tx_processed_m.end() )
        {
            m_logger->debug( "[{} - full: {}] Already have the transaction {}",
                             account_m->GetAddress().substr( 0, 8 ),
                             full_node_m,
                             key );
            return outcome::success();
        }
    }

    m_logger->debug( "[{} - full: {}] Verifying if we have a conflicting transaction {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     key );

    // Conflict lookup and removal run WITHOUT tx_mutex_m held: RemoveTransactionFromProcessedMaps()
    // acquires that mutex itself, and the lookup was already performed unlocked (presumably for
    // the same reason - confirm against GetTransactionByNonceAndAddress).
    auto conflicting_tx = GetConflictingTransaction( *new_tx );
    if ( conflicting_tx.has_value() )
    {
        const auto conflict_hash = conflicting_tx.value()->GetHash();
        m_logger->debug( "[{} - full: {}] Found conflicting transaction with hash: {}, removing it",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         conflict_hash );
        OUTCOME_TRY( RemoveTransactionFromProcessedMaps( GetTransactionPath( conflict_hash ), true ) );
    }

    // Parsing and the map insertion happen under the map lock.
    std::unique_lock tx_lock( tx_mutex_m );
    m_logger->debug( "[{} - full: {}] Parsing new transaction {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     key );
    OUTCOME_TRY( ParseTransaction( new_tx ) );

    const auto nonce = new_tx->dag_st.nonce();
    account_m->SetPeerConfirmedNonce( nonce, new_tx->dag_st.source_addr() );
    tx_processed_m[key] = TrackedTx{ new_tx, TransactionStatus::CONFIRMED, nonce };
    return outcome::success();
}
/**
 * @brief Handles a deleted key by removing the matching entry from the processed map.
 *
 * Failures are logged and otherwise ignored.
 *
 * @param key Key of the deleted element.
 */
void TransactionManager::ProcessDeletion( std::string key )
{
    m_logger->debug( "[{} - full: {}] Processing deletion of {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     key );

    auto removal = RemoveTransactionFromProcessedMaps( key );
    if ( !removal.has_error() )
    {
        return;
    }

    m_logger->error( "[{} - full: {}] Error removing transaction {}: {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     key,
                     removal.error().message() );
}
/**
 * @brief Handles a new (key, value) pair by adding it to the processed map.
 *
 * On failure the error is logged. On the first success the
 * received_first_periodic_sync_response_ flag is raised and an informational message is logged.
 *
 * @param new_data Pair of (key, serialized transaction).
 */
void TransactionManager::ProcessNewData( crdt::CRDTCallbackManager::NewDataPair new_data )
{
    m_logger->debug( "[{} - full: {}] Processing new data with key {}",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     new_data.first );

    auto insertion = AddTransactionToProcessedMaps( new_data );
    if ( insertion.has_error() )
    {
        m_logger->error( "[{} - full: {}] Error adding transaction {}: {}",
                         account_m->GetAddress().substr( 0, 8 ),
                         full_node_m,
                         new_data.first,
                         insertion.error().message() );
        return;
    }

    // Data was processed successfully; the flag below feeds the periodic sync interval adjustment.
    if ( received_first_periodic_sync_response_.load() )
    {
        return;
    }
    received_first_periodic_sync_response_.store( true );
    m_logger->info(
        "[{} - full: {}] First transaction data received from network, switching to 10-minute periodic sync interval",
        account_m->GetAddress().substr( 0, 8 ),
        full_node_m );
}
/**
 * @brief Queues a new (key, value) pair and wakes the consumer waiting on cv_.
 * @param new_data Pair of (key, value) to enqueue.
 */
void TransactionManager::NewElementCallback( crdt::CRDTCallbackManager::NewDataPair new_data )
{
    // Keep the key for logging; the pair itself is moved into the queue to avoid copying the payload.
    const auto queued_key = new_data.first;

    std::unique_lock queue_lock( new_data_queue_mutex_ );
    new_data_queue_.push( std::move( new_data ) );
    // Read the size while still holding the mutex: reading it after unlocking raced with
    // concurrent pushes and pops on the same queue.
    const auto queued_count = new_data_queue_.size();
    queue_lock.unlock();

    m_logger->debug( "[{} - full: {}] CRDT new data queued, {} - (queue size: {})",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     queued_key,
                     queued_count );
    // Notify the condition variable to wake up the main loop
    cv_.notify_one();
}
/**
 * @brief Queues a deleted key and wakes the consumer waiting on cv_.
 * @param deleted_key Key of the element that was deleted.
 */
void TransactionManager::DeleteElementCallback( std::string deleted_key )
{
    std::unique_lock queue_lock( deleted_data_queue_mutex_ );
    deleted_data_queue_.push( deleted_key );
    // Read the size while still holding the mutex: reading it after unlocking raced with
    // concurrent pushes and pops on the same queue.
    const auto queued_count = deleted_data_queue_.size();
    queue_lock.unlock();

    m_logger->debug( "[{} - full: {}] CRDT deleted key queued, {} - (queue size: {})",
                     account_m->GetAddress().substr( 0, 8 ),
                     full_node_m,
                     deleted_key,
                     queued_count );
    // Notify the condition variable to wake up the main loop
    cv_.notify_one();
}
/**
 * @brief Installs the callback invoked by ChangeState() on every state transition.
 *
 * Any previously installed callback is replaced.
 *
 * @param callback Callable receiving (old_state, new_state).
 */
void TransactionManager::RegisterStateChangeCallback( StateChangeCallback callback )
{
    const std::lock_guard callback_guard( state_change_callback_mutex_ );
    state_change_callback_ = std::move( callback );
}
/**
 * @brief Clears the state-change callback so ChangeState() no longer notifies anyone.
 */
void TransactionManager::UnregisterStateChangeCallback()
{
    const std::lock_guard callback_guard( state_change_callback_mutex_ );
    state_change_callback_ = nullptr;
}
/**
 * @brief Moves the manager to @p new_state and notifies the registered callback, if any.
 *
 * Does nothing when the state is unchanged. The callback is invoked while
 * state_change_callback_mutex_ is held.
 *
 * @param new_state State to switch to.
 */
void TransactionManager::ChangeState( State new_state )
{
    std::lock_guard callback_guard( state_change_callback_mutex_ );
    if ( state_m == new_state )
    {
        return;
    }

    const auto previous_state = state_m;
    m_logger->info( "[{} - full: {}] State changed from {} to {}",
                    account_m->GetAddress().substr( 0, 8 ),
                    full_node_m,
                    previous_state,
                    new_state );
    state_m = new_state;

    if ( state_change_callback_ )
    {
        state_change_callback_( previous_state, new_state );
    }
}
/**
 * @brief Looks up a tracked transaction with the same nonce and source address as @p element.
 *
 * @param element Transaction whose nonce / source address are used for the lookup.
 * @return The matching transaction, or std::errc::no_such_file_or_directory when there is none.
 */
outcome::result<std::shared_ptr<IGeniusTransactions>> TransactionManager::GetConflictingTransaction(
    const IGeniusTransactions &element ) const
{
    auto match = GetTransactionByNonceAndAddress( element.dag_st.nonce(), element.GetSrcAddress() );
    if ( !match )
    {
        return outcome::failure( std::errc::no_such_file_or_directory );
    }
    return match;
}
}
/**
 * @brief Formats a TransactionManager::State as its enumerator name.
 *
 * Values without a matching case are rendered as "UNKNOWN".
 *
 * @param s   State to format.
 * @param ctx Format context receiving the output.
 * @return Iterator past the written text.
 */
fmt::format_context::iterator fmt::formatter<sgns::TransactionManager::State>::format(
    sgns::TransactionManager::State s,
    format_context                 &ctx ) const
{
    using State = sgns::TransactionManager::State;

    // Maps each enumerator to its label; falls through to "UNKNOWN" for anything else.
    const auto label_of = []( State state ) -> string_view
    {
        switch ( state )
        {
            case State::CREATING:
                return "CREATING";
            case State::INITIALIZING:
                return "INITIALIZING";
            case State::SYNCING:
                return "SYNCING";
            case State::READY:
                return "READY";
        }
        return "UNKNOWN";
    };

    string_view label = label_of( s );
    return formatter<string_view>::format( label, ctx );
}
Updated on 2026-03-04 at 13:10:44 -0800