globaldb/crdt_work_journal.hpp¶
Persistent work-journal for CRDT key processing state. More...
Namespaces¶
| Name |
|---|
| sgns |
| sgns::storage |
| sgns::crdt |
Classes¶
| Name | |
|---|---|
| class | sgns::crdt::CRDTWorkJournal Tracks key processing lifecycle persisted in RocksDB. |
| struct | sgns::crdt::CRDTWorkJournal::Entry Serialized work-journal entry for a key. |
Detailed Description¶
Persistent work-journal for CRDT key processing state.
Date: 2026-04-21 Henrique A. Klein ([email protected])
Source code¶
#ifndef SUPERGENIUS_CRDT_WORK_JOURNAL_HPP
#define SUPERGENIUS_CRDT_WORK_JOURNAL_HPP
#include <chrono>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <string_view>
#include <vector>
namespace sgns::storage
{
class rocksdb;
}
namespace sgns::crdt
{
class CRDTWorkJournal
{
public:
enum class State : uint8_t
{
Seen = 0,
Processing = 1,
Stalled = 2,
};
struct Entry
{
std::string key;
State state = State::Seen;
uint64_t attempt_count = 0;
uint64_t updated_at_ms = 0;
uint64_t lease_until_ms = 0;
};
static std::shared_ptr<CRDTWorkJournal> New( std::shared_ptr<storage::rocksdb> datastore );
void MarkSeen( const std::string &key );
void MarkProcessing( const std::string &key, std::chrono::milliseconds lease = std::chrono::minutes( 5 ) );
void MarkStalled( const std::string &key, std::chrono::milliseconds lease = std::chrono::minutes( 5 ) );
bool MarkDone( const std::string &key );
std::optional<Entry> GetEntry( const std::string &key ) const;
std::vector<Entry> ListUnfinished( std::string_view key_pattern = {} ) const;
size_t RecoverStaleProcessing( std::string_view key_pattern,
std::chrono::milliseconds stale = std::chrono::milliseconds( 0 ) );
private:
static constexpr std::string_view NAMESPACE_PREFIX = "/crdt/work/";
explicit CRDTWorkJournal( std::shared_ptr<storage::rocksdb> datastore );
static uint64_t NowMs();
std::string BuildStorageKey( const std::string &key ) const;
static std::optional<Entry> DeserializeEntry( std::string_view storage_key, std::string_view value );
static std::string SerializeEntry( const Entry &entry );
static std::vector<std::string> Split( const std::string &value, char separator );
std::optional<Entry> GetEntryUnlocked( const std::string &key ) const;
bool PutEntryUnlocked( const Entry &entry ) const;
std::shared_ptr<storage::rocksdb> datastore_;
mutable std::mutex mutex_;
};
}
#endif // SUPERGENIUS_CRDT_WORK_JOURNAL_HPP
Updated on 2026-06-05 at 17:22:19 -0700