Skip to content

src/account/AccountMessenger.hpp

Header file of the account messenger class. More...

Namespaces

Name
sgns

Classes

Name
class sgns::AccountMessenger
struct sgns::AccountMessenger::InterfaceMethods
Interface methods the user needs to define.

Functions

Name
OUTCOME_HPP_DECLARE_ERROR_2(sgns , AccountMessenger::Error )
Macro for declaring error handling in the AccountMessenger class.

Detailed Description

Header file of the account messenger class.

Date: 2025-07-21 Henrique A. Klein ([email protected])

Functions Documentation

function OUTCOME_HPP_DECLARE_ERROR_2

OUTCOME_HPP_DECLARE_ERROR_2(
    sgns ,
    AccountMessenger::Error 
)

Macro for declaring error handling in the AccountMessenger class.

Source code

#pragma once

#include <string>
#include <memory>
#include <functional>
#include <vector>
#include <future>
#include <condition_variable>
#include <queue>
#include <atomic>
#include <mutex>
#include <unordered_map>
#include <unordered_set>
#include <chrono>

#include <boost/optional.hpp>

#include "base/logger.hpp"
#include "ipfs_pubsub/gossip_pubsub.hpp"
#include "outcome/outcome.hpp"
#include "account/proto/SGAccountComm.pb.h"

namespace sgns
{
    class AccountMessenger : public std::enable_shared_from_this<AccountMessenger>
    {
    public:
        enum class Error
        {
            PROTO_DESERIALIZATION = 0, 
            PROTO_SERIALIZATION,       
            NONCE_REQUEST_IN_PROGRESS, 
            NONCE_GET_ERROR,           
            NO_RESPONSE_RECEIVED,      
            RESPONSE_WITHOUT_NONCE,    
            GENESIS_REQUEST_ERROR,     
        };

        struct InterfaceMethods
        {
            std::function<outcome::result<std::vector<uint8_t>>( std::vector<uint8_t> data )> sign_;

            std::function<outcome::result<bool>( std::string address, std::string sig, std::vector<uint8_t> data )>
                verify_signature_;

            std::function<outcome::result<uint64_t>( std::string address )> get_local_nonce_;

            std::function<outcome::result<std::string>( uint8_t block_index, const std::string &address )>
                get_block_cid_;

            std::function<outcome::result<bool>( const std::string &cid )> has_block_cid_;
        };

        // Global block response handler type
        using BlockResponseHandler =
            std::function<bool( const std::string &cid, const std::string &peer_id, const std::string &address )>;

        // Head request handler type: called when a head request is received for topics
        using HeadRequestHandler = std::function<void( const std::set<std::string> &topics )>;

        static std::shared_ptr<AccountMessenger> New( std::string                                address,
                                                      std::shared_ptr<ipfs_pubsub::GossipPubSub> pubsub,
                                                      InterfaceMethods                           methods );
        ~AccountMessenger();
        outcome::result<uint64_t> GetLatestNonce( uint64_t timeout_ms, uint64_t silent_time_ms = 150 );

        outcome::result<void> RequestGenesis(
            uint64_t timeout_ms,
            std::function<void( outcome::result<std::string> )> callback = nullptr );

        outcome::result<void> RequestAccountCreation(
            uint64_t timeout_ms,
            std::function<void( outcome::result<std::string> )> callback );

        outcome::result<void> RequestRegularBlock( uint64_t                                            timeout_ms,
                                                   std::string                                         cid,
                                                   std::function<void( outcome::result<std::string> )> callback = nullptr );

        void RegisterBlockResponseHandler( BlockResponseHandler handler );

        void ClearBlockResponseHandler();

        void RegisterHeadRequestHandler( HeadRequestHandler handler );

        void ClearHeadRequestHandler();

        outcome::result<void> RequestHeads( const std::unordered_set<std::string> &topics );

    private:
        static constexpr std::string_view ACCOUNT_COMM = ".comm";
        static constexpr std::string_view REQUESTS_COMM = "SGNUS.BC.Requests.comm";

