C# 쓰레드의 지역 저장소 - ThreadLocal

http://msdn.microsoft.com/ko-kr/library/dd642243.aspx



 간혹 쓰레드마다 가지고 있어야 하는 별도의 공간이 필요할 때가 있을 것이다. 이때 사용할 수 있는 객체가 ThreadLocal 클래스다. .Net Framework 4에서 처음 등작 하였다. 이전에는 ThreadStaticAttribute이용해서 이와 같은 유사한 기능을 사용할 수 있었으나 이제는 ThreadLocal을 선언해 사용하기만 하면 된다. 아래 코드를 살펴 보자

#region ThreadLocal 테스트
[ThreadStatic]  // 쓰레드마다 다른 값을 가지도록 한다.
private static string ThreadValue = string.Empty;
 
/// <summary>
/// 데이터의 스레드 로컬 저장소를 제공하는 ThreadLocal를 테스트 한다.
/// </summary>
public void ThreadLocal_TestMethod()
{
    // 데이터의 스레드 로컬 저장소를 제공합니다.
    ThreadLocal<string> ThreadName = new ThreadLocal<string>(() =>
    {
        return "Thread" + Thread.CurrentThread.ManagedThreadId;
    });
 
    Action action = () =>
    {
        // 값이 true면 같은 쓰레드에서 이미 한번 실행되었다는 것이다.
        bool repeat = ThreadName.IsValueCreated;
 
        Console.WriteLine("ThreadName = {0} {1}"ThreadName.Valuerepeat ? "(repeat)" : "");
    };
 
    // Launch eight of them.  On 4 cores or less, you should see some repeat ThreadNames
    // 8개의 액션을 실행 하도록 한다.
    // 만약 동시 실행되는 Task가 4개라면 나머지 4개는 같은 쓰레드에서 ThreadLocal을 접근 했다는 것을 뜻한다.
    // 같은 쓰레드 안에서 사용할 값을 컨트롤 할 수 있다.
    Parallel.Invoke(actionactionactionactionactionactionactionaction);
 
    // 만약 쓰레드 마다 다른 값을 가져가기 위해서는 ThreadStaticAttribute로 선언해서 사용하면 된다.
    // [ThreadStatic] ThreadValue 변수 처럼 말이다.
 
    // Dispose when you are done
    ThreadName.Dispose();
}
#endregion

[코드1] ThreadLocal<T> 코드 테스트


 위 코드를 실행한 결과 화면인 "그림1"을 보면 어떻게 실행되는지 쉽게 알 수 있을 것이다.


[그림1] ThreadLocal 결과 화면


 

소스 코드 자체에 주석과 직관적인 코딩으로 충분히 파악이 가능할 것으로 예상하므로 별도의 설명을 생략하도록 하겠습니다. 포스트의 내용이 장황한 설명 보다는 주석과 소스코드 자체 만으로도 이해할 수 있도록 하기 위해 노력하였습니다.. 실제 개발에서도 필요한 소스는 단순히 Copy & Paste 만으로도 사용할 수 있습니다. 그리고 주석을 이용해 nDoc이나 별도의 자동 Document 제작 유틸로 API 문서를 만드는 데에도 도움이 되었으면 한다. 
※ DOC에 대한 프로그램 정보 Util link

ing™       


C# 낮은 수준의 동기화 SpinLock

http://msdn.microsoft.com/ko-kr/library/dd997366.aspx


 우리가 흔히 사용하는 Lock 객체와 같은 일을 하는 클래스가 .Net Framework 4에서 새로 나왔다. SpinLock 객체가 하는 역할은 Lock과 같은 역할을 하지만 새로 나온 이유가 있을 것이다. MSDN에서는 아래와 같이 설명하고 있다.


대기 시간이 짧을 것으로 예상되고 경합이 적으면 다른 종류의 잠금보다 SpinLock을 수행하는 것이 효과적입니다. 그러나 SpinLock은 System.Threading.Monitor 메서드 또는 Interlocked 메서드로 인해 프로그램 성능이 크게 떨어진다는 점이 프로파일링을 통해 확인될 때만 사용하는 것이 좋습니다. 
http://msdn.microsoft.com/ko-kr/library/dd997366.aspx

[표1] SpinLock MSDN 설명 발췌


 위 글에서 Monitor은 Lock이 컴파일이 되면 내부적으로 Monitor로 대체되기 때문에 Lock을 따로 언급하지 않은것으로 보인다. 그렇다면 위 이야기대로라면 SpinLock이 모든 lock 상황에서 성능 우위를 지원하는 것은 아님을 알수 있으니 개발하는 입장에서는 일일이 확인해 보고 적용을 하라는 뜻이다. ^^;;

그래도 SpinLock를 사용해야 하는 상황이 특출나게 성능을 요하는 작업에 대해서만 고려하면 되기 때문에 그나마 다행이라 할 수 있겠다.


 이제 본격적으로 검토를 해 보자. SpinLock와 Lock(Monitor)을 사용해서 성능 비교를 해 보도록 하자.

// 잠김 횟수
const int N = 100000;
static Queue<LockDataObject> _spinLockQueue = new Queue<LockDataObject>();
static Queue<LockDataObject> _lockQueue = new Queue<LockDataObject>();
 
// 잠김에 사용할 객체
static object _lock = new Object();
// 잠김에 사용할 SpinLock 객체
static SpinLock _spinlock = new SpinLock();
 
// 수행에 필요한 데이터 객체
class LockDataObject
{
    public string Name { getset; }
    public double Number { getset; }
}
 
/// <summary>
/// Lock 객체로 잠금 상태에서 Queue에 넣기
/// </summary>
/// <param name="d"></param>
/// <param name="i"></param>
private static void UpdateWithSpinLock(LockDataObject dint i)
{
    // false로 세팅하고 spinLock 객채에 Enter해야 한다.
    bool lockTaken = false;
    try
    {
        _spinlock.Enter(ref lockTaken);
 
        // working
 
        // CPU 사이클 동안 대기
        Thread.SpinWait(500);
 
        // Queue에 넣는건 제외
        // 메모리에 넣고 가비지 컬렉터의 영향을 덜 받기 위해 주석 처리 함.
        //_spinLockQueue.Enqueue(d);
    }
    catch (LockRecursionException ex)
    {
        Console.WriteLine("{0}, {1}"Thread.CurrentThread.ManagedThreadIdex.Message);
    }
    finally
    {
        // 잠김 풀기
        if (lockTaken)
        {
            _spinlock.Exit(false);
        }
    }
}
 
