#pragma once
#include <cstdint>
#include <windows.h>
#include <optional>
#include "CAddressTranslator.h"
#define QUEUE
#include "CTlsObjectPool.h"
template <typename T>
class CLockFreeQueue
{
private:
// tail->next를 갱신할때 지역 Tail이 다른 큐에 들어가잇는걸 확인하기 위해 DOUBLE CAS를 해야하고 이를 위해 기존의 metaNext에 식별자를 추가한 구조체를 만듬
struct QNodePtr
{
uint64_t identifier_;
uintptr_t metaAddr_;
QNodePtr() {}
QNodePtr(uint64_t identifier, uintptr_t metaAddr)
:identifier_{ identifier }, metaAddr_{ metaAddr }{}
};
struct Node
{
T data_;
alignas(16) QNodePtr next_;
#pragma warning(disable : 26495)
Node(uint64_t identifier, uint64_t meta)
:next_{ identifier, CAddressTranslator::GetMetaAddr(meta,(uintptr_t)nullptr) }
{}
#pragma warning(default: 26495)
Node(T&& data, uint64_t identifier, uint64_t meta)
:data_{ std::forward<T>(data) },
next_{ identifier,CAddressTranslator::GetMetaAddr(meta,(uintptr_t)nullptr) }
{}
};
static inline CTlsObjectPool<Node, true> nodePool_;
static inline uint64_t identifier = 0;
const uint64_t identifier_;
alignas(64) uintptr_t metaTail_;
alignas(64) uintptr_t metaHead_;
alignas(64) uint64_t metaCnt_;
public:
alignas(64) long num_;
CLockFreeQueue()
:metaCnt_{ 0 }, identifier_{ InterlockedIncrement(&identifier) - 1 }
{
InterlockedExchange(&num_, 0);
uint64_t meta = CAddressTranslator::GetCnt(&metaCnt_);
Node* pDummy = nodePool_.Alloc(identifier_, meta);
metaTail_ = metaHead_ = CAddressTranslator::GetMetaAddr(meta, (uintptr_t)pDummy);
}
// meta가 붙으면 상위 17비트가 조작된 값
// real이 붙으면 상위 17비트를 제거한 값
void Enqueue(T data)
{
using namespace CAddressTranslator;
// 이번에 사용할 상위 17비트값 만들기
uint64_t meta = GetCnt(&metaCnt_);
// 새로운 노드 : next의 상위 17비트값의 초기화와 node의 T타입 생성자도 호출함
Node* pNewNode = nodePool_.Alloc(std::move(data), identifier_, meta);
QNodePtr newTail{ identifier_,GetMetaAddr(meta,(uintptr_t)pNewNode)};
while (true)
{
uintptr_t metaTail = metaTail_;
Node* pRealTail = (Node*)GetRealAddr(metaTail);
QNodePtr nextOfTail = pRealTail->next_;
if (GetRealAddr(nextOfTail.metaAddr_) != (uintptr_t)nullptr)
{
InterlockedCompareExchange(&metaTail_, nextOfTail.metaAddr_, metaTail);
continue;
}
if (ExtractMetaCnt(metaTail) != ExtractMetaCnt(nextOfTail.metaAddr_))
continue;
// 1번 CAS
// 바로 위의 if문으로 인해서 pRealTail이 재활용 되었다면 metaNext는 이미 재활용 되기 이전값임을 보장한다 따라서 재활용된경우 무조건 실패한다.
if (InterlockedCompareExchange128((LONG64*)&pRealTail->next_, newTail.metaAddr_, newTail.identifier_, (LONG64*)&nextOfTail) == FALSE)
continue;
// 2번 CAS 77번째 라인의 if문안의 CAS 떄문에 실패할수 잇으며 이를통해 스핀락이 아니게됨
InterlockedCompareExchange(&metaTail_, newTail.metaAddr_, metaTail);
InterlockedIncrement(&num_);
return;
}
}
std::optional<T> Dequeue()
{
using namespace CAddressTranslator;
if (InterlockedDecrement(&num_) < 0)
{
InterlockedIncrement(&num_);
return std::nullopt;
}
while (true)
{
uintptr_t metaTail = metaTail_;
uintptr_t metaHead = metaHead_;
Node* pRealHead_ = (Node*)GetRealAddr(metaHead);
QNodePtr nextOfHead = pRealHead_->next_;
if (GetRealAddr(nextOfHead.metaAddr_) == (uintptr_t)nullptr)
continue;
if (metaTail== metaHead)
{
InterlockedCompareExchange(&metaTail_, nextOfHead.metaAddr_, metaTail);
continue;
}
T retData = ((Node*)GetRealAddr(nextOfHead.metaAddr_))->data_;
if (InterlockedCompareExchange(&metaHead_, nextOfHead.metaAddr_, metaHead) == metaHead)
{
nodePool_.Free(pRealHead_);
return retData;
}
}
}
__forceinline const int long GetSize() const
{
return num_;
}
};