        const std::string                          address_;            
        const std::string                          account_comm_topic_; 
        const std::string                          requests_topic_;     
        std::shared_ptr<ipfs_pubsub::GossipPubSub> pubsub_;             

        std::shared_future<std::shared_ptr<libp2p::protocol::Subscription>> subs_acc_future_;
        std::shared_future<std::shared_ptr<libp2p::protocol::Subscription>> subs_requests_future_;

        std::unordered_map<uint64_t, std::set<uint64_t>> nonce_responses_; 
        std::unordered_map<uint64_t, std::set<std::string>>
            no_nonce_responses_; 
        std::unordered_map<uint64_t, std::chrono::steady_clock::time_point>
                   first_response_time_;   
        std::mutex nonce_responses_mutex_; 

        // Block responses storage for account/genesis requests
        std::unordered_map<uint64_t, std::set<std::string>> block_responses_; 
        std::unordered_map<uint64_t, std::chrono::steady_clock::time_point>
                   block_first_response_time_; 
        std::mutex block_responses_mutex_;     

        InterfaceMethods methods_; 

        std::random_device rd_; 

        BlockResponseHandler global_block_handler_;
        std::mutex           global_handler_mutex_;

        HeadRequestHandler head_request_handler_;
        std::mutex         head_handler_mutex_;

        // Worker thread state
        enum class RequestType: std::uint8_t
        {
            Nonce,
            Genesis,
            AccountCreation,
            BlockByCid
        };

        struct RequestTask
        {
            RequestType                                         type;
            uint64_t                                            timeout_ms;
            uint64_t                                            silent_time_ms{ 150 };
            uint8_t                                             block_index{ 0 };
            std::string                                         cid;
            std::function<void( outcome::result<std::string> )> callback;
            std::shared_ptr<std::promise<outcome::result<uint64_t>>> nonce_promise;
        };

        std::thread                     worker_thread_;
        std::mutex                      queue_mutex_;
        std::condition_variable         queue_cv_;
        std::queue<RequestTask>         request_queue_;
        std::atomic<bool>               stop_worker_{ false };

        void WorkerLoop();
        void EnqueueTask( RequestTask task );
        outcome::result<uint64_t> PerformNonceRequest( uint64_t timeout_ms, uint64_t silent_time_ms );
        outcome::result<std::set<std::string>> PerformBlockRequest( uint64_t timeout_ms, uint8_t block_index );
        outcome::result<std::set<std::string>> PerformBlockCidRequest( uint64_t timeout_ms, const std::string &cid );

        AccountMessenger( std::string                                address,
                          std::shared_ptr<ipfs_pubsub::GossipPubSub> pubsub,
                          InterfaceMethods                           methods );
        outcome::result<void> RequestNonce( uint64_t req_id );

        outcome::result<void> RequestBlock( uint64_t req_id, uint8_t block_index );

        outcome::result<void> RequestBlockByCid( uint64_t req_id, const std::string &cid );

        void OnResponse( boost::optional<const ipfs_pubsub::GossipPubSub::Message &> message );
        void OnRequest( boost::optional<const ipfs_pubsub::GossipPubSub::Message &> message );
        outcome::result<void> SendAccountMessage( const accountComm::AccountMessage &msg,
                                                  const std::set<std::string>       &topics );

        void HandleNonceRequest( const accountComm::SignedNonceRequest &req );
        void HandleNonceResponse( const accountComm::SignedNonceResponse &resp );

        void HandleBlockRequest( const accountComm::SignedBlockRequest &req );
        void HandleBlockCidRequest( const accountComm::SignedBlockCidRequest &req );
        void HandleBlockResponse( const accountComm::SignedBlockResponse &resp );

        void HandleHeadRequest( const accountComm::SignedHeadRequest &req );

        base::Logger logger_ = sgns::base::createLogger( "AccountMessenger" );
    };
}

OUTCOME_HPP_DECLARE_ERROR_2( sgns, AccountMessenger::Error );

Updated on 2026-03-04 at 13:10:44 -0800