/// <summary>
/// SpinLock 객체로 잠금 병렬 수행
/// </summary>
private static void UseSpinLock()
{
    Stopwatch sw = Stopwatch.StartNew();
 
    // 병렬 실행
    Parallel.Invoke(
            () =>
            {
                for (int i = 0i < Ni++)
                {
                    UpdateWithSpinLock(new LockDataObject() { Name = i.ToString(), Number = i }, i);
                }
            },
            () =>
            {
                for (int i = 0i < Ni++)
                {
                    UpdateWithSpinLock(new LockDataObject() { Name = i.ToString(), Number = i }, i);
                }
            }
        );
 
    sw.Stop();
    Console.WriteLine("elapsed ms with spinlock: {0}"sw.ElapsedMilliseconds);
}
 
/// <summary>
/// Lock 객체로 잠금 상태에서 Queue에 넣기
/// </summary>
/// <param name="d"></param> 
/// <param name="i"></param>
static void UpdateWithLock(LockDataObject dint i)
{
    lock (_lock)
    {
        // working
 
        // CPU 사이클 동안 대기
        Thread.SpinWait(500);
        // Queue에 넣는건 제외
        // 메모리에 넣고 가비지 컬렉터의 영향을 덜 받기 위해 주석 처리 함.
        //_lockQueue.Enqueue(d);
    }
}
 
/// <summary>
/// Lock 객채로 잠금을 사용해서 병렬 수행
/// </summary>
private static void UseLock()
{
    Stopwatch sw = Stopwatch.StartNew();
 
    // 병렬 실행
    Parallel.Invoke(
            () =>
            {
                for (int i = 0i < Ni++)
                {
                    UpdateWithLock(new LockDataObject() { Name = i.ToString(), Number = i }, i);
                }
            },
            () =>
            {
                for (int i = 0i < Ni++)
                {
                    UpdateWithLock(new LockDataObject() { Name = i.ToString(), Number = i }, i);
                }
            }
        );
 
    sw.Stop();
    Console.WriteLine("elapsed ms with lock: {0}"sw.ElapsedMilliseconds);
}

[코드1] SpinLock와 Lock 성능 비교


 소스 코드상의 주석에서도 언급 했듯이 어떤 케이스를 먼저 실행 하느냐에 따라서 결과 값이 상이하게 나오고 있으며 SpinLock를 사용하여 테스트를 하면 프로그램 시작후 가장 처음에 실행되었을때 가장 안 좋은 결과치를 지속적으로 보여주고 있었다. 물론 지금의 성능 비교를 하는 케이스가 SpinLock를 만든 목적에 부합되지 않을 수도 있기 때문에 그런 현상이 발생할 수도 있을 것이다. 두 케이스의 비교를 통해 결론을 내리자면 위 상황에서는 대체적으로 Lock 객체를 사용해서 잠금 발생을 하는것이 성능 우위를 나타내고 있었다. 그렇지만 다른 케이스에서는 어떻게 결과가 나올 지 알수 없으므로 Lock 객체만으로도 성능이 제대로 나오지 않으면 SpinLock를 테스트 해보기를 권한다. 아래 "그림1"에서 결과 화면을 보도록 하자.


[그림1] SpinLock vs Lock 결과 화면

(테스트 환경 : i5 모바일 CPU, 2Core, 하이퍼스레딩, 8GB )



 그리고 Parallel.Invoke에 대해서 부연 설명을 드리자면 Task를 두개 생성하면 실행하는 구문이라고 생각하면 쉽게 이해할 수 있을 것이다. Parallel.Invoke(Task(action), Task(action))와 같은 형식이고 각 타스크가 쓰레드에서 타스크 형식으로 작업이 수행되는 것이다. 두개를 한번에 실행 하기에 잠김 상태가 발생하도록 한 것이다. 그리고 SpinLock()는 해당 CPU 주기만큼 대기 하고 실행하도록 해주는 역할을 한다. Thread.Sleep()는 시간이 비교 대상이라면 SpinLock는 CPU 싸이클이 비교대상인 것이다.


소스 코드 자체에 주석과 직관적인 코딩으로 충분히 파악이 가능할 것으로 예상하므로 별도의 설명을 생략하도록 하겠습니다. 포스트의 내용이 장황한 설명 보다는 주석과 소스코드 자체 만으로도 이해할 수 있도록 하기 위해 노력하였습니다.. 실제 개발에서도 필요한 소스는 단순히 Copy & Paste 만으로도 사용할 수 있습니다. 그리고 주석을 이용해 nDoc이나 별도의 자동 Document 제작 유틸로 API 문서를 만드는 데에도 도움이 되었으면 한다. 
※ DOC에 대한 프로그램 정보 Util link

ing™       


C# TaskFactory and TaskScheduler

http://msdn.microsoft.com/ko-kr/library/dd321418.aspx

http://code.msdn.microsoft.com/ParExtSamples

http://msdn.microsoft.com/ko-kr/library/dd997402.aspx

http://channel9.msdn.com/Events/TechDays/Techdays-2012-the-Netherlands/2287


 이전 포스트에서 MyTaskScheduler을 직접 만들어 보았다. 그러나 동시성 수준이 1이라서 비 동기 효율이 제대로 나오지 않는 구조적인 문제가 있었다. 실제로 테스트를 해보면 Default Scheduler 보다 훨씬 안 좋은 성능을 보여주고 있다. 그리하여 이번에는 동시성 수준을 마음대로 컨트롤 할 수 있는 LimitedConcurrencyLevelTaskScheduler을 만들어 보도록 하겠다.

