Skip to content

src/crdt/globaldb/pubsub_broadcaster.cpp

Namespaces

Name
sgns
sgns::crdt

Source code

#include "pubsub_broadcaster.hpp"

#include <utility>

namespace sgns::crdt
{
    PubSubBroadcaster::PubSubBroadcaster( std::shared_ptr<GossipPubSubTopic> pubSubTopic ) :
        gossipPubSubTopic_( std::move( pubSubTopic ) )
    {
        if ( gossipPubSubTopic_ != nullptr )
        {
            gossipPubSubTopic_->Subscribe(
                [this]( const boost::optional<const GossipPubSub::Message &> &message )
                {
                    if ( message )
                    {
                        std::string cid( reinterpret_cast<const char *>( message->data.data() ), message->data.size() );
                        auto        peerId = libp2p::peer::PeerId::fromBytes( message->from );
                        if ( peerId.has_value() )
                        {
                            std::scoped_lock lock( mutex_ );
                            listOfMessages_.emplace( std::move( peerId.value() ), std::move( cid ) );
                        }
                    }
                } );
        }
    }

outcome::result<void> PubSubBroadcaster::Broadcast(const base::Buffer &buff, std::string topic, boost::optional<libp2p::peer::PeerInfo> peerInfo)
{
    if (this->gossipPubSubTopic_ == nullptr)
    {
        return outcome::failure(boost::system::error_code{});
    }
    gossipPubSubTopic_->Publish(buff.toVector());
    return outcome::success();
}

outcome::result<base::Buffer> PubSubBroadcaster::Next()
{
    std::scoped_lock lock(mutex_);
    if (listOfMessages_.empty())
    {
        //Broadcaster::ErrorCode::ErrNoMoreBroadcast
        return outcome::failure(boost::system::error_code{});
    }

    std::string strBuffer = std::get<1>(listOfMessages_.front());
    listOfMessages_.pop();

    base::Buffer buffer;
    buffer.put(strBuffer);
    return buffer;
}

}

Updated on 2026-03-04 at 13:10:44 -0800