Study/Game Server

게임 서버 라이브러리 제작하기 - 교착 상태 탐지 모듈

해달 2024. 9. 23. 17:11

개요

게임 서버를 구축할 때 멀티스레딩은 필수다. 따라서 자연스럽게 락을 사용하게 되는데, 락을 사용하다보면 교착 상태(Deadlock)를 피할 수 없다. 모든 교착 상태를 미연에 방지할 순 없지만 그래도 개발하면서 발생할 수 있는 교착 상태에 대해서는 탐지할 수 있는 모듈을 개발해보자.

교착 상태 탐지 원리

교착 상태는 자원의 잠금 순서를 지키지 않았을 때 발생한다. 아래의 코드를 보자.

#include <thread>
#include <mutex>
#include <iostream>
using namespace std;

mutex lock1, lock2;

int main()
{
    jthread t1([&]
        {
            // while()을 하는 이유는 아래의 코드가 엄청 빨리 끝나기 때문에
            // 단발성 실행으로는 교착 상태를 재현할 수 없기 때문이다.
            while (true)
            {
                // lock1 -> lock2 순서로 잠금을 시도한다.
                cout << "Task1 Run\n";
                unique_lock<mutex> guard1(lock1);
                // 아래의 코드가 들어간 이유도 마찬가지다.
                // 궁금하면 주석처리를 하고 실행해보자.
                this_thread::sleep_for(100ms);
                unique_lock<mutex> guard2(lock2);
            }
        });

    jthread t2([&]
        {
            while (true)
            {
                // lock2 -> lock1 순서로 잠금을 시도한다.
                cout << "Task2 Run\n";
                unique_lock<mutex> guard1(lock2);
                this_thread::sleep_for(100ms);
                unique_lock<mutex> guard2(lock1);
            }
        });
}

그럼 자원의 잠금 순서를 지키지 않았다는 것을 어떻게 판별할 수 있을까? 자원 할당 그래프(Resource-Allocation Graph)를 구축해 사이클이 있는지 판별하면 된다. 위의 코드를 자원 할당 그래프로 나타내면 아래와 같다.

 

보다시피 사이클이 있는 것을 알 수 있다.

 

자, 그럼 코드로 구현해보자.

CRASH 매크로

우선 매크로가 하나 필요하다. 이 매크로는 빠른 디버깅을 위해서 사용한다. 요점만 설명하기 위해 코드에 필요한 표준 라이브러리 헤더는 모두 생략한다.

#define CRASH(cause)            \
{                                \
    uint32_t* crash = nullptr;    \
    *crash = 0xDEADBEEF;        \
}

코드를 보면 0번 주소에 데이터를 기록함으로써 메모리 접근 위반(Memory Access Violation)을 일으키는 것을 알 수 있다. 사용법은 아래와 같다.

class MyStack
{
    size_t _numOfElements;
public:
    void Pop()
    {
        if (_numOfElements == 0)
        {
            // 디버깅을 위한 메시지를 코드에 남겨놓는다.
            CRASH("NO_ELEMENTS");
        }
    }
}

위와 같이 코드를 작성하고 디버그 모드로 실행하면 CRASH() 에서 실행이 멈추기 때문에 편리하게 디버깅할 수 있다.

DeadlockProfiler 구현

자, 본격적으로 교착 상태를 탐지해 줄 DeadlockProfiler의 클래스를 구현해보자. 우선 아래의 코드에서 시작하자.

// DeadlockProfiler.h

using lock_id = int32_t;

// DeadlockProfiler는 싱글톤으로 사용될 예정이다.
class DeadlockProfiler
{
    // 여러 스레드에서 사용할 것이기 때문에 뮤텍스가 필요하다.
    mutex    _lock;

public:
    // 잠글 때 그래프를 구축하고 교착 상태를 검사할 것이다.
    void    PushLock(mutex* const lock);
    void    PopLock(mutex* const lock);
private:
    // 그래프의 정점을 식별하기 위해 아래와 같은 해시테이블이 필요하다.
    unordered_map<mutex*, lock_id>            _lockToId;
    // 인접 리스트 방식으로 그래프를 구축한다.
    unordered_map<lock_id, set<lock_id>>    _neighbors;
};

구현하기 전에 이 클래스의 사용법부터 알고 가자. 아래와 같이 RAII 방식의 Wrapper 클래스를 만들어 사용한다.

class SafeLockGuard
{
    // 상술했듯, 프로세스 내 하나의 인스턴스만 존재해야 한다.
    static DeadlockProfiler s_deadlockProfiler;
public:
    explicit SafeLockGuard(mutex& lock)
        : _lock{ lock }
    {
        // lock의 잠금은 PushLock()에서 이뤄질 것이다.
        s_deadlockProfiler.PushLock(&lock);
    }
    ~SafeLockGuard()
    {
        // 위와 마찬가지로 lock의 해제도 PopLock()에서 이뤄질 것이다.
        s_deadlockProfiler.PopLock(&_lock);
    }
private:
    mutex& _lock;
};