/// <summary> /// TaskScheduler을 상속 받아 구현 한다. /// Provides a task scheduler that ensures a maximum concurrency level while /// running on top of the ThreadPool. /// </summary> public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler {     /// <summary>     /// 현재 쓰레드가 작업 리스트를 처리하고 있는지 여부 판단     /// </summary>     /// <remarks>     /// ThreadStaticAttribute 로 표시된 static 필드는 스레드 간에 공유되지 않습니다.     /// 각 실행 스레드에는 필드에 대한 별도의 인스턴스가 있으며 해당 필드에 대한 값을 독립적으로 설정하고 가져옵니다.     /// 필드를 서로 다른 스레드에서 액세스하면 해당 필드에는 다른 값이 들어가게 됩니다.     /// </remarks>     [ThreadStatic]     private static bool _currentThreadIsProcessingItems;     /// <summary>실행될 타스크 리스트</summary>     private readonly LinkedList<System.Threading.Tasks.Task> _tasks = new LinkedList<System.Threading.Tasks.Task>(); // protected by lock(_tasks)     /// <summary>현재 스케줄러에서 최대로 허용된 동시성 제어 수준</summary>     private readonly int _maxDegreeOfParallelism;     /// <summary>실제로 수행되고 있는 동시성 수준 숫자</summary>     private int _delegatesQueuedOrRunning = 0// protected by lock(_tasks)     /// <summary>     /// 인스턴스 초기화 진행     /// </summary>     /// <param name="maxDegreeOfParallelism">최대 동시성 수준 허용 갯수</param>     public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)     {         if (maxDegreeOfParallelism < 1throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");         _maxDegreeOfParallelism = maxDegreeOfParallelism;     }     /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>     /// <param name="task">큐에 대기할 Task입니다.</param>     protected sealed override void QueueTask(System.Threading.Tasks.Task task)     {         // 처리할 작업 목록에 추가.         // tasks가 처리중이거나 준비가 되지 않았을 때 대기 함         lock (_tasks)         {             _tasks.AddLast(task);             // MaximumConcurrencyLevel의 숫자보다 작을 때만 실행             // 동시성 수준을 제어하는 Scheduler 클래스 이므로 이곳에서 동시성 수준을 체크한다.             if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)             {                 ++_delegatesQueuedOrRunning;                 NotifyThreadPoolOfPendingWork();             }         }     }     /// <summary>     /// 작업이 스케줄러에 대한 실행 할 필요가있는 스레드를 알려줍니다.     /// </summary>     private void NotifyThreadPoolOfPendingWork()     {         // ThreadPool에서 실행이 되도록 한다.         // 실행 주기는 ThreadPool이 CPU 환경에 맞게 동시 실행을 컨트롤 한다.         // http://msdn.microsoft.com/ko-kr/library/system.threading.threadpool_methods(v=vs.110).aspx         ThreadPool.UnsafeQueueUserWorkItem(_ =>         {

        // 현재 스레드가 작업 항목을 활성화 하도록 함
        // 스레드에 작업 활성화를 할 수 있도록 한다.
        // ThreadStatic으로 선언되었기에 쓰레드마다 별도의 값으로 접근 한다.
        _currentThreadIsProcessingItems = true;
        try
        {
            // 대기열에 사용 가능한 모든 항목을 처리합니다.
            while (true)
            {
                System.Threading.Tasks.Task item;
                lock (_tasks)
                {
                    // 처리 할 항목이 더 있을 경우,
                    // 처리가 완료되면 루프를 나간다.
                    if (_tasks.Count == 0)
                    {
                        --_delegatesQueuedOrRunning;
                        break;
                    }
 
                    // 큐에서 다음 항목을 가져 오기.
                    item = _tasks.First.Value;
                    _tasks.RemoveFirst();
                }
 
                // 큐에서 찾아낸 작업을 실행
                base.TryExecuteTask(item);
            }
        }
        // 현재 스레드에서 처리 항목을 완료
        finally { _currentThreadIsProcessingItems = false; }
                    }, null);     }     /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>     /// <param name="task">실행할 타스크</param>     /// <param name="taskWasPreviouslyQueued">작업이 이전에 큐에 대기되었는지 여부를 나타내는 부울입니다.이 매개 변수가 True이면 작업이 이전에 큐에 대기된 것일 수 있습니다. False이면 작업이 큐에 대기되지 않은 것입니다. 작업을 큐에 대기하지 않고 인라인으로 실행하려면 이 호출을 수행합니다.</param>     /// <returns>작업이 인라인으로 실행되었는지 여부를 나타내는 부울 값입니다. 성공적인 실행 시 True, 그 이외에 false</returns> /// <remarks>재진입으로 인한 오류를 방지하기 위해 작업 인라이닝은 관련된 스레드의 로컬 큐에서 대기 중인 대상이 있는 경우에만 발생합니다.</remarks>     protected sealed override bool TryExecuteTaskInline(System.Threading.Tasks.Task taskbool taskWasPreviouslyQueued)     {         //쓰레드에서 처리가 되고 있으면 별도 실해을 지정하지 않는다.         //중복 실행이 되지 않도록 해야 한다.         if (!_currentThreadIsProcessingItemsreturn false;         // 작업이 이전에 큐에 대기된 것이면 제거         if (taskWasPreviouslyQueuedTryDequeue(task);         // 한번더 실행을 시도 한다.         return base.TryExecuteTask(task);     }     /// <summary>이전에 이 스케줄러의 큐에 대기된 Task를 큐에서 제거하려고 합니다</summary>     /// <param name="task">큐에서 제거할 Task입니다.</param>     /// <returns>task 인수가 큐에서 제거되었는지 여부를 나타내는 부울입니다.</returns>     protected sealed override bool TryDequeue(System.Threading.Tasks.Task task)     {         lock (_tasksreturn _tasks.Remove(task);     }     /// <summary>이 TaskScheduler가 지원할 수 있는 최대 동시성 수준을 나타냅니다.</summary>     public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }     /// <summary>디버거를 지원하기 위해 현재 스케줄러의 큐에 대기되어 실행을 기다리고 있는 Task 인스턴스의 열거 가능한 형식을 생성합니다.</summary>     /// <returns>디버거가 현재 이 스케줄러의 큐에 대기된 작업을 트래버스할 수 있도록 허용하는 열거 가능한 형식입니다.</returns>     protected sealed override IEnumerable<System.Threading.Tasks.Task> GetScheduledTasks()     {         bool lockTaken = false;         try         {             Monitor.TryEnter(_tasksref lockTaken);             if (lockTakenreturn _tasks.ToArray();             else throw new NotSupportedException();         }         finally         {             if (lockTakenMonitor.Exit(_tasks);         }     } }

