개요
게임 서버를 구축할 때 멀티스레딩은 필수다. 따라서 자연스럽게 락을 사용하게 되는데, 락을 사용하다보면 교착 상태(Deadlock)를 피할 수 없다. 모든 교착 상태를 미연에 방지할 순 없지만 그래도 개발하면서 발생할 수 있는 교착 상태에 대해서는 탐지할 수 있는 모듈을 개발해보자.
교착 상태 탐지 원리
교착 상태는 자원의 잠금 순서를 지키지 않았을 때 발생한다. 아래의 코드를 보자.
#include <thread>
#include <mutex>
#include <iostream>
using namespace std;
mutex lock1, lock2;
int main()
{
jthread t1([&]
{
// while()을 하는 이유는 아래의 코드가 엄청 빨리 끝나기 때문에
// 단발성 실행으로는 교착 상태를 재현할 수 없기 때문이다.
while (true)
{
// lock1 -> lock2 순서로 잠금을 시도한다.
cout << "Task1 Run\n";
unique_lock<mutex> guard1(lock1);
// 아래의 코드가 들어간 이유도 마찬가지다.
// 궁금하면 주석처리를 하고 실행해보자.
this_thread::sleep_for(100ms);
unique_lock<mutex> guard2(lock2);
}
});
jthread t2([&]
{
while (true)
{
// lock2 -> lock1 순서로 잠금을 시도한다.
cout << "Task2 Run\n";
unique_lock<mutex> guard1(lock2);
this_thread::sleep_for(100ms);
unique_lock<mutex> guard2(lock1);
}
});
}
그럼 자원의 잠금 순서를 지키지 않았다는 것을 어떻게 판별할 수 있을까? 자원 할당 그래프(Resource-Allocation Graph)를 구축해 사이클이 있는지 판별하면 된다. 위의 코드를 자원 할당 그래프로 나타내면 아래와 같다.
자, 그럼 코드로 구현해보자.
CRASH 매크로
우선 매크로가 하나 필요하다. 이 매크로는 빠른 디버깅을 위해서 사용한다. 요점만 설명하기 위해 코드에 필요한 표준 라이브러리 헤더는 모두 생략한다.
#define CRASH(cause) \
{ \
uint32_t* crash = nullptr; \
*crash = 0xDEADBEEF; \
}
코드를 보면 0번 주소에 데이터를 기록함으로써 메모리 접근 위반(Memory Access Violation)을 일으키는 것을 알 수 있다. 사용법은 아래와 같다.
class MyStack
{
size_t _numOfElements;
public:
void Pop()
{
if (_numOfElements == 0)
{
// 디버깅을 위한 메시지를 코드에 남겨놓는다.
CRASH("NO_ELEMENTS");
}
}
}
위와 같이 코드를 작성하고 디버그 모드로 실행하면 CRASH()
에서 실행이 멈추기 때문에 편리하게 디버깅할 수 있다.
DeadlockProfiler 구현
자, 본격적으로 교착 상태를 탐지해 줄 DeadlockProfiler
의 클래스를 구현해보자. 우선 아래의 코드에서 시작하자.
// DeadlockProfiler.h
using lock_id = int32_t;
// DeadlockProfiler는 싱글톤으로 사용될 예정이다.
class DeadlockProfiler
{
// 여러 스레드에서 사용할 것이기 때문에 뮤텍스가 필요하다.
mutex _lock;
public:
// 잠글 때 그래프를 구축하고 교착 상태를 검사할 것이다.
void PushLock(mutex* const lock);
void PopLock(mutex* const lock);
private:
// 그래프의 정점을 식별하기 위해 아래와 같은 해시테이블이 필요하다.
unordered_map<mutex*, lock_id> _lockToId;
// 인접 리스트 방식으로 그래프를 구축한다.
unordered_map<lock_id, set<lock_id>> _neighbors;
};
구현하기 전에 이 클래스의 사용법부터 알고 가자. 아래와 같이 RAII 방식의 Wrapper 클래스를 만들어 사용한다.
class SafeLockGuard
{
// 상술했듯, 프로세스 내 하나의 인스턴스만 존재해야 한다.
static DeadlockProfiler s_deadlockProfiler;
public:
explicit SafeLockGuard(mutex& lock)
: _lock{ lock }
{
// lock의 잠금은 PushLock()에서 이뤄질 것이다.
s_deadlockProfiler.PushLock(&lock);
}
~SafeLockGuard()
{
// 위와 마찬가지로 lock의 해제도 PopLock()에서 이뤄질 것이다.
s_deadlockProfiler.PopLock(&_lock);
}
private:
mutex& _lock;
};
// 사용법은 아래와 같다.
mutex sharedResource;
void Foo()
{
SafeLockGuard g(sharedResource);
}
PushLock()
부터 구현해보자. PushLock()
의 로직은 아래와 같다.
lock
의 ID를 가져오거나 발급한다.- 새로운
lock
이라면 교착 상태를 확인한다. - 교착 상태가 없다면
lock
을 잠근다.
PushLock()
의 파라미터는 mutex* const
타입이다. 다시 말해 공유 자원의 주소를 받는다. 공유 자원의 주소를 정점으로 활용하려면 복잡하기 때문에 그래프에서 활용할 식별자가 필요하다. 이를 위한 getOrCreateId()
함수를 작성하자. 클래스에 선언될 내용은 생략한다.
// DeadlockProfiler.cpp
lock_id DeadlockProfiler::getOrCreateId(mutex* const lock)
{
// 새로운 ID를 발급한다.
if (false == _lockToId.contains(lock))
{
// 잠금의 순서대로 ID를 발급한다.
lock_id newId = static_cast<lock_id>(_lockToId.size());
_lockToId[lock] = newId;
}
return _lockToId[lock];
}
스레드마다 각자의 잠금 순서열을 기록해두고 있어야 새로운 lock
인지 판별할 수 있다. 따라서 TLS(Thread Local Storage)에 아래와 같은 stack
을 만든다.
// DeadlockProfiler.cpp
// 현재 스레드의 잠금의 순서를 기록할 것이다.
// 이 정보를 활용해 새로운 lock인지 판별하고,
// 새로운 lock이라면 교착 상태를 탐지할 것이다.
thread_local stack<lock_id> t_lockSequence;
새로운 lock
인지 판별하기 위해 isNewLock()
함수도 구현하자.
bool DeadlockProfiler::isNewLock(lock_id id)
{
// 이전 lock을 가져온다.
lock_id prevId = t_lockSequence.top();
// NOTE : 구현하다보면 같은 뮤텍스에 대한 잠금이 여러 번 일어날 수 있다.
// 현재 코드는 mutex만을 사용할 것이기 때문에
// 만약 같은 뮤텍스에 대한 잠금이 여러 번 일어나는 순간 오류가 발생하겠지만,
// 잠금을 직접 구현할 수 있기 때문에 남겨둔다.
if (prevId == id)
{
return false;
}
// 그래프에 이미 추가된 정보인지 확인한다.
// prevId->id 로의 간선이 있는가?
if (_neighbors[prevId].contains(id))
{
return false;
}
return true;
}
이제 헬퍼 함수를 모두 구현했기 때문에 PushLock()
을 완성할 수 있다.
void DeadlockProfiler::PushLock(mutex* const lock)
{
lock_guard<mutex> guard(_lock);
lock_id id = getOrCreateId(lock);
if (t_lockSequence.size() > 0 && isNewLock(id))
{
// 그래프에 prevId->id 간선 정보를 추가한다.
lock_id prevId = t_lockSequence.top();
_neighbors[prevId].insert(id);
// 새로운 lock이기 때문에 교착 상태가 있는지(현재 그래프에 사이클이 있는지) 확인한다.
// _neighbors는 여러 스레드를 통해 수정되는 정보이므로 교착 상태를 탐지할 수 있다.
// 가령, 1번 스레드가 0->1 순서로 자원을 이용하고 2번 스레드가 1->0 순서로 자원을 이용한다면
// 이 모든 정보가 _neighbors에 기록되어 있기 때문에 사이클을 탐지할 수 있다.
// 이 함수는 밑에서 구현한다.
checkDeadlock();
}
// 문제가 없다면 잠금 순서열에 push한다.
t_lockSequence.push(id);
// 해당 lock을 잠근다.
lock->lock();
}
PopLock()
은 딱히 할 것이 없다. 잠금 순서열에서 마지막 잠금을 빼면 된다.
void DeadlockProfiler::PopLock(mutex* const lock)
{
lock_guard<mutex> guard(_lock);
/* NOTE : 혹시 모를 휴먼 에러를 방지하기 위해 아래와 같은 코드를 추가해줄 수 있다.
lock_id prev = t_lockSequence.top();
lock_id current = getOrCreateId(lock);
if (prev != current)
{
CRASH("INVALID");
}
*/
t_lockSequence.pop();
lock->unlock();
}
이제 교착 상태를 탐지해보자. 사이클을 판별하기 위해 아래와 같은 자료 구조가 필요하다.
// 각 정점의 방문 순서를 기록해둔다.
// _visitOrder[lockId] 형태로 활용한다.
vector<int32_t> _visitOrder;
int32_t _visitOrderNumber;
// 각 정점의 사이클 판별이 끝났는지 기록한다.
// _hasFinishedToCheckCycle[lockId] 형태로 활용한다.
vector<bool> _hasFinishedToCheckCycle;
checkDeadlock()
은 위 데이터를 초기화하고 사이클 판별 후 정리하는 로직이 들어 있다.
static constexpr int32_t NOT_VISIT = -1; // 0이 아닌 이유는 0이 유효한 식별자이기 때문이다.
void DeadlockProfiler::checkDeadlock()
{
int32_t lockCount = static_cast<int32_t>(_lockToId.size());
_visitOrder = vector(lockCount, NOT_VISIT);
_visitOrderNumber = 0;
_hasFinishedToCheckCycle = vector(lockCount, false);
// 모든 잠금에 대해 사이클이 있는지 검사한다.
for (lock_id id = 0; id < lockCount; ++id)
{
// 아래의 함수는 DFS다.
checkCycle(id);
}
_visitOrder.clear();
_hasFinishedToCheckCycle.clear();
}
checkCycle()
은 아래와 같다.
void DeadlockProfiler::checkCycle(lock_id id)
{
if (_visitOrder[id] != NOT_VISIT)
{
return;
}
// 방문 순서를 기록한다.
_visitOrder[id] = _visitOrderNumber++;
for (lock_id neighbor : _neighbors[id])
{
if (_visitOrder[neighbor] == NOT_VISIT)
{
checkCycle(neighbor);
continue;
}
// PushLock()에서 간선을 기록할 때 prevId -> id 형태로 기록하였다.
// 따라서 올바른 순서라면 현재 잠금의 방문 순서가 먼저일 수밖에 없다.
if (_visitOrder[id] < _visitOrder[neighbor])
{
continue;
}
// 코드의 흐름이 여기까지 왔다는 것은 사이클이 있는 것이다.
if (_hasFinishedToCheckCycle[id] == false)
{
CRASH("DEADLOCK_DETECTED");
}
}
_hasFinishedToCheckCycle[id] = true;
}
테스트를 해보자.
테스트를 위해 아래와 같이 2개의 클래스를 각각 다른 파일에 구현한다.
// AccountManager.h
#pragma once
#include <mutex>
class AccountManager
{
public:
void AccountThenPlayer();
void Lock();
private:
std::mutex _lock;
};
extern AccountManager g_accountManager;
// AccountManager.cpp
#include "DeadlockProfiler.h"
#include "AccountManager.h"
#include "PlayerManager.h"
AccountManager g_accountManager;
void AccountManager::AccountThenPlayer()
{
SafeLockGuard g(_lock);
g_playerManager.Lock();
}
void AccountManager::Lock()
{
SafeLockGuard g(_lock);
}
// PlayerManager.h
#pragma once
#include <mutex>
class PlayerManager
{
public:
void PlayerThenAccount();
void Lock();
private:
std::mutex _lock;
};
extern PlayerManager g_playerManager;
// PlayerManager.cpp
#include "DeadlockProfiler.h"
#include "PlayerManager.h"
#include "AccountManager.h"
PlayerManager g_playerManager;
void PlayerManager::PlayerThenAccount()
{
SafeLockGuard g(_lock);
g_accountManager.Lock();
}
void PlayerManager::Lock()
{
SafeLockGuard g(_lock);
}
그리고 한 스레드는 PlayerManager->AccountManager
순서로 잠금 획득을 시도하고, 다른 스레드는 AccountManager->PlayerManager
순서로 잠금 획득을 시도한다.
#include <thread>
#include <iostream>
#include "DeadlockProfiler.h"
#include "PlayerManager.h"
#include "AccountManager.h"
using namespace std;
int main()
{
jthread t1([&]
{
while (true)
{
cout << "Task1 Run\n";
g_playerManager.PlayerThenAccount();
this_thread::sleep_for(100ms);
}
});
jthread t2([&]
{
while (true)
{
cout << "Task2 Run\n";
g_accountManager.AccountThenPlayer();
this_thread::sleep_for(100ms);
}
});
}
그리고 디버그 모드로 실행하면?
바로 감지한다는 것을 알 수 있다. 전체 코드는 여기에서 확인할 수 있다.
생각해볼 거리
- 현재는 교착 상태를 단순하게 탐지만 하고 있다. 하지만, 코드의 규모가 늘어났을 때는 단순하게 탐지만으로는 디버깅하기가 쉽지 않다. 교착 상태가 발생했을 때, 잠금 순서까지 보여주려면 어떻게 코드를 바꿔야 할까?
참고자료
'Study > Game Server' 카테고리의 다른 글
이상한 나라의 소켓 옵션 (0) | 2024.10.25 |
---|