Skip to content

src/subscription/subscriber.hpp

Namespaces

Name
sgns
sgns::subscription

Classes

Name
class sgns::subscription::Subscriber

Source code

#ifndef SUPERGENIUS_SUBSCRIPTION_SUBSCRIBER_HPP
#define SUPERGENIUS_SUBSCRIPTION_SUBSCRIBER_HPP

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <utility>

#include "subscription_engine.hpp"

namespace sgns::subscription {

  template <typename Key, typename Type, typename... Arguments>
  class Subscriber final : public std::enable_shared_from_this<
                               Subscriber<Key, Type, Arguments...>> {
   public:
    using KeyType = Key;
    using ValueType = Type;
    using HashType = size_t;
    using SubscriptionSetId = uint64_t;

    using SubscriptionEngineType =
        SubscriptionEngine<KeyType, ValueType, Arguments...>;
    using SubscriptionEnginePtr = std::shared_ptr<SubscriptionEngineType>;

    using CallbackFnType = std::function<void(ValueType&, const KeyType &, const Arguments &...)>;

   private:
    using SubscriptionsContainer =
        std::unordered_map<KeyType,
                           typename SubscriptionEngineType::IteratorType>;
    using SubscriptionsSets =
      std::unordered_map<SubscriptionSetId, SubscriptionsContainer>;


    SubscriptionSetId next_id_;
    SubscriptionEnginePtr engine_;
    ValueType object_;

    std::mutex subscriptions_cs_;
    SubscriptionsSets subscriptions_sets_;

    CallbackFnType on_notify_callback_;

   public:
    template <typename... Args>
    explicit Subscriber(SubscriptionEnginePtr &ptr, Args &&... args)
        : next_id_(0ull), engine_(ptr), object_(std::forward<Args>(args)...) {}

    ~Subscriber() {
      for (auto &[_, subscriptions] : subscriptions_sets_)
        for (auto &[key, it] : subscriptions)
          engine_->unsubscribe(key, it);
    }

    Subscriber(const Subscriber &) = delete;
    Subscriber &operator=(const Subscriber &) = delete;

    Subscriber(Subscriber &&) = default; // NOLINT
    Subscriber &operator=(Subscriber &&) = default; // NOLINT

    void setCallback(CallbackFnType &&f) {
      on_notify_callback_ = std::move(f);
    }

    SubscriptionSetId generateSubscriptionSetId() {
        return ++next_id_;
    }

    void subscribe(SubscriptionSetId id, const KeyType &key) {
      std::lock_guard lock(subscriptions_cs_);
      auto &&[it, inserted] = subscriptions_sets_[id].emplace(key, typename SubscriptionEngineType::IteratorType{});

      if (inserted)
        it->second = engine_->subscribe(key, this->weak_from_this());
    }

    void unsubscribe(SubscriptionSetId id, const KeyType &key) {
      std::lock_guard<std::mutex> lock(subscriptions_cs_);
      if (auto set_it = subscriptions_sets_.find(id); set_it != subscriptions_sets_.end()) {
        auto &subscriptions = set_it->second;
        auto it = subscriptions.find(key);
        if (subscriptions.end() != it) {
          engine_->unsubscribe(key, it->second);
          subscriptions.erase(it);
        }
      }
    }

    void unsubscribe(SubscriptionSetId id) {
      std::lock_guard<std::mutex> lock(subscriptions_cs_);
      if (auto set_it = subscriptions_sets_.find(id); set_it != subscriptions_sets_.end()) {
        auto &subscriptions = set_it->second;
        for (auto &[key, it] : subscriptions)
          engine_->unsubscribe(key, it);

        subscriptions_sets_.erase(set_it);
      }
    }

    void unsubscribe() {
      std::lock_guard<std::mutex> lock(subscriptions_cs_);
      for (auto &[_, subscriptions] : subscriptions_sets_)
        for (auto &[key, it] : subscriptions)
          engine_->unsubscribe(key, it);

      subscriptions_sets_.clear();
    }

    void on_notify(const KeyType &key, const Arguments &... args) {
      if (nullptr != on_notify_callback_) on_notify_callback_(object_, key, args...);
    }
  };

}  // namespace sgns::subscription

#endif  // SUPERGENIUS_SUBSCRIPTION_SUBSCRIBER_HPP

Updated on 2026-03-04 at 13:10:45 -0800