[코드1] LimitedConcurrencyLevelTaskScheduler 전체 코드


 위와 같이 동시성 수준을 제어 할 수 있는 스케줄러를 만들어 보았다. 처음 인스턴스를 시킬때 넣는 동시성 수준 갯수를 세팅하면 한번에 실행되는 타스크의 갯수를 제어할 수가 있다. 전체적인 흐름은 QueueTask를 통해 넘어온 Task를 곧바로 실행하지 않고 Queue에 넣어 둔다. 동시성 수준을 통과한 상태에서 NotifyThreadPoolOfPendingWork에서 ThreadPool에서 각각의 Task를 실행이 활성화 되도록 한다. 그렇다면 이제 한번 실행하여 결과를 보도록 하자.


/// <summary>
/// LimitedConcurrencyLevelTaskScheduler로 테스트
/// </summary>
public void LimitedConcurrencyLevelTaskScheduler_TestMethod()
{
    // 시간을 재기 위해서 사용
    Stopwatch sw = new Stopwatch();
    sw.Start();
 
    var limitedScheduler = new LimitedConcurrencyLevelTaskScheduler(5);
 
    // 커스터마이징 된 LimitedConcurrencyLevelTaskScheduler을 이용해 TaskFactory를 생성 하도록 한다.
    var factory = new TaskFactory(limitedScheduler);
    var tasks = new List<System.Threading.Tasks.Task>();
 
    for (int j = 1j <= 20000j++)
    {
        var task = factory.StartNew(() =>
        {
            for (int i = 0i < 5i++)
            {
                var a = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("{0} on thread {1}"iThread.CurrentThread.ManagedThreadId);
            }
        });
 
        tasks.Add(task);
    }
 
    // 모두 완료가 될 때까지 대기
    System.Threading.Tasks.Task.WaitAll(tasks.ToArray());
 
    sw.Stop();
    Console.WriteLine(sw.ElapsedMilliseconds + "ms");
}

[코드2] LimitedConcurrencyLevelTaskScheduler 테스트 코드



[그림1] LimitedConcurrencyLeveTaskScheduler 실행 결과 화면


 "그림1"에서와 같이 여러 쓰레드 ID에서 각각의 Task가 수행이 된 것을 확인할 수 있다. 그렇지만 이 작업은 Default Scheduler 보다 작업 시간이 길게 걸린다. 뭔가가 문제가 있는 것일까? 예상 기대치 보다 좋지 않다란 생각을 하게 되었다. 그래서 Default Scheduler의 기본 MaximumConcurrencyLevel을 확인해보니 2147483647로 확인이 되었다. 내가 세팅한 값보다 엄청 많은 동시성 수준이다. 그리고 기본적으로 ThreadPool을 통해서 수행하다 보니 CPU Core의 절대적인 숫자에 제한을 받는다. ( ThreadPool Click new ) 그러므로 아무리 동시성 수준을 높여도 Core 갯수 이상은 동시 실행이 되지 않는다. 이 요건은 처음 포스트 당시 언급 했던 대기 시간이 많은 수행에 대해서 특별한 Scheduler를 만들려고 하는 계획과는 차이가 있게 되었다.  그래서 ThreadPool 대신이 Thread를 통해서 실행이 되도록 하였으며 프로퍼티를 통해 Thread와 ThreadPool을 선택하여 수행 할 수 있도록 수정 하게 되었다. "코드3"를 확인해 보자


/// <summary>
/// TaskScheduler을 상속 받아 구현 한다.
/// Provides a task scheduler that ensures a maximum concurrency level while
/// running on top of the ThreadPool.
/// </summary>
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
    /// <summary>
    /// 현재 쓰레드가 작업 리스트를 처리하고 있는지 여부 판단
    /// </summary>
    /// <remarks>
    /// ThreadStaticAttribute 로 표시된 static 필드는 스레드 간에 공유되지 않습니다.
    /// 각 실행 스레드에는 필드에 대한 별도의 인스턴스가 있으며 해당 필드에 대한 값을 독립적으로 설정하고 가져옵니다.
    /// 필드를 서로 다른 스레드에서 액세스하면 해당 필드에는 다른 값이 들어가게 됩니다.
    /// </remarks>
    [ThreadStatic]
    private static bool _currentThreadIsProcessingItems;
    /// <summary>실행될 타스크 리스트</summary>
    private readonly LinkedList<System.Threading.Tasks.Task> _tasks = new LinkedList<System.Threading.Tasks.Task>(); // protected by lock(_tasks)
    /// <summary>현재 스케줄러에서 최대로 허용된 동시성 제어 수준</summary>
    private readonly int _maxDegreeOfParallelism;
    /// <summary>실제로 수행되고 있는 동시성 수준 숫자</summary>
    private int _delegatesQueuedOrRunning = 0// protected by lock(_tasks)
 
    /// <summary>
    /// Task 활성화 실행 타입을 설정 (ThreadPool이 기본 값)
    /// </summary>
    private LimitedConcurrencyLevelTaskExecuteType ExecuteType = LimitedConcurrencyLevelTaskExecuteType.ThreadPool;
 
    /// <summary>
    /// 인스턴스 초기화 진행
    /// </summary>
    /// <param name="maxDegreeOfParallelism">최대 동시성 수준 허용 갯수</param>
    public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
    }

    /// <summary>
    /// 인스턴스 초기화 진행
    /// </summary>
    /// <param name="maxDegreeOfParallelism">최대 동시성 수준 허용 갯수</param>
    /// <param name="ExecuteType">Task수행 활성화 타입</param>
    public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelismLimitedConcurr

ncyLevelTaskExecuteType ExecuteType)         : this(maxDegreeOfParallelism)     {         this.ExecuteType = ExecuteType;     }

    /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>     /// <param name="task">큐에 대기할 Task입니다.</param>     protected sealed override void QueueTask(System.Threading.Tasks.Task task)     {         // 처리할 작업 목록에 추가.         // tasks가 처리중이거나 준비가 되지 않았을 때 대기 함         lock (_tasks)         {             _tasks.AddLast(task);             // MaximumConcurrencyLevel의 숫자보다 작을 때만 실행             // 동시성 수준을 제어하는 Scheduler 클래스 이므로 이곳에서 동시성 수준을 체크한다.             if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)             {                 ++_delegatesQueuedOrRunning;                 NotifyThreadPoolOfPendingWork();             }         }     }     /// <summary>     /// 작업이 스케줄러에 대한 실행 할 필요가있는 스레드를 알려줍니다.     /// </summary>     private void NotifyThreadPoolOfPendingWork()     {         switch (ExecuteType)         {             case LimitedConcurrencyLevelTaskExecuteType.Thread:                 #region Thread로 실행                 // 쓰레드에서 Task가 실행이 되도록 한다.                 Thread thread = new Thread(new ThreadStart(() =>                 {                     NotifyThreadPoolOfPendingWorking();                 }));                 thread.Start();                 #endregion                 break;             case LimitedConcurrencyLevelTaskExecuteType.ThreadPool:                 #region ThreadPool.UnsafeQueueUserWorkItem로 실행                 // ThreadPool에서 실행이 되도록 한다.                 // 실행 주기는 ThreadPool이 CPU 환경에 맞게 동시 실행을 컨트롤 한다.                 // http://msdn.microsoft.com/ko-kr/library/system.threading.threadpool_methods(v=vs.110).aspx                 ThreadPool.UnsafeQueueUserWorkItem(_ =>                 {                     NotifyThreadPoolOfPendingWorking();                 }, null);                 #endregion                 break;         }     }     /// <summary>     /// 작업 실행을 시작 하도록 합니다.     /// </summary>     /// <remarks>     /// 실행을 Thread로 할것이지 ThreadPool에서 실행할 것인지 테스트를 위해 만듬.     /// </remarks>     private void NotifyThreadPoolOfPendingWorking()     {         // 현재 스레드가 작업 항목을 활성화 하도록 함         // 스레드에 작업 활성화를 할 수 있도록 한다.         // ThreadStatic으로 선언되었기에 쓰레드마다 별도의 값으로 접근 한다.         _currentThreadIsProcessingItems = true;         try         {             // 대기열에 사용 가능한 모든 항목을 처리합니다.             while (true)             {                 System.Threading.Tasks.Task item;                 lock (_tasks)                 {                     // 처리 할 항목이 더 있을 경우,                     // 처리가 완료되면 루프를 나간다.                     if (_tasks.Count == 0)                     {                         --_delegatesQueuedOrRunning;                         break;                     }                     // 큐에서 다음 항목을 가져 오기.                     item = _tasks.First.Value;                     _tasks.RemoveFirst();                 }                 // 큐에서 찾아낸 작업을 실행                 base.TryExecuteTask(item);             }         }         // 현재 스레드에서 처리 항목을 완료         finally { _currentThreadIsProcessingItems = false; }     }     /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>     /// <param name="task">실행할 타스크</param>     /// <param name="taskWasPreviouslyQueued">작업이 이전에 큐에 대기되었는지 여부를 나타내는 부울입니다.이 매개 변수가 True이면 작업이 이전에 큐에 대기된 것일 수 있습니다. False이면 작업이 큐에 대기되지 않은 것입니다. 작업을 큐에 대기하지 않고 인라인으로 실행하려면 이 호출을 수행합니다.</param>     /// <returns>작업이 인라인으로 실행되었는지 여부를 나타내는 부울 값입니다. 성공적인 실행 시 True, 그 이외에 false</returns> /// <remarks>재진입으로 인한 오류를 방지하기 위해 작업 인라이닝은 관련된 스레드의 로컬 큐에서 대기 중인 대상이 있는 경우에만 발생합니다.</remarks>     protected sealed override bool TryExecuteTaskInline(System.Threading.Tasks.Task taskbool taskWasPreviouslyQueued)     {         //쓰레드에서 처리가 되고 있으면 별도 실해을 지정하지 않는다.         //중복 실행이 되지 않도록 해야 한다.         if (!_currentThreadIsProcessingItemsreturn false;         // 작업이 이전에 큐에 대기된 것이면 제거         if (taskWasPreviouslyQueuedTryDequeue(task);         // 한번더 실행을 시도 한다.         return base.TryExecuteTask(task);     }     /// <summary>이전에 이 스케줄러의 큐에 대기된 Task를 큐에서 제거하려고 합니다</summary>     /// <param name="task">큐에서 제거할 Task입니다.</param>     /// <returns>task 인수가 큐에서 제거되었는지 여부를 나타내는 부울입니다.</returns>     protected sealed override bool TryDequeue(System.Threading.Tasks.Task task)     {         lock (_tasksreturn _tasks.Remove(task);     }     /// <summary>이 TaskScheduler가 지원할 수 있는 최대 동시성 수준을 나타냅니다.</summary>     public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }     /// <summary>디버거를 지원하기 위해 현재 스케줄러의 큐에 대기되어 실행을 기다리고 있는 Task 인스턴스의 열거 가능한 형식을 생성합니다.</summary>     /// <returns>디버거가 현재 이 스케줄러의 큐에 대기된 작업을 트래버스할 수 있도록 허용하는 열거 가능한 형식입니다.</returns>     protected sealed override IEnumerable<System.Threading.Tasks.Task> GetScheduledTasks()     {         bool lockTaken = false;         try         {             Monitor.TryEnter(_tasksref lockTaken);             if (lockTakenreturn _tasks.ToArray();             else throw new NotSupportedException();         }         finally         {             if (lockTakenMonitor.Exit(_tasks);         }     } } /// <summary> /// 절대적인 우위를 보장하는 방법이 아니므로 두개 타입을 비교해서 항상 최선의 방법을 찾도로 해야 한다. /// </summary> public enum LimitedConcurrencyLevelTaskExecuteType {     /// <summary>     /// Thread로 Task를 실행 하도록 함.     /// ( CPU 경합이 적고, 대기 시간이 긴 로직에서 보다 많은 동시 수행을 진행 하도록 함 )     /// 절대적인 우위를 보장하는 방법이 아니므로 두개 타입을 비교해서 항상 최선의 방법을 찾도로 해야 한다.     /// </summary>     Thread,     /// <summary>     /// ThreadPool에서 Task를 실행 하도록 함.     /// </summary>     ThreadPool }