// 사용법은 아래와 같다.
mutex sharedResource;
void Foo()
{
    SafeLockGuard g(sharedResource);
}

PushLock()부터 구현해보자. PushLock()의 로직은 아래와 같다.

  1. lock의 ID를 가져오거나 발급한다.
  2. 새로운 lock이라면 교착 상태를 확인한다.
  3. 교착 상태가 없다면 lock을 잠근다.

PushLock()의 파라미터는 mutex* const 타입이다. 다시 말해 공유 자원의 주소를 받는다. 공유 자원의 주소를 정점으로 활용하려면 복잡하기 때문에 그래프에서 활용할 식별자가 필요하다. 이를 위한 getOrCreateId() 함수를 작성하자. 클래스에 선언될 내용은 생략한다.

// DeadlockProfiler.cpp
lock_id DeadlockProfiler::getOrCreateId(mutex* const lock)
{
    // 새로운 ID를 발급한다.
    if (false == _lockToId.contains(lock))
    {
        // 잠금의 순서대로 ID를 발급한다.
        lock_id newId = static_cast<lock_id>(_lockToId.size());
        _lockToId[lock] = newId;
    }

    return _lockToId[lock];
}

스레드마다 각자의 잠금 순서열을 기록해두고 있어야 새로운 lock인지 판별할 수 있다. 따라서 TLS(Thread Local Storage)에 아래와 같은 stack을 만든다.

// DeadlockProfiler.cpp

// 현재 스레드의 잠금의 순서를 기록할 것이다.
// 이 정보를 활용해 새로운 lock인지 판별하고,
// 새로운 lock이라면 교착 상태를 탐지할 것이다.
thread_local stack<lock_id> t_lockSequence;

새로운 lock인지 판별하기 위해 isNewLock() 함수도 구현하자.

bool DeadlockProfiler::isNewLock(lock_id id)
{
    // 이전 lock을 가져온다.
    lock_id prevId = t_lockSequence.top();

    // NOTE : 구현하다보면 같은 뮤텍스에 대한 잠금이 여러 번 일어날 수 있다.
    // 현재 코드는 mutex만을 사용할 것이기 때문에
    // 만약 같은 뮤텍스에 대한 잠금이 여러 번 일어나는 순간 오류가 발생하겠지만,
    // 잠금을 직접 구현할 수 있기 때문에 남겨둔다.
    if (prevId == id)
    {
        return false;
    }

    // 그래프에 이미 추가된 정보인지 확인한다.
    // prevId->id 로의 간선이 있는가?
    if (_neighbors[prevId].contains(id))
    {
        return false;
    }

    return true;
}

이제 헬퍼 함수를 모두 구현했기 때문에 PushLock()을 완성할 수 있다.

void DeadlockProfiler::PushLock(mutex* const lock)
{
    lock_guard<mutex> guard(_lock);

    lock_id id = getOrCreateId(lock);

    if (t_lockSequence.size() > 0 && isNewLock(id))
    {
        // 그래프에 prevId->id 간선 정보를 추가한다.
        lock_id prevId = t_lockSequence.top();
        _neighbors[prevId].insert(id);

        // 새로운 lock이기 때문에 교착 상태가 있는지(현재 그래프에 사이클이 있는지) 확인한다.
        // _neighbors는 여러 스레드를 통해 수정되는 정보이므로 교착 상태를 탐지할 수 있다.
        // 가령, 1번 스레드가 0->1 순서로 자원을 이용하고 2번 스레드가 1->0 순서로 자원을 이용한다면
        // 이 모든 정보가 _neighbors에 기록되어 있기 때문에 사이클을 탐지할 수 있다.
        // 이 함수는 밑에서 구현한다.
        checkDeadlock();
    }

    // 문제가 없다면 잠금 순서열에 push한다.
    t_lockSequence.push(id);

    // 해당 lock을 잠근다.
    lock->lock();
}

PopLock()은 딱히 할 것이 없다. 잠금 순서열에서 마지막 잠금을 빼면 된다.

void DeadlockProfiler::PopLock(mutex* const lock)
{
    lock_guard<mutex> guard(_lock);

    /* NOTE : 혹시 모를 휴먼 에러를 방지하기 위해 아래와 같은 코드를 추가해줄 수 있다.
    lock_id prev = t_lockSequence.top();
    lock_id current = getOrCreateId(lock);
    if (prev != current)
    {
        CRASH("INVALID");
    }
    */

    t_lockSequence.pop();
    lock->unlock();
}

이제 교착 상태를 탐지해보자. 사이클을 판별하기 위해 아래와 같은 자료 구조가 필요하다.

// 각 정점의 방문 순서를 기록해둔다.
// _visitOrder[lockId] 형태로 활용한다.
vector<int32_t>        _visitOrder;
int32_t                _visitOrderNumber;

