src/crdt/globaldb/pubsub_broadcaster_ext.cpp¶
Namespaces¶
| Name |
|---|
| sgns |
| sgns::crdt |
Source code¶
#include "pubsub_broadcaster_ext.hpp"
#include "base/sgns_version.hpp"
#include "crdt/globaldb/proto/broadcast.pb.h"
#include "crdt/crdt_datastore.hpp"
#include <ipfs_lite/ipld/ipld_node.hpp>
#include <regex>
#include <utility>
#include <boost/asio/post.hpp>
#include <boost/asio/strand.hpp>
using namespace boost;
namespace sgns::crdt
{
namespace
{
/**
 * @brief Builds a PeerInfo out of a list of multiaddress strings.
 *
 * Entries that do not parse as a multiaddress, that carry no peer ID component, or whose
 * peer ID is not valid base58 are ignored. The first usable entry fixes the peer ID; a
 * later usable entry naming a different peer aborts the whole conversion.
 *
 * @param addresses Multiaddress strings held in a protobuf repeated field.
 * @return PeerInfo with the common peer ID and every accepted address, or boost::none
 *         when no entry was usable or the usable entries disagree on the peer ID.
 */
boost::optional<libp2p::peer::PeerInfo> PeerInfoFromString(
    const google::protobuf::RepeatedPtrField<std::string> &addresses )
{
    boost::optional<libp2p::peer::PeerId>    common_id;
    std::vector<libp2p::multi::Multiaddress> accepted;

    for ( const auto &text : addresses )
    {
        auto parsed = libp2p::multi::Multiaddress::create( text );
        if ( !parsed )
        {
            continue; // not a well-formed multiaddress
        }
        auto multiaddr = std::move( parsed.value() );

        auto id_text = multiaddr.getPeerId();
        if ( !id_text )
        {
            continue; // address has no peer ID component
        }

        auto decoded_id = libp2p::peer::PeerId::fromBase58( *id_text );
        if ( !decoded_id )
        {
            continue; // peer ID component is not decodable
        }

        if ( common_id )
        {
            // Every usable address has to point at the same peer.
            if ( common_id.value() != decoded_id.value() )
            {
                return boost::none;
            }
        }
        else
        {
            common_id = decoded_id.value();
        }

        accepted.push_back( std::move( multiaddr ) );
    }

    if ( common_id && !accepted.empty() )
    {
        return libp2p::peer::PeerInfo{ common_id.value(), std::move( accepted ) };
    }
    return boost::none;
}
}
/**
 * @brief Factory for PubSubBroadcasterExt instances.
 *
 * @param dagSyncer DAG syncer handed to the new broadcaster; must not be null.
 * @param pubSub    Gossip pub/sub handed to the new broadcaster; must not be null.
 * @return The new broadcaster, or nullptr when either argument is null.
 */
std::shared_ptr<PubSubBroadcasterExt> PubSubBroadcasterExt::New(
    std::shared_ptr<sgns::crdt::GraphsyncDAGSyncer> dagSyncer,
    std::shared_ptr<GossipPubSub>                   pubSub )
{
    if ( !dagSyncer || !pubSub )
    {
        return nullptr;
    }

    // NOTE(review): plain `new` is kept from the original; presumably the constructor is
    // not public, which would rule out std::make_shared - confirm against the header.
    return std::shared_ptr<PubSubBroadcasterExt>(
        new PubSubBroadcasterExt( std::move( dagSyncer ), std::move( pubSub ) ) );
}
/**
 * @brief Stores the collaborators and leaves the broadcaster in the "not started" state.
 *
 * No subscription is made here; that happens in Start() and AddListenTopic().
 *
 * @param dagSyncer DAG syncer kept in dagSyncer_.
 * @param pubSub    Gossip pub/sub kept in pubSub_.
 */
PubSubBroadcasterExt::PubSubBroadcasterExt( std::shared_ptr<sgns::crdt::GraphsyncDAGSyncer> dagSyncer,
                                            std::shared_ptr<GossipPubSub>                   pubSub ) :
    dagSyncer_( std::move( dagSyncer ) ), //
    pubSub_( std::move( pubSub ) ),       //
    started_{ false }
{
    m_logger->trace( "Initializing PubSubBroadcasterExt" );
}
/**
 * @brief Destructor; only emits a debug log line. Members are released by their own destructors.
 */
PubSubBroadcasterExt::~PubSubBroadcasterExt()
{
    m_logger->debug( "~PubSubBroadcasterExt CALLED" );
}
/**
 * @brief Marks the broadcaster as started and subscribes to every registered listen topic.
 *
 * Calling Start() again after it has run is a no-op.
 *
 * The started_ check-and-set is done while holding listenTopicsMutex_. AddListenTopic()
 * reads started_ under the same mutex, so a topic registered concurrently is subscribed
 * exactly once: either by the loop below or by AddListenTopic() itself, never by both.
 * It also stops two concurrent Start() calls from both running the subscription loop.
 */
void PubSubBroadcasterExt::Start()
{
    std::lock_guard<std::mutex> lock( listenTopicsMutex_ );
    if ( started_ )
    {
        return;
    }
    started_ = true;

    // Subscribe to each topic.
    for ( const auto &topicName : topicsToListen_ )
    {
        m_logger->debug( "Subscription request sent to topic: " + topicName );

        // The callback keeps only a weak reference, so a message arriving after the
        // broadcaster has been destroyed is silently dropped. The topic name is captured
        // by value so the callback can report which topic the message came from.
        std::shared_future<std::shared_ptr<libp2p::protocol::Subscription>> future = std::move(
            pubSub_->Subscribe( topicName,
                                [weakptr = weak_from_this(),
                                 topicName]( boost::optional<const GossipPubSub::Message &> message )
                                {
                                    if ( auto self = weakptr.lock() )
                                    {
                                        self->m_logger->debug( "Message received on topic: {}", topicName );
                                        self->OnMessage( message, topicName );
                                    }
                                } ) );
        {
            std::lock_guard lk( subscriptionMutex_ );
            subscriptionFutures_.push_back( std::move( future ) );
        }
    }
}
/**
 * @brief Handles one pub/sub message: validates it, records routes for the advertised
 *        CIDs and, when any of them is new, queues the payload for Next().
 *
 * The message is dropped (with a log line) when it is absent, is not a parsable
 * BroadcastMessage, has an undecodable peer ID or payload, carries no parsable
 * address, or comes from a blacklisted peer.
 *
 * @param message       Message delivered by the pub/sub layer; may be empty.
 * @param incomingTopic Topic the message arrived on; used for logging only.
 */
void PubSubBroadcasterExt::OnMessage( boost::optional<const GossipPubSub::Message &> message,
                                      const std::string                             &incomingTopic )
{
    m_logger->trace( "Received a message from topic {}", incomingTopic );

    if ( !message )
    {
        m_logger->error( "No message to process" );
        return;
    }

    sgns::crdt::broadcasting::BroadcastMessage envelope;
    if ( !envelope.ParseFromArray( message->data.data(), message->data.size() ) )
    {
        m_logger->error( "Failed to parse BroadcastMessage" );
        return;
    }

    // The sender's peer ID travels as raw bytes inside the envelope.
    auto sender_res = libp2p::peer::PeerId::fromBytes(
        gsl::span<const uint8_t>( reinterpret_cast<const uint8_t *>( envelope.peer().id().data() ),
                                  envelope.peer().id().size() ) );
    if ( !sender_res )
    {
        m_logger->error( "Failed to construct PeerId from bytes" );
        return;
    }
    auto sender = sender_res.value();
    m_logger->trace( "Message from peer {}", sender.toBase58() );

    base::Buffer payload;
    payload.put( envelope.data() );
    auto decoded_cids = CrdtDatastore::DecodeBroadcast( payload );
    if ( decoded_cids.has_failure() )
    {
        m_logger->error( "Failed to decode broadcast payload" );
        return;
    }

    // Keep only the advertised addresses that parse as multiaddresses.
    std::vector<libp2p::multi::Multiaddress> sender_addresses;
    for ( const auto &addr_text : envelope.peer().addrs() )
    {
        auto parsed = libp2p::multi::Multiaddress::create( addr_text );
        if ( !parsed )
        {
            continue;
        }
        sender_addresses.push_back( parsed.value() );
        m_logger->trace( "Added Address: {}", parsed.value().getStringAddress() );
    }
    if ( sender_addresses.empty() )
    {
        m_logger->trace( "No addresses available for CIDs broadcast" );
        return;
    }

    if ( dagSyncer_->IsOnBlackList( sender ) )
    {
        m_logger->debug( "The peer {} is blacklisted", sender.toBase58() );
        return;
    }

    if ( !AddMultiCIDInfo( decoded_cids.value(), sender, sender_addresses ) )
    {
        m_logger->debug( "No new content from message" );
        return;
    }

    // At least one CID is unknown locally: hand the raw payload over to Next().
    std::lock_guard<std::mutex> guard( queueMutex_ );
    messageQueue_.emplace( std::move( sender ), envelope.data() );
}
/**
 * @brief Publishes a payload, tagged with the sender's peer information, to the broadcast topics.
 *
 * The message goes to every topic registered in topicsToBroadcast_ and, when @p topic is
 * non-empty, also to @p topic with the network/version appendix attached.
 *
 * @param buff     Payload copied into the message's data field.
 * @param topic    Optional extra topic (without appendix); empty means "registered topics only".
 * @param peerInfo Peer ID and addresses to advertise. When absent, the peer ID is taken
 *                 from the DAG syncer.
 * @return A failure (default-constructed error code) when there is no topic to publish to,
 *         when no peer ID is available, or when the message cannot be serialized.
 *         Success otherwise, including the case where nothing is published because no
 *         address is available to advertise.
 */
outcome::result<void> PubSubBroadcasterExt::Broadcast( const base::Buffer                     &buff,
                                                       std::string                             topic,
                                                       boost::optional<libp2p::peer::PeerInfo> peerInfo )
{
    // Work on a snapshot so the mutex is not held while publishing.
    std::set<std::string> broadcastTopicsCopy;
    {
        std::lock_guard<std::mutex> lock( broadcastTopicsMutex_ );
        broadcastTopicsCopy = topicsToBroadcast_;
    }
    if ( !topic.empty() )
    {
        broadcastTopicsCopy.emplace( topic + version::GetNetAndVersionAppendix() );
    }
    if ( broadcastTopicsCopy.empty() )
    {
        m_logger->error( "Broadcast: no topic to broadcast" );
        return outcome::failure( boost::system::error_code{} );
    }

    // Pick the peer ID: the caller-supplied one wins, otherwise ask the DAG syncer.
    boost::optional<libp2p::peer::PeerId> peer_id_opt;
    if ( peerInfo )
    {
        peer_id_opt = peerInfo->id;
    }
    else
    {
        auto peer_id_res = dagSyncer_->GetId();
        if ( !peer_id_res )
        {
            m_logger->error( "Dag syncer has no peer ID." );
            return outcome::failure( boost::system::error_code{} );
        }
        peer_id_opt = peer_id_res.value();
    }
    const auto &peer_id = peer_id_opt.value();

    sgns::crdt::broadcasting::BroadcastMessage bmsg;
    // The peer sub-message is owned by bmsg from the start, so every early return below
    // releases it automatically (no manual delete on each exit path).
    auto *bpi = bmsg.mutable_peer();

    // Bind the byte vector once so both iterators refer to the same object.
    const auto &id_bytes = peer_id.toVector();
    bpi->set_id( std::string( id_bytes.begin(), id_bytes.end() ) );

    // Add addresses from PeerInfo (which already includes observed, interface, and relay addresses)
    if ( peerInfo )
    {
        for ( const auto &address : peerInfo->addresses )
        {
            bpi->add_addrs( address.getStringAddress() );
        }
    }
    // NOTE(review): addresses only ever come from peerInfo, so a call without peerInfo
    // always ends here - confirm that this is intended.
    if ( bpi->addrs_size() <= 0 )
    {
        m_logger->warn( "No addresses found for broadcasting." );
        return outcome::success();
    }

    std::string data( buff.toString() );
    bmsg.set_data( std::move( data ) );

    std::vector<uint8_t> serialized_proto( bmsg.ByteSizeLong() );
    if ( !bmsg.SerializeToArray( serialized_proto.data(), static_cast<int>( serialized_proto.size() ) ) )
    {
        // Do not publish a buffer that was not (fully) written.
        m_logger->error( "Broadcast: failed to serialize BroadcastMessage" );
        return outcome::failure( boost::system::error_code{} );
    }

    for ( const auto &broadcastTopic : broadcastTopicsCopy )
    {
        pubSub_->PublishBuffered( broadcastTopic, serialized_proto );
        // Skip the base58 conversion unless trace logging is actually enabled.
        if ( m_logger->level() <= spdlog::level::trace )
        {
            m_logger->trace( "CIDs broadcasted by {} to topic {}, at this {}",
                             peer_id.toBase58(),
                             broadcastTopic,
                             reinterpret_cast<size_t>( this ) );
        }
    }
    return outcome::success();
}
/**
 * @brief Pops the oldest queued broadcast payload.
 *
 * @return The payload of the front queue entry as a Buffer, or a failure carrying a
 *         default-constructed error code when the queue is empty.
 */
outcome::result<base::Buffer> PubSubBroadcasterExt::Next()
{
    std::lock_guard<std::mutex> guard( queueMutex_ );

    if ( !messageQueue_.empty() )
    {
        // Copy the payload out before the entry is removed from the queue.
        std::string payload_text = std::get<1>( messageQueue_.front() );
        messageQueue_.pop();

        base::Buffer payload;
        payload.put( payload_text );
        return payload;
    }

    // Broadcaster::ErrorCode::ErrNoMoreBroadcast
    return outcome::failure( boost::system::error_code{} );
}
/**
 * @brief Registers a topic that Broadcast() publishes to.
 *
 * The network/version appendix is attached to @p topicName before it is stored.
 * Registering a topic that is already present only produces a trace log line.
 *
 * @param topicName Topic name without the appendix.
 * @return Always success.
 */
outcome::result<void> PubSubBroadcasterExt::AddBroadcastTopic( const std::string &topicName )
{
    const auto versioned_topic = topicName + version::GetNetAndVersionAppendix();

    std::lock_guard<std::mutex> guard( broadcastTopicsMutex_ );
    const bool                  inserted = topicsToBroadcast_.insert( versioned_topic ).second;
    if ( !inserted )
    {
        m_logger->trace( "Topic '{}' already exists. Skipping.", versioned_topic );
    }
    return outcome::success();
}
/**
 * @brief Tells whether @p topic is among the registered broadcast topics.
 *
 * NOTE(review): the lookup uses @p topic exactly as given, while AddBroadcastTopic()
 * stores names with the network/version appendix attached - callers presumably have to
 * pass the full name; confirm.
 *
 * @param topic Topic name to look up.
 * @return true when the name is present in topicsToBroadcast_.
 */
bool PubSubBroadcasterExt::HasTopic( const std::string &topic )
{
    std::lock_guard<std::mutex> guard( broadcastTopicsMutex_ );
    return topicsToBroadcast_.count( topic ) != 0;
}
void PubSubBroadcasterExt::AddListenTopic( const std::string &topic )
{
auto full_topic = topic + version::GetNetAndVersionAppendix();
std::lock_guard lock( listenTopicsMutex_ );
if ( topicsToListen_.find( full_topic ) != topicsToListen_.end() )
{
this->m_logger->debug( "Already listening to topic {}", full_topic );
return;
}
topicsToListen_.insert( full_topic );
m_logger->debug( "Listen request on topic: '{}'", full_topic );
if ( started_ )
{
std::shared_future<std::shared_ptr<libp2p::protocol::Subscription>> future = std::move( pubSub_->Subscribe(
full_topic,
[weakptr = weak_from_this(), full_topic]( boost::optional<const GossipPubSub::Message &> message )
{
if ( auto self = weakptr.lock() )
{
self->m_logger->debug( "Message received from topic: " + full_topic );
self->OnMessage( message, full_topic );
}
} ) );
{
std::lock_guard lock( subscriptionMutex_ );
subscriptionFutures_.push_back( std::move( future ) );
}
}
}
/**
 * @brief Drops every stored subscription future.
 *
 * Futures that are already ready are read once (any exception they carry is swallowed);
 * futures that are not ready are not waited for. All of them are then removed from
 * subscriptionFutures_.
 */
void PubSubBroadcasterExt::Stop()
{
    std::lock_guard guard( subscriptionMutex_ );

    for ( auto &pending : subscriptionFutures_ )
    {
        if ( !pending.valid() )
        {
            continue;
        }

        try
        {
            // Zero-length wait: a pure readiness poll that never blocks.
            const auto status = pending.wait_for( std::chrono::milliseconds( 0 ) );
            if ( status == std::future_status::ready )
            {
                pending.get();
            }
            // A future that is not ready yet is simply released by clear() below.
        }
        catch ( ... )
        {
            // Best-effort cleanup: failures while draining a future are ignored.
        }
    }

    subscriptionFutures_.clear();
}
/**
 * @brief Records a route for one CID given in textual form and queues it for Next().
 *
 * @param cid     CID in string form.
 * @param peer_id Raw peer ID bytes carried in a std::string (decoded with PeerId::fromBytes).
 * @param address Multiaddress of the peer in string form.
 * @return true when the CID was new and an encoded broadcast for it was queued;
 *         false when an argument could not be decoded, the CID brought no new content,
 *         or the broadcast could not be encoded.
 */
bool PubSubBroadcasterExt::AddSingleCIDInfo( const std::string &cid,
                                             const std::string  peer_id,
                                             const std::string  address )
{
    auto parsed_cid = CID::fromString( cid );
    if ( parsed_cid.has_error() )
    {
        m_logger->error( "{}: Failed to construct CID from string", __func__ );
        return false;
    }

    auto parsed_peer = libp2p::peer::PeerId::fromBytes(
        gsl::span<const uint8_t>( reinterpret_cast<const uint8_t *>( peer_id.data() ), peer_id.size() ) );
    if ( !parsed_peer )
    {
        m_logger->error( "{}: Failed to construct PeerId from string", __func__ );
        return false;
    }

    auto parsed_address = libp2p::multi::Multiaddress::create( address );
    if ( !parsed_address )
    {
        m_logger->error( "{}: Failed to construct Address from string", __func__ );
        return false;
    }

    // Nothing to queue unless the CID is actually missing locally.
    if ( !AddMultiCIDInfo( { parsed_cid.value() }, parsed_peer.value(), { parsed_address.value() } ) )
    {
        return false;
    }

    auto encoded = sgns::crdt::CrdtDatastore::EncodeBroadcastStatic( { parsed_cid.value() } );
    if ( encoded.has_error() )
    {
        return false;
    }

    std::lock_guard<std::mutex> guard( queueMutex_ );
    messageQueue_.emplace( parsed_peer.value(), std::string( encoded.value().toString() ) );
    return true;
}
/**
 * @brief Records, for every CID that is not stored locally, a route to the advertising peer.
 *
 * CIDs whose HasBlock query fails, or whose block is already present, are skipped.
 *
 * @param cids        CIDs advertised by the peer.
 * @param peer_id     Peer that advertised the CIDs.
 * @param addr_vector Addresses of that peer. When empty, nothing is recorded and false is
 *                    returned: there is no address to route to, and the log statements
 *                    below read the first element.
 * @return true when at least one CID was not available locally (a route was added for it),
 *         false otherwise.
 */
bool PubSubBroadcasterExt::AddMultiCIDInfo( const std::vector<CID>                         &cids,
                                            const libp2p::peer::PeerId                     &peer_id,
                                            const std::vector<libp2p::multi::Multiaddress> &addr_vector )
{
    // Guard the front() access below; indexing an empty vector is undefined behaviour and
    // logging arguments are evaluated regardless of the active log level.
    if ( addr_vector.empty() )
    {
        m_logger->debug( "{}: No addresses supplied for peer {}, nothing to add", __func__, peer_id.toBase58() );
        return false;
    }
    const auto &first_address = addr_vector.front().getStringAddress();

    bool new_content = false;
    for ( const auto &cid : cids )
    {
        auto hb = dagSyncer_->HasBlock( cid );
        // Every path below logs the CID, so convert it once per iteration.
        const auto cid_text = cid.toString().value();

        if ( !hb.has_value() )
        {
            m_logger->debug( "{}: HasBlock query failed for CID {}", __func__, cid_text );
            continue;
        }
        if ( hb.value() )
        {
            m_logger->trace( "{}: Not adding route node {} from {} (already have block)",
                             __func__,
                             cid_text,
                             first_address );
            continue;
        }

        new_content = true;
        if ( dagSyncer_->IsCIDInCache( cid ) )
        {
            m_logger->debug( "{}: CID {} already cached without data, refreshing route from {} {}",
                             __func__,
                             cid_text,
                             first_address,
                             peer_id.toBase58() );
        }
        else
        {
            m_logger->debug( "{}: Request node {} from {} {}",
                             __func__,
                             cid_text,
                             first_address,
                             peer_id.toBase58() );
        }
        dagSyncer_->AddRoute( cid, peer_id, addr_vector );
    }
    return new_content;
}
} // namespace sgns::crdt
Updated on 2026-03-04 at 13:10:44 -0800