[코드3] Thread와 ThreadPool을 선택적으로 수행 하도록 수정한 코드


 위와 같이 소스를 수정하게 되었더니 동시 실행되는 Task가 Core 갯수의 제한 보다는 동시성 수준에 맞게 수행이 되었다. 하지만 일반적인 시나리오에서는 기본 스케줄러를 이용해 수행이 더 효율적이며 LimitedConcurrencyLevelTaskSchedler은 Cpu 경합 보다는 I/O 작업과 같은 대기 시간이 특히 오래 걸리는 작업에 대해 충분히 테스트를 거처 실 업무에 적용해 봐야 할 것이다. 이 코드는 모든 환경에서 우수한 성능으로 수행할 것이라고 보장하지 않는다. 이제 "코드4"을 통해 확인해 보자

/// <summary>
/// LimitedConcurrencyLevelTaskScheduler로 테스트
/// </summary>
public void LimitedConcurrencyLevelTaskScheduler_TestMethod()
{
    // 시간을 재기 위해서 사용
    Stopwatch sw = new Stopwatch();
    sw.Start();
 
    //var limitedScheduler = new LimitedConcurrencyLevelTaskScheduler(5, LimitedConcurrencyLevelTaskExecuteType.Thread);
    var limitedScheduler = new LimitedConcurrencyLevelTaskScheduler(5);
    limitedScheduler.ExecuteType = LimitedConcurrencyLevelTaskExecuteType.Thread;
 
    // 커스터마이징 된 LimitedConcurrencyLevelTaskScheduler을 이용해 TaskFactory를 생성 하도록 한다.
    var factory = new TaskFactory(limitedScheduler);
    var tasks = new List<System.Threading.Tasks.Task>();
 
    for (int j = 1j <= 20000j++)
    {
        var task = factory.StartNew(() =>
        {
            for (int i = 0i < 5i++)
            {
                var a = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("{0} on thread {1}"iThread.CurrentThread.ManagedThreadId);
            }
        });
 
        tasks.Add(task);
    }
 
    // 모두 완료가 될 때까지 대기
    System.Threading.Tasks.Task.WaitAll(tasks.ToArray());
 
    sw.Stop();
    Console.WriteLine(sw.ElapsedMilliseconds + "ms");
}

[코드4] LimitedConcurrencyLevelTaskScheduler 테스트 코드


 지금까지 TaskScheduler을 커스터마이징 하는것에 대해서 알아 보았다. 그렇지만 모두 이해를 하는데에는 많이 모자랄 것이라 예상한다. 그건 아마도 필자의 실력이 아직 미진하여 정답을 제대로 알려 줄 수 있는 역량이 높지 않기 때문이리라 생각 한다. 그래서 좀더 깊이 있고 많이 알고 싶으신 분은 MS에서 제공하는 예제를 좀더 공부해 보면 어떨까 합니다. http://code.msdn.microsoft.com/windowsdesktop/Samples-for-Parallel-b4b76364

이 페이지의 샘플 소스를 받아 분석하고 자신의 자산으로 만들 수 있는 기회를 가졌으면 합니다. 지금까지 읽어 주셔서 감사합니다. 꼭 당부를 드리자면 직접 실행해 보고 디버깅을 해 보는 것이 눈으로 확인하는 것 보다 더 많은 기회와 가치를 제공해 준다는 것을 알려 드립니다.


소스 코드 자체에 주석과 직관적인 코딩으로 충분히 파악이 가능할 것으로 예상하므로 별도의 설명을 생략하도록 하겠습니다. 포스트의 내용이 장황한 설명 보다는 주석과 소스코드 자체 만으로도 이해할 수 있도록 하기 위해 노력하였습니다.. 실제 개발에서도 필요한 소스는 단순히 Copy & Paste 만으로도 사용할 수 있습니다. 그리고 주석을 이용해 nDoc이나 별도의 자동 Document 제작 유틸로 API 문서를 만드는 데에도 도움이 되었으면 한다. 
※ DOC에 대한 프로그램 정보 Util link

ing™       



C# TaskFactory and TaskScheduler

http://msdn.microsoft.com/ko-kr/library/dd321418.aspx

http://code.msdn.microsoft.com/ParExtSamples

http://msdn.microsoft.com/ko-kr/library/dd997402.aspx

http://channel9.msdn.com/Events/TechDays/Techdays-2012-the-Netherlands/2287


 이전 포스트에서 "new TaskFactory(new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);"와 같은 형식으로 TaskScheduler를 배타적 실행이 되도록 하여 Task.Yield()를 테스트를 하였다. ( 2013/03/11 - [.Net Framework] - [C# Snippet]Task-based Asynchronous Pattern Task.Yield ) 이번에는 이 작업 스케줄러에 대해서 커스터마이징을 하는 방법에 대해서 알아 보고 Task 수행을 마음대로 요리할 수 있는 방법에 대해서 알아 보고자 한다.


 기본 TaskScheduler은 CPU Core의 갯수에 맞춰서 동시 실행이 되도록 되어 있습니다. 이 스케줄러는 일반 적인 케이스(I/O, CPU, Network, ...)에서 노말하게 동시 실행이 되도록 배분을 하고 있다. 그렇지만 Network 대기 시간같은 응답 대기에서는 경합에 의한 CPU 손실보다는  보다 많은 실행을 동시에 수행하는 것이 더 중요할 때가 있다.  보다 많은 동시 실행을 요구하는 시나리오에서 기본 스케줄러가 이닌 커스터마이징 된 스케줄러를 통해서 동시 실행 갯수를 통제 할 수 있을 것이다. ( ThreadPool.QueueUserWorkItem(new WaitCallback(_ => { })); 와 같은 비슷한 패턴으로 TaskFactory로 생성된 타스크가 수행된다. 하지만 경합 관리에서는 다른 패턴으로 관리가 되고 있다. http://www.codethinked.com/net-40-and-systemthreadingtasks)


 참고로 ThreadPool과 TaskPool의 스케줄링에 의한 실행 결과를 "그림1"을 통해 확인해 보자

[그림1] Thread and Task 실행 비교

http://www.codethinked.com/net-40-and-systemthreadingtasks

( Thread는 ThreadPool.QueueUserWorkItem을 통해 실행 됨 )


 이와 같이 스케줄링에 의해서 총 실행 결과에 영향을 미칠 수가 있다. 이제 본격적으로 TaskScheduler을 좀더 파헤처 보도록 하자. "코드1"에서는 기본적으로 제공하는 스케줄러를 가져오는 방법이다.

TaskScheduler scheduler = TaskScheduler.Default;

[코드1] 기본 제공되는 Scheduler


 이 스케줄러는 Task.Factory와 같은 형식으로 스케줄리이 되는 기본 스케줄러다. 이제는 MyTaskScheduler를 만들어 수행 순서를 마음대로 조절하는 Scheduler를 만들어 볼 것이다.

/// <summary>
/// TaskScheduler을 상속 받아 구현 한다.
/// </summary>
public class MyTaskScheduler : TaskScheduler
{
    protected override System.Collections.Generic.IEnumerable<Task> GetScheduledTasks()
    {
        throw new NotImplementedException();
    }
 
    protected override void QueueTask(Task task)
    {
        throw new NotImplementedException();
    }
 
    /// <remarks>재진입으로 인한 오류를 방지하기 위해 작업 인라이닝은 관련된 스레드의 로컬 큐에서 대기 중인 대상이 있는 경우에만 발생합니다.</remarks>
    protected override bool TryExecuteTaskInline(Task taskbool taskWasPreviouslyQueued)
    {
        throw new NotImplementedException();
    }
}

[코드2] TaskScheduler abstract class를 상속 코드


 "코드2"는 추상 TaskScheduler을 상속받은 모습이다. 이제 해당 메소드를 하나씩 채워 나가면 나만의 Scheduler이 만들어 지는 것이다. 생각보다 간단해 보이지 않는가? 이제 간단하게 완성된 MyTaskScheduler를 살펴 보자.

public class MyTaskScheduler : TaskScheduler
{
    /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>
    /// <param name="task">큐에 대기할 Task입니다.</param>
    protected override void QueueTask(System.Threading.Tasks.Task task)
    {
        TryExecuteTask(task);
    }
 
    /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>
    /// <param name="task">실행할 타스크</param>
    /// <param name="taskWasPreviouslyQueued">작업이 이전에 큐에 대기되었는지 여부를 나타내는 부울입니다.이 매개 변수가 True이면 작업이 이전에 큐에 대기된 것일 수 있습니다. False이면 작업이 큐에 대기되지 않은 것입니다. 작업을 큐에 대기하지 않고 인라인으로 실행하려면 이 호출을 수행합니다.</param>
    /// <returns>작업이 인라인으로 실행되었는지 여부를 나타내는 부울 값입니다. 성공적인 실행 시 True, 그 이외에 false</returns>
    /// <remarks>재진입으로 인한 오류를 방지하기 위해 작업 인라이닝은 관련된 스레드의 로컬 큐에서 대기 중인 대상이 있는 경우에만 발생합니다.</remarks>
    protected override bool TryExecuteTaskInline(System.Threading.Tasks.Task taskbool taskWasPreviouslyQueued)
    {
        return TryExecuteTask(task);
    }
 
    /// <summary>디버거를 지원하기 위해 현재 스케줄러의 큐에 대기되어 실행을 기다리고 있는 Task 인스턴스의 열거 가능한 형식을 생성합니다.</summary>
    /// <returns>디버거가 현재 이 스케줄러의 큐에 대기된 작업을 트래버스할 수 있도록 허용하는 열거 가능한 형식입니다.</returns>
    protected override IEnumerable<System.Threading.Tasks.Task> GetScheduledTasks()
    {
        return Enumerable.Empty<System.Threading.Tasks.Task>();
    }
 
    /// <summary>이 TaskScheduler가 지원할 수 있는 최대 동시성 수준을 나타냅니다.</summary>
    public override int MaximumConcurrencyLevel { get { return 1; } }
}

[코드3] MyTaskScheduler


 MyTaskScheduler는 QueueTask로 들어온 Task를 TryExecuteTask를 통해서 바로 실행이 되도록 하였기에 TryExecutTaskInline로 호출이 되지 않도록 되어 있다. 만약 여기에서 실행이 되지 않으면 TryExecuteTaskInline로 다시 시도하게 된다. 지금 구현된 MyTaskScheduler은 기본적으로 동시 실행이 1개만 되도록 설계 및 구현 되었으며 다음에 좀더 확장하여 살펴 보도록 하겠다. 참고로 TaskSchedler.Default의 MaximumConcurrencyLevel은 2147483647이다. 이제 "코드4"를 통해서 실행하는 코드를 살펴 보자

/// <summary>
/// 커스터 마이징된 MyScheduler로 테스트
/// </summary>
public void MyScheduler_TestMethod()
{
    // 시간을 재기 위해서 사용
    Stopwatch sw = new Stopwatch();
    sw.Start();
 
    TaskScheduler myScheduler = new MyTaskScheduler();
    // 기본 스케줄러
    // myScheduler = TaskScheduler.Default;
    // 동시 실행 갯수 확인
    // Console.WriteLine(myScheduler.MaximumConcurrencyLevel);
 
    // 커스터마이징 된 MyScheduler을 이용해 TaskFactory를 생성 하도록 한다.
    var factory = new TaskFactory(myScheduler);
    var tasks = new List<System.Threading.Tasks.Task>();
 
    // 동시 실행이 되도록 수행
    for (int j = 1j <= 200j++)
    {
        var task = factory.StartNew(() =>
        {
            for (int i = 0i < 500i++)
            {
                var a = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("{0} on thread {1}"iThread.CurrentThread.ManagedThreadId);
            }
        });
 
        tasks.Add(task);
    }
 
    // 모두 완료가 될 때까지 대기
    System.Threading.Tasks.Task.WaitAll(tasks.ToArray());
 
    sw.Stop();
    Console.WriteLine(sw.ElapsedMilliseconds);
}

[코드4] MyScheduler 테스트 코드


 위 코드를 통해 결과 화면을 "그림1"에서 확인해 보자.


[그림1] MyTaskScheduler 방식으로 실행 결과 화면


 위 결과를 보면 여러 타스크가 실행이 되었지만 하나의 쓰레드 ID에서 실행이 된 것을 확인 할 수 있다. 지금까지 간단하가 MyScheduler은 TaskScheduler에서 제공하는 기본만을 가지고 기본 개념을 이해하기 위해 연습을 한 것이며 다음 포스트에서 좀더 디테일한 작업을 진행해 보도록 하겠다.



소스 코드 자체에 주석과 직관적인 코딩으로 충분히 파악이 가능할 것으로 예상하므로 별도의 설명을 생략하도록 하겠습니다. 포스트의 내용이 장황한 설명 보다는 주석과 소스코드 자체 만으로도 이해할 수 있도록 하기 위해 노력하였습니다.. 실제 개발에서도 필요한 소스는 단순히 Copy & Paste 만으로도 사용할 수 있습니다. 그리고 주석을 이용해 nDoc이나 별도의 자동 Document 제작 유틸로 API 문서를 만드는 데에도 도움이 되었으면 한다. 
※ DOC에 대한 프로그램 정보 Util link

ing™       


C# SemaphoreSlim


2013/02/27 - [.Net Framework] - [Threading] Semaphore(세마포어) - C#



 지난 포스트에서 소개 했던 세마포어(Seamphore) 대신에 Windows 커널 세마포어를 사용하지 않는 간단한 클래스이다. 사용목적은 세마포어와 같으며 사용방법은 다음 코드를 보자

//한번에 허용할 수 있는 최대 쓰레드 수
private static int _maximumThreads = 3;
 
/// <summary>
/// SemaphoreSlim으로 동시 쓰레드 갯수 제한 테스트
/// </summary>
[TestMethod]
public void SemaphoreSlim_TestMethod()
{
    SemaphoreSlim ss = new SemaphoreSlim(0_maximumThreads);
 
    for (int i = 0i <= 5i++)
    {
        Thread thread = new Thread(new ThreadStart(() =>
        {
            ss.Wait();
            Thread.Sleep(100);
            Debug.WriteLine("{0} 실행 됨."Thread.CurrentThread.Name);
            ss.Release();
        }));
 
        thread.Name = String.Concat("Thread "i + 1);
        thread.Start();
    }
 
 
    Thread.Sleep(300);
    ss.Release(_maximumThreads);
 
    ss.AvailableWaitHandle.WaitOne();
    Debug.WriteLine("ss.CurrentCount after ss.Wait() = {0}"ss.CurrentCount);
}

[코드1] SemaphoreSlim으로 동시 쓰레드 실행 제한


 위 코드의 자세한 사항은 바로 이전 포스트를 보면 알 수 있을 것이다.





Semaphore(세마포어)


 세마포어는 동시에 엑세스할 수 있는 쓰레드 풀을 제한합니다. 다중 쓰레드에서 하나의 자원에 접근할 때 세마포어는 허용된 Concurrent 갯수 이상은 대기 토록하고 처리가 완료가 되면 완료된 쓰레드 갯수만큼 다음 쓰레드가 실행이 되도록 할 수 있습니다. 아래 "코드1"을 보시기 바랍니다.

//세파포어 선언 private static Semaphore _resourcePool; //한번에 허용할 수 있는 최대 쓰레드 수 private static int _maximumThreads = 3; [TestMethod] public void Semaphore_TestMethod() {     //세마포어를 할당한다.     //초기 실행 가능한 쓰레드는 0     //최대 실행 가능한 쓰레드는 3     _resourcePool = new Semaphore(0_maximumThreads);     for (int i = 0i < 8i++)     {         //쓰레드 할당         Thread thread = new Thread(Worker);         thread.Name = String.Concat("Thread "i + 1);         //쓰레드 실행         thread.Start();     }     //세마포어를 통해 대기되고 있던 쓰레드가 허용됫 갯수의 쓰레드가 수행이 되도록 한다.     _resourcePool.Release(_maximumThreads);             } /// <summary> /// 비동기 임의의 작업(3초 대기) /// </summary> private static void Worker() {     //진입된 쓰레드가 대기 하도록 한다.     _resourcePool.WaitOne();     Console.WriteLine("{0} Enters"Thread.CurrentThread.Name);     Debug.WriteLine("{0} Enters"Thread.CurrentThread.Name);     Thread.Sleep(3000);     Console.WriteLine("{0} Exits"Thread.CurrentThread.Name);     Debug.WriteLine("{0} Exits"Thread.CurrentThread.Name);     //진입된 쓰레드가 완료가 되었다고 알려준다.     _resourcePool.Release(); }

[코드1] 세마포어를 통해 동시 쓰레드 실행 갯수 제한


 위 코드에서 보는 바와 같이 최대 실행 갯수(_maximumThreads)를 3으로 세팅되었고, new Semaphore(0, _maximunThreads); 로 선언하였다. 이렇게 초기 세팅이 되면 Worker()안에서 WaitOne할 때 수행되는 쓰레드는 모두 대기 상태가 된다. 그렇지만 for문 마지막에 _resourcePool.Release(_maximunThreads);를 통해서 모두 실행 가능 하도록 하여 Wowker()의 WaitOne에서 대기하고 있던 쓰레드 3개가 수행이 되는 것이다. 이 세마포어를 통해 동시성 제어를 할 수 있다.


 Semaphore class는 커널 모드 객체를 통해 동작하기 때문에 lock, Monitor, Interlocked 보다 무거운 면이 있다 그렇지만 다음에 소개하는 SemaphoreSlim을 사용하면 가볍게 같은 방식으로 수행하도록 제어할 수 있다.

+ Recent posts