TlsObjectPool을 적용하면서 tail->next를 갱신해야할때 다른큐에서 재활용되엇는지 아닌지도 판단해야해서 큐 식별자를 포함하는 구조체로 현재 Tail이나 Head의 next를 래핑함.
Enqueue에서 다른 큐에서 재활용된 노드를 tail의 next에 꽃는것을 제대로 막는다면 Dequeue 에서는 할게 없다.
TlsObjectPool을 적용함으로서 기존에 발생하지 않는 문제점이 발생할 수 있는 상황은 두가지이다.
void Enqueue(T data)
{
using namespace CAddressTranslator;
// 이번에 사용할 상위 17비트값 만들기
uint64_t meta = GetCnt(&metaCnt_);
// 새로운 노드 : next의 상위 17비트값의 초기화와 node의 T타입 생성자도 호출함
Node* pNewNode = nodePool_.Alloc(std::move(data), identifier_, meta);
QNodePtr newTail{ identifier_,GetMetaAddr(meta,(uintptr_t)pNewNode)};
while (true)
{
uintptr_t metaTail = metaTail_;
Node* pRealTail = (Node*)GetRealAddr(metaTail); ---(1)
QNodePtr nextOfTail = pRealTail->next_; ---(2)
노드가 다른큐에 재활용 된 이후 (2)가 수행되는 경우와 그게 아닌 경우로 나누면 된다.
nextOfTail을 읽어온 이후 tail의 next가 재활용 된 경우에는 아래와 같은 1번 CAS에서 실패한다.
if (GetRealAddr(nextOfTail.metaAddr_) != (uintptr_t)nullptr) ---(3)
{
InterlockedCompareExchange(&metaTail_, nextOfTail.metaAddr_, metaTail);
continue;
}
if (ExtractMetaCnt(metaTail) != ExtractMetaCnt(nextOfTail.metaAddr_)) ---(4)
continue;
// 1번 CAS
// 바로 위의 if문으로 인해서 pRealTail이 재활용 되었다면 metaNext는 이미 재활용 되기 이전값임을 보장한다 따라서 재활용된경우 무조건 실패한다.
if (InterlockedCompareExchange128((LONG64*)&pRealTail->next_, newTail.metaAddr_, newTail.identifier_, (LONG64*)&nextOfTail) == FALSE)
continue;
(3)과 (4)의 기존 코드베이스에서의 역할은 다음과 같다.
(3) - tail의 next가 null이 아닐때 2번 CAS를 진행함으로서 스핀락을 막는다
(4) - tail의 next가 재활용되고나서 nextOfTail을 읽엇을때 1번 CAS가 성공하는 것을 막는다(같은큐일때는 완벽함)
문제는 지금은 상황이 바뀌어 pRealTail이 가리키는 노드가 다른 큐에서도 재활용 가능해졋다는 것이다.
물론 대부분의 경우 meta값(상위 17비트)가 다를것이기에 대부분 (3)과 (4)에서 걸리지만 만약
재활용되엇는데 상위 17비트 값까지 같고 큐 식별자가 다르며 이때 (1) -> 다른 큐에서 재활용 -> (2)의 순서로 진행되엇을때는 (3)과 (4)에서 통과되어서 1번 DOUBLE CAS가 성공해버린다는 문제가 발생하게된다.
이를 막으려면 (3)과 (4)를 통과한 시점에서 nexOfTail.identifier_를 추가적으로 검사해서 현재 큐와 다른 nextOfTail이 현재 큐의 식별자와 다른 경우라면 continue를 시켜버려야한다.
우선 부적절하게 DOUBLE CAS가 성공해서 다른큐에서 붕 뜨는 상황을 보도록 하자.


테스트 코드는 위와 같다.
전역에 두개의 락프리큐가 선언되어있고 4개의 스레드중 2개는 0, 2개는 1번째 락프리큐에 인큐와 디큐를 반복한다.
Q식별자가 0인 7516번 스레드는 pRealTail == 0x000002183ab7e600인 상황이며 이때 중지시킨다.

큐 식별자 0인 5740번 스레드는 pRealHead_ == 0x000002183ab7e600이며 이때 중지시킨다

큐 식별자가 1인 10260번 스레드는 pNewNode == 0x000002183ab7e600를 할당받고 이때의 metaAddress는 0x0003800000000000로 초기화 한다.
이를 통해 7516번 스레드는 pRealTail만 읽고 nextOfTail을 읽지 않은 상태에서 10260 스레드가 7516스레드의 pRealTail이 가리키는 노드를 재활용한 상황까지 재현되엇다.
하지만 상기 위에서 우려한 상황이 재현되려면 7516번 스레드의 지역변수 metaTail과 10260스레드의 pNewNode.next_.metaAddr_의 상위 17비트가 같아야한다.
이상황은 거의 재현되지 않기때문에 부득이하게 조사식으로 수정하였다.

7516번 스레드의 metaTail == 2303087601152 == 0x000002183ab7e600인데 상위 17비트만 보면 0이다.
15160 스레드의 pNewNode.next_.metaAddr_ 상위 17비트도 같이 0으로 바꾸엇다.




'포폴' 카테고리의 다른 글
| 락프리큐 초기 로깅 - LogAnalyzer 및 디버깅로그 설명 (1) | 2024.11.09 |
|---|---|
| 락프리큐 초기 로깅 - 코드 및 클래스 설명 (0) | 2024.11.09 |
| 락프리큐에 TlsPool 붙이기 - 문제 해결 (0) | 2024.10.18 |