src/subscription/subscriber.hpp
Namespaces
Classes
Source code
#ifndef SUPERGENIUS_SUBSCRIPTION_SUBSCRIBER_HPP
#define SUPERGENIUS_SUBSCRIPTION_SUBSCRIBER_HPP
#include <functional>
#include <memory>
#include <mutex>
#include "subscription_engine.hpp"
namespace sgns::subscription {
template <typename Key, typename Type, typename... Arguments>
class Subscriber final : public std::enable_shared_from_this<
Subscriber<Key, Type, Arguments...>> {
public:
using KeyType = Key;
using ValueType = Type;
using HashType = size_t;
using SubscriptionSetId = uint64_t;
using SubscriptionEngineType =
SubscriptionEngine<KeyType, ValueType, Arguments...>;
using SubscriptionEnginePtr = std::shared_ptr<SubscriptionEngineType>;
using CallbackFnType = std::function<void(ValueType&, const KeyType &, const Arguments &...)>;
private:
using SubscriptionsContainer =
std::unordered_map<KeyType,
typename SubscriptionEngineType::IteratorType>;
using SubscriptionsSets =
std::unordered_map<SubscriptionSetId, SubscriptionsContainer>;
SubscriptionSetId next_id_;
SubscriptionEnginePtr engine_;
ValueType object_;
std::mutex subscriptions_cs_;
SubscriptionsSets subscriptions_sets_;
CallbackFnType on_notify_callback_;
public:
template <typename... Args>
explicit Subscriber(SubscriptionEnginePtr &ptr, Args &&... args)
: next_id_(0ull), engine_(ptr), object_(std::forward<Args>(args)...) {}
~Subscriber() {
for (auto &[_, subscriptions] : subscriptions_sets_)
for (auto &[key, it] : subscriptions)
engine_->unsubscribe(key, it);
}
Subscriber(const Subscriber &) = delete;
Subscriber &operator=(const Subscriber &) = delete;
Subscriber(Subscriber &&) = default; // NOLINT
Subscriber &operator=(Subscriber &&) = default; // NOLINT
void setCallback(CallbackFnType &&f) {
on_notify_callback_ = std::move(f);
}
SubscriptionSetId generateSubscriptionSetId() {
return ++next_id_;
}
void subscribe(SubscriptionSetId id, const KeyType &key) {
std::lock_guard lock(subscriptions_cs_);
auto &&[it, inserted] = subscriptions_sets_[id].emplace(key, typename SubscriptionEngineType::IteratorType{});
if (inserted)
it->second = engine_->subscribe(key, this->weak_from_this());
}
void unsubscribe(SubscriptionSetId id, const KeyType &key) {
std::lock_guard<std::mutex> lock(subscriptions_cs_);
if (auto set_it = subscriptions_sets_.find(id); set_it != subscriptions_sets_.end()) {
auto &subscriptions = set_it->second;
auto it = subscriptions.find(key);
if (subscriptions.end() != it) {
engine_->unsubscribe(key, it->second);
subscriptions.erase(it);
}
}
}
void unsubscribe(SubscriptionSetId id) {
std::lock_guard<std::mutex> lock(subscriptions_cs_);
if (auto set_it = subscriptions_sets_.find(id); set_it != subscriptions_sets_.end()) {
auto &subscriptions = set_it->second;
for (auto &[key, it] : subscriptions)
engine_->unsubscribe(key, it);
subscriptions_sets_.erase(set_it);
}
}
void unsubscribe() {
std::lock_guard<std::mutex> lock(subscriptions_cs_);
for (auto &[_, subscriptions] : subscriptions_sets_)
for (auto &[key, it] : subscriptions)
engine_->unsubscribe(key, it);
subscriptions_sets_.clear();
}
void on_notify(const KeyType &key, const Arguments &... args) {
if (nullptr != on_notify_callback_) on_notify_callback_(object_, key, args...);
}
};
} // namespace sgns::subscription
#endif // SUPERGENIUS_SUBSCRIPTION_SUBSCRIBER_HPP
Updated on 2026-03-04 at 13:10:45 -0800