// 각 정점의 사이클 판별이 끝났는지 기록한다.
// _hasFinishedToCheckCycle[lockId] 형태로 활용한다.
vector<bool>        _hasFinishedToCheckCycle;

checkDeadlock()은 위 데이터를 초기화하고 사이클 판별 후 정리하는 로직이 들어 있다.

static constexpr int32_t NOT_VISIT = -1; // 0이 아닌 이유는 0이 유효한 식별자이기 때문이다.
void DeadlockProfiler::checkDeadlock()
{
    int32_t lockCount = static_cast<int32_t>(_lockToId.size());
    _visitOrder = vector(lockCount, NOT_VISIT);
    _visitOrderNumber = 0;
    _hasFinishedToCheckCycle = vector(lockCount, false);

    // 모든 잠금에 대해 사이클이 있는지 검사한다.
    for (lock_id id = 0; id < lockCount; ++id)
    {
        // 아래의 함수는 DFS다.
        checkCycle(id);
    }

    _visitOrder.clear();
    _hasFinishedToCheckCycle.clear();
}

checkCycle()은 아래와 같다.

void DeadlockProfiler::checkCycle(lock_id id)
{
    if (_visitOrder[id] != NOT_VISIT)
    {
        return;
    }

    // 방문 순서를 기록한다.
    _visitOrder[id] = _visitOrderNumber++;

    for (lock_id neighbor : _neighbors[id])
    {
        if (_visitOrder[neighbor] == NOT_VISIT)
        {
            checkCycle(neighbor);
            continue;
        }

        // PushLock()에서 간선을 기록할 때 prevId -> id 형태로 기록하였다.
        // 따라서 올바른 순서라면 현재 잠금의 방문 순서가 먼저일 수밖에 없다.
        if (_visitOrder[id] < _visitOrder[neighbor])
        {
            continue;
        }

        // 코드의 흐름이 여기까지 왔다는 것은 사이클이 있는 것이다.
        if (_hasFinishedToCheckCycle[id] == false)
        {
            CRASH("DEADLOCK_DETECTED");
        }
    }

    _hasFinishedToCheckCycle[id] = true;
}

테스트를 해보자.

테스트를 위해 아래와 같이 2개의 클래스를 각각 다른 파일에 구현한다.

// AccountManager.h
#pragma once

#include <mutex>

class AccountManager
{
public:
    void AccountThenPlayer();
    void Lock();
private:
    std::mutex _lock;
};

extern AccountManager g_accountManager;

// AccountManager.cpp
#include "DeadlockProfiler.h"
#include "AccountManager.h"
#include "PlayerManager.h"

AccountManager g_accountManager;

void AccountManager::AccountThenPlayer()
{
    SafeLockGuard g(_lock);

    g_playerManager.Lock();
}

void AccountManager::Lock()
{
    SafeLockGuard g(_lock);
}

// PlayerManager.h
#pragma once

#include <mutex>

class PlayerManager
{
public:
    void PlayerThenAccount();
    void Lock();
private:
    std::mutex _lock;
};

extern PlayerManager g_playerManager;

// PlayerManager.cpp
#include "DeadlockProfiler.h"
#include "PlayerManager.h"
#include "AccountManager.h"

PlayerManager g_playerManager;

void PlayerManager::PlayerThenAccount()
{
    SafeLockGuard g(_lock);

    g_accountManager.Lock();
}

void PlayerManager::Lock()
{
    SafeLockGuard g(_lock);
}

그리고 한 스레드는 PlayerManager->AccountManager 순서로 잠금 획득을 시도하고, 다른 스레드는 AccountManager->PlayerManager 순서로 잠금 획득을 시도한다.

#include <thread>
#include <iostream>

#include "DeadlockProfiler.h"
#include "PlayerManager.h"
#include "AccountManager.h"

using namespace std;

int main()
{
    jthread t1([&]
        {
            while (true)
            {
                cout << "Task1 Run\n";
                g_playerManager.PlayerThenAccount();
                this_thread::sleep_for(100ms);
            }
        });

    jthread t2([&]
        {
            while (true)
            {
                cout << "Task2 Run\n";
                g_accountManager.AccountThenPlayer();
                this_thread::sleep_for(100ms);
            }
        });
}

그리고 디버그 모드로 실행하면?

이러면 코드를 잘못 작성함으로 발생하는 교착 상태는 미연에 방지할 수 있다.

 

바로 감지한다는 것을 알 수 있다. 전체 코드는 여기에서 확인할 수 있다.

생각해볼 거리

  1. 현재는 교착 상태를 단순하게 탐지만 하고 있다. 하지만, 코드의 규모가 늘어났을 때는 단순하게 탐지만으로는 디버깅하기가 쉽지 않다. 교착 상태가 발생했을 때, 잠금 순서까지 보여주려면 어떻게 코드를 바꿔야 할까?

참고자료