src/crdt/globaldb/pubsub_broadcaster_ext.hpp¶
Namespaces¶
| Name |
|---|
| sgns |
| sgns::crdt |
Classes¶
| Name | |
|---|---|
| class | sgns::crdt::PubSubBroadcasterExt Extended PubSub broadcaster that integrates with a CRDT datastore and Graphsync DAG syncer. |
Source code¶
#ifndef SUPERGENIUS_CRDT_PUBSUB_BROADCASTER_EXT_HPP
#define SUPERGENIUS_CRDT_PUBSUB_BROADCASTER_EXT_HPP
#include "crdt/broadcaster.hpp"
#include "crdt/graphsync_dagsyncer.hpp"
#include "crdt/crdt_datastore.hpp"
#include "base/logger.hpp"
#include <ipfs_pubsub/gossip_pubsub_topic.hpp>
#include <queue>
#include <tuple>
#include <vector>
#include <future>
#include <unordered_map>
#include <string>
#include <optional>
#include <mutex>
namespace sgns::crdt
{
class PubSubBroadcasterExt : public Broadcaster, public std::enable_shared_from_this<PubSubBroadcasterExt>
{
public:
using GossipPubSub = sgns::ipfs_pubsub::GossipPubSub;
~PubSubBroadcasterExt();
static std::shared_ptr<PubSubBroadcasterExt> New( std::shared_ptr<sgns::crdt::GraphsyncDAGSyncer> dagSyncer,
std::shared_ptr<GossipPubSub> pubSub );
outcome::result<void> Broadcast( const base::Buffer &buff, std::string topic, boost::optional<libp2p::peer::PeerInfo> peerInfo = boost::none ) override;
outcome::result<base::Buffer> Next() override;
void Start();
outcome::result<void> AddBroadcastTopic( const std::string &topicName );
void AddListenTopic( const std::string &topic );
bool HasTopic( const std::string &topic ) override;
std::shared_ptr<void> GetDagSyncer() const override { return dagSyncer_; }
void Stop();
bool AddSingleCIDInfo( const std::string &cid, const std::string peer_id, const std::string address );
private:
PubSubBroadcasterExt( std::shared_ptr<sgns::crdt::GraphsyncDAGSyncer> dagSyncer,
std::shared_ptr<GossipPubSub> pubSub );
void OnMessage( boost::optional<const GossipPubSub::Message &> message, const std::string &incomingTopic );
std::set<std::string> topicsToListen_;
std::set<std::string> topicsToBroadcast_;
std::shared_ptr<sgns::crdt::GraphsyncDAGSyncer> dagSyncer_;
std::queue<std::tuple<libp2p::peer::PeerId, std::string>> messageQueue_;
std::shared_ptr<GossipPubSub> pubSub_;
std::mutex queueMutex_;
std::mutex listenTopicsMutex_;
std::mutex broadcastTopicsMutex_;
std::mutex subscriptionMutex_;
std::atomic_bool started_;
sgns::base::Logger m_logger = sgns::base::createLogger( "PubSubBroadcasterExt" );
std::vector<std::shared_future<std::shared_ptr<libp2p::protocol::Subscription>>> subscriptionFutures_;
bool AddMultiCIDInfo( const std::vector<CID> &cids,
const libp2p::peer::PeerId &peer_id,
const std::vector<libp2p::multi::Multiaddress> &addr_vector );
};
}
#endif // SUPERGENIUS_CRDT_PUBSUB_BROADCASTER_EXT_HPP
Updated on 2026-03-04 at 13:10:44 -0800