src/processing/processing_subtask_queue.cpp¶
Namespaces¶
| Name |
|---|
| sgns |
| sgns::processing |
Source code¶
#include "processing_subtask_queue.hpp"
#include <sstream>
#include <utility>
namespace sgns::processing
{
ProcessingSubTaskQueue::ProcessingSubTaskQueue( std::string localNodeId, TimestampProvider timestampProvider ) :
m_localNodeId( std::move( localNodeId ) ),
m_queue( nullptr ),
m_timestampProvider(timestampProvider)
{
}
void ProcessingSubTaskQueue::CreateQueue(
SGProcessing::ProcessingQueue* queue, const std::vector<int>& enabledItemIndices)
{
m_queue = queue;
m_enabledItemIndices = enabledItemIndices;
ChangeOwnershipTo(m_localNodeId);
}
bool ProcessingSubTaskQueue::UpdateQueue(
SGProcessing::ProcessingQueue* queue, const std::vector<int>& enabledItemIndices)
{
if ( ( m_queue == nullptr ) || ( m_queue->last_update_timestamp() <= queue->last_update_timestamp() ) )
{
m_queue = queue;
m_enabledItemIndices = enabledItemIndices;
LogQueue();
return true;
}
return false;
}
bool ProcessingSubTaskQueue::LockItem(size_t& lockedItemIndex, uint64_t now)
{
// The method has to be called in scoped lock of queue mutex
m_queue->set_last_update_timestamp(now);
for (auto itemIdx : m_enabledItemIndices)
{
if (m_queue->items(itemIdx).lock_node_id().empty())
{
auto mItem = m_queue->mutable_items(itemIdx);
mItem->set_lock_node_id(m_localNodeId);
mItem->set_lock_timestamp( now );
mItem->set_lock_expiration_timestamp( now + m_queue->processing_timeout_length());
LogQueue();
lockedItemIndex = itemIdx;
return true;
}
}
return false;
}
bool ProcessingSubTaskQueue::GrabItem(size_t& grabbedItemIndex, uint64_t now)
{
if (HasOwnership())
{
return LockItem(grabbedItemIndex, now);
}
return false;
}
bool ProcessingSubTaskQueue::MoveOwnershipTo(const std::string& nodeId)
{
if (HasOwnership())
{
ChangeOwnershipTo(nodeId);
return true;
}
return false;
}
void ProcessingSubTaskQueue::ChangeOwnershipTo(const std::string& nodeId)
{
m_queue->set_owner_node_id(nodeId);
LogQueue();
}
bool ProcessingSubTaskQueue::RollbackOwnership()
{
if(m_queue == nullptr)
{
//TODO: Why can this be nullptr?
return false;
}
if (HasOwnership())
{
// Do nothing. The node is already the queue owner
return true;
}
// Find the current queue owner
auto ownerNodeId = m_queue->owner_node_id();
int ownerNodeIdx = -1;
for (int itemIdx = 0; itemIdx < (int)m_queue->items_size(); ++itemIdx)
{
if (m_queue->items(itemIdx).lock_node_id() == ownerNodeId)
{
ownerNodeIdx = itemIdx;
break;
}
}
if (ownerNodeIdx >= 0)
{
// Check if the local node is the previous queue owner
for (int idx = 1; idx < m_queue->items_size(); ++idx)
{
// Loop cyclically over queue items in backward direction starting from item[ownerNodeIdx - 1]
// and excluding the item[ownerNodeIdx]
int itemIdx = (m_queue->items_size() + ownerNodeIdx - idx) % m_queue->items_size();
if (!m_queue->items(itemIdx).lock_node_id().empty())
{
if (m_queue->items(itemIdx).lock_node_id() == m_localNodeId)
{
// The local node is the previous queue owner
ChangeOwnershipTo(m_localNodeId);
return true;
}
// Another node should take the ownership
return false;
}
}
}
else
{
// The queue owner didn't lock any subtask
// Find the last locked item
for (int idx = 1; idx <= m_queue->items_size(); ++idx)
{
int itemIdx = (m_queue->items_size() - idx);
if (!m_queue->items(itemIdx).lock_node_id().empty())
{
if (m_queue->items(itemIdx).lock_node_id() == m_localNodeId)
{
// The local node is the last locked item
ChangeOwnershipTo(m_localNodeId);
return true;
}
// Another node should take the ownership
return false;
}
}
}
// No locked items found
// Allow the local node to take the ownership
ChangeOwnershipTo(m_localNodeId);
return true;
}
bool ProcessingSubTaskQueue::HasOwnership() const
{
return ( m_queue != nullptr ) && m_queue->owner_node_id() == m_localNodeId;
}
bool ProcessingSubTaskQueue::UnlockExpiredItems(uint64_t now)
{
bool unlocked = false;
m_queue->set_last_update_timestamp(now);
if (HasOwnership())
{
for (auto itemIdx : m_enabledItemIndices)
{
auto item = m_queue->items(itemIdx);
// Check if a subtask is locked, expired and no result was obtained for it
if (!item.lock_node_id().empty())
{
if (now > item.lock_expiration_timestamp())
{
auto mItem = m_queue->mutable_items(itemIdx);
m_logger->debug("EXPIRED_SUBTASK_UNLOCKED: index {} for {} expired at {}ms", itemIdx, item.lock_node_id(), item.lock_expiration_timestamp());
// Unlock the item
mItem->set_lock_node_id("");
mItem->set_lock_timestamp(0);
mItem->set_lock_expiration_timestamp(0);
unlocked = true;
}
}
}
}
return unlocked;
}
uint64_t ProcessingSubTaskQueue::GetLastLockTimestamp() const
{
uint64_t lastLockTimestamp = 0;
for (auto itemIdx : m_enabledItemIndices) {
if (!m_queue->items(itemIdx).lock_node_id().empty()) {
lastLockTimestamp = std::max(lastLockTimestamp,
static_cast<uint64_t>(m_queue->items(itemIdx).lock_timestamp()));
}
}
return lastLockTimestamp;
}
void ProcessingSubTaskQueue::LogQueue() const
{
if (m_logger->level() <= spdlog::level::trace)
{
std::stringstream ss;
ss << "{";
ss << "\"owner_node_id\":\"" << m_queue->owner_node_id() << "\"";
ss << "," << "\"last_update_timestamp\":" << m_queue->last_update_timestamp();
ss << "," << "\"items\":[";
for (int itemIdx = 0; itemIdx < m_queue->items_size(); ++itemIdx)
{
ss << "{\"lock_node_id\":\"" << m_queue->items(itemIdx).lock_node_id() << "\"";
ss << ",\"lock_timestamp\":" << m_queue->items(itemIdx).lock_timestamp();
ss << ",\"lock_expiration_timestamp\":" << m_queue->items(itemIdx).lock_expiration_timestamp();
ss << "},";
}
ss << "]}";
m_logger->trace(ss.str());
}
}
bool ProcessingSubTaskQueue::AddOwnershipRequest(const std::string& nodeId, uint64_t timestamp)
{
bool requestAdded = false;
if (m_queue != nullptr)
{
auto request = m_queue->add_ownership_requests();
request->set_node_id(nodeId);
request->set_request_timestamp(timestamp);
// Update the queue timestamp
m_queue->set_last_update_timestamp(timestamp);
LogQueue();
requestAdded = true;
}
return requestAdded;
}
bool ProcessingSubTaskQueue::ProcessNextOwnershipRequest()
{
bool proccessedNextRequest = false;
if (HasOwnership() && (m_queue != nullptr) && (m_queue->ownership_requests_size() > 0))
{
// Get the first request
auto request = m_queue->ownership_requests(0);
// Remove it from the queue (by moving all others up)
for (int i = 0; i < m_queue->ownership_requests_size() - 1; i++)
{
m_queue->mutable_ownership_requests(i)->CopyFrom(m_queue->ownership_requests(i + 1));
}
m_queue->mutable_ownership_requests()->RemoveLast();
// Transfer ownership
ChangeOwnershipTo(request.node_id());
LogQueue();
proccessedNextRequest = true;
}
return proccessedNextRequest;
}
}
Updated on 2026-03-04 at 13:10:44 -0800