C# TaskFactory and TaskScheduler

http://msdn.microsoft.com/ko-kr/library/dd321418.aspx

http://code.msdn.microsoft.com/ParExtSamples

http://msdn.microsoft.com/ko-kr/library/dd997402.aspx

http://channel9.msdn.com/Events/TechDays/Techdays-2012-the-Netherlands/2287


 이전 포스트에서 MyTaskScheduler을 직접 만들어 보았다. 그러나 동시성 수준이 1이라서 비 동기 효율이 제대로 나오지 않는 구조적인 문제가 있었다. 실제로 테스트를 해보면 Default Scheduler 보다 훨씬 안 좋은 성능을 보여주고 있다. 그리하여 이번에는 동시성 수준을 마음대로 컨트롤 할 수 있는 LimitedConcurrencyLevelTaskScheduler을 만들어 보도록 하겠다.

/// <summary> /// TaskScheduler을 상속 받아 구현 한다. /// Provides a task scheduler that ensures a maximum concurrency level while /// running on top of the ThreadPool. /// </summary> public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler {     /// <summary>     /// 현재 쓰레드가 작업 리스트를 처리하고 있는지 여부 판단     /// </summary>     /// <remarks>     /// ThreadStaticAttribute 로 표시된 static 필드는 스레드 간에 공유되지 않습니다.     /// 각 실행 스레드에는 필드에 대한 별도의 인스턴스가 있으며 해당 필드에 대한 값을 독립적으로 설정하고 가져옵니다.     /// 필드를 서로 다른 스레드에서 액세스하면 해당 필드에는 다른 값이 들어가게 됩니다.     /// </remarks>     [ThreadStatic]     private static bool _currentThreadIsProcessingItems;     /// <summary>실행될 타스크 리스트</summary>     private readonly LinkedList<System.Threading.Tasks.Task> _tasks = new LinkedList<System.Threading.Tasks.Task>(); // protected by lock(_tasks)     /// <summary>현재 스케줄러에서 최대로 허용된 동시성 제어 수준</summary>     private readonly int _maxDegreeOfParallelism;     /// <summary>실제로 수행되고 있는 동시성 수준 숫자</summary>     private int _delegatesQueuedOrRunning = 0// protected by lock(_tasks)     /// <summary>     /// 인스턴스 초기화 진행     /// </summary>     /// <param name="maxDegreeOfParallelism">최대 동시성 수준 허용 갯수</param>     public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)     {         if (maxDegreeOfParallelism < 1throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");         _maxDegreeOfParallelism = maxDegreeOfParallelism;     }     /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>     /// <param name="task">큐에 대기할 Task입니다.</param>     protected sealed override void QueueTask(System.Threading.Tasks.Task task)     {         // 처리할 작업 목록에 추가.         // tasks가 처리중이거나 준비가 되지 않았을 때 대기 함         lock (_tasks)         {             _tasks.AddLast(task);             // MaximumConcurrencyLevel의 숫자보다 작을 때만 실행             // 동시성 수준을 제어하는 Scheduler 클래스 이므로 이곳에서 동시성 수준을 체크한다.             if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)             {                 ++_delegatesQueuedOrRunning;                 NotifyThreadPoolOfPendingWork();             }         }     }     /// <summary>     /// 작업이 스케줄러에 대한 실행 할 필요가있는 스레드를 알려줍니다.     /// </summary>     private void NotifyThreadPoolOfPendingWork()     {         // ThreadPool에서 실행이 되도록 한다.         // 실행 주기는 ThreadPool이 CPU 환경에 맞게 동시 실행을 컨트롤 한다.         // http://msdn.microsoft.com/ko-kr/library/system.threading.threadpool_methods(v=vs.110).aspx         ThreadPool.UnsafeQueueUserWorkItem(_ =>         {

        // 현재 스레드가 작업 항목을 활성화 하도록 함
        // 스레드에 작업 활성화를 할 수 있도록 한다.
        // ThreadStatic으로 선언되었기에 쓰레드마다 별도의 값으로 접근 한다.
        _currentThreadIsProcessingItems = true;
        try
        {
            // 대기열에 사용 가능한 모든 항목을 처리합니다.
            while (true)
            {
                System.Threading.Tasks.Task item;
                lock (_tasks)
                {
                    // 처리 할 항목이 더 있을 경우,
                    // 처리가 완료되면 루프를 나간다.
                    if (_tasks.Count == 0)
                    {
                        --_delegatesQueuedOrRunning;
                        break;
                    }
 
                    // 큐에서 다음 항목을 가져 오기.
                    item = _tasks.First.Value;
                    _tasks.RemoveFirst();
                }
 
                // 큐에서 찾아낸 작업을 실행
                base.TryExecuteTask(item);
            }
        }
        // 현재 스레드에서 처리 항목을 완료
        finally { _currentThreadIsProcessingItems = false; }
                    }, null);     }     /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>     /// <param name="task">실행할 타스크</param>     /// <param name="taskWasPreviouslyQueued">작업이 이전에 큐에 대기되었는지 여부를 나타내는 부울입니다.이 매개 변수가 True이면 작업이 이전에 큐에 대기된 것일 수 있습니다. False이면 작업이 큐에 대기되지 않은 것입니다. 작업을 큐에 대기하지 않고 인라인으로 실행하려면 이 호출을 수행합니다.</param>     /// <returns>작업이 인라인으로 실행되었는지 여부를 나타내는 부울 값입니다. 성공적인 실행 시 True, 그 이외에 false</returns> /// <remarks>재진입으로 인한 오류를 방지하기 위해 작업 인라이닝은 관련된 스레드의 로컬 큐에서 대기 중인 대상이 있는 경우에만 발생합니다.</remarks>     protected sealed override bool TryExecuteTaskInline(System.Threading.Tasks.Task taskbool taskWasPreviouslyQueued)     {         //쓰레드에서 처리가 되고 있으면 별도 실해을 지정하지 않는다.         //중복 실행이 되지 않도록 해야 한다.         if (!_currentThreadIsProcessingItemsreturn false;         // 작업이 이전에 큐에 대기된 것이면 제거         if (taskWasPreviouslyQueuedTryDequeue(task);         // 한번더 실행을 시도 한다.         return base.TryExecuteTask(task);     }     /// <summary>이전에 이 스케줄러의 큐에 대기된 Task를 큐에서 제거하려고 합니다</summary>     /// <param name="task">큐에서 제거할 Task입니다.</param>     /// <returns>task 인수가 큐에서 제거되었는지 여부를 나타내는 부울입니다.</returns>     protected sealed override bool TryDequeue(System.Threading.Tasks.Task task)     {         lock (_tasksreturn _tasks.Remove(task);     }     /// <summary>이 TaskScheduler가 지원할 수 있는 최대 동시성 수준을 나타냅니다.</summary>     public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }     /// <summary>디버거를 지원하기 위해 현재 스케줄러의 큐에 대기되어 실행을 기다리고 있는 Task 인스턴스의 열거 가능한 형식을 생성합니다.</summary>     /// <returns>디버거가 현재 이 스케줄러의 큐에 대기된 작업을 트래버스할 수 있도록 허용하는 열거 가능한 형식입니다.</returns>     protected sealed override IEnumerable<System.Threading.Tasks.Task> GetScheduledTasks()     {         bool lockTaken = false;         try         {             Monitor.TryEnter(_tasksref lockTaken);             if (lockTakenreturn _tasks.ToArray();             else throw new NotSupportedException();         }         finally         {             if (lockTakenMonitor.Exit(_tasks);         }     } }

[코드1] LimitedConcurrencyLevelTaskScheduler 전체 코드


 위와 같이 동시성 수준을 제어 할 수 있는 스케줄러를 만들어 보았다. 처음 인스턴스를 시킬때 넣는 동시성 수준 갯수를 세팅하면 한번에 실행되는 타스크의 갯수를 제어할 수가 있다. 전체적인 흐름은 QueueTask를 통해 넘어온 Task를 곧바로 실행하지 않고 Queue에 넣어 둔다. 동시성 수준을 통과한 상태에서 NotifyThreadPoolOfPendingWork에서 ThreadPool에서 각각의 Task를 실행이 활성화 되도록 한다. 그렇다면 이제 한번 실행하여 결과를 보도록 하자.


/// <summary>
/// LimitedConcurrencyLevelTaskScheduler로 테스트
/// </summary>
public void LimitedConcurrencyLevelTaskScheduler_TestMethod()
{
    // 시간을 재기 위해서 사용
    Stopwatch sw = new Stopwatch();
    sw.Start();
 
    var limitedScheduler = new LimitedConcurrencyLevelTaskScheduler(5);
 
    // 커스터마이징 된 LimitedConcurrencyLevelTaskScheduler을 이용해 TaskFactory를 생성 하도록 한다.
    var factory = new TaskFactory(limitedScheduler);
    var tasks = new List<System.Threading.Tasks.Task>();
 
    for (int j = 1j <= 20000j++)
    {
        var task = factory.StartNew(() =>
        {
            for (int i = 0i < 5i++)
            {
                var a = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("{0} on thread {1}"iThread.CurrentThread.ManagedThreadId);
            }
        });
 
        tasks.Add(task);
    }
 
    // 모두 완료가 될 때까지 대기
    System.Threading.Tasks.Task.WaitAll(tasks.ToArray());
 
    sw.Stop();
    Console.WriteLine(sw.ElapsedMilliseconds + "ms");
}

[코드2] LimitedConcurrencyLevelTaskScheduler 테스트 코드



[그림1] LimitedConcurrencyLeveTaskScheduler 실행 결과 화면


 "그림1"에서와 같이 여러 쓰레드 ID에서 각각의 Task가 수행이 된 것을 확인할 수 있다. 그렇지만 이 작업은 Default Scheduler 보다 작업 시간이 길게 걸린다. 뭔가가 문제가 있는 것일까? 예상 기대치 보다 좋지 않다란 생각을 하게 되었다. 그래서 Default Scheduler의 기본 MaximumConcurrencyLevel을 확인해보니 2147483647로 확인이 되었다. 내가 세팅한 값보다 엄청 많은 동시성 수준이다. 그리고 기본적으로 ThreadPool을 통해서 수행하다 보니 CPU Core의 절대적인 숫자에 제한을 받는다. ( ThreadPool Click new ) 그러므로 아무리 동시성 수준을 높여도 Core 갯수 이상은 동시 실행이 되지 않는다. 이 요건은 처음 포스트 당시 언급 했던 대기 시간이 많은 수행에 대해서 특별한 Scheduler를 만들려고 하는 계획과는 차이가 있게 되었다.  그래서 ThreadPool 대신이 Thread를 통해서 실행이 되도록 하였으며 프로퍼티를 통해 Thread와 ThreadPool을 선택하여 수행 할 수 있도록 수정 하게 되었다. "코드3"를 확인해 보자


/// <summary>
/// TaskScheduler을 상속 받아 구현 한다.
/// Provides a task scheduler that ensures a maximum concurrency level while
/// running on top of the ThreadPool.
/// </summary>
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
    /// <summary>
    /// 현재 쓰레드가 작업 리스트를 처리하고 있는지 여부 판단
    /// </summary>
    /// <remarks>
    /// ThreadStaticAttribute 로 표시된 static 필드는 스레드 간에 공유되지 않습니다.
    /// 각 실행 스레드에는 필드에 대한 별도의 인스턴스가 있으며 해당 필드에 대한 값을 독립적으로 설정하고 가져옵니다.
    /// 필드를 서로 다른 스레드에서 액세스하면 해당 필드에는 다른 값이 들어가게 됩니다.
    /// </remarks>
    [ThreadStatic]
    private static bool _currentThreadIsProcessingItems;
    /// <summary>실행될 타스크 리스트</summary>
    private readonly LinkedList<System.Threading.Tasks.Task> _tasks = new LinkedList<System.Threading.Tasks.Task>(); // protected by lock(_tasks)
    /// <summary>현재 스케줄러에서 최대로 허용된 동시성 제어 수준</summary>
    private readonly int _maxDegreeOfParallelism;
    /// <summary>실제로 수행되고 있는 동시성 수준 숫자</summary>
    private int _delegatesQueuedOrRunning = 0// protected by lock(_tasks)
 
    /// <summary>
    /// Task 활성화 실행 타입을 설정 (ThreadPool이 기본 값)
    /// </summary>
    private LimitedConcurrencyLevelTaskExecuteType ExecuteType = LimitedConcurrencyLevelTaskExecuteType.ThreadPool;
 
    /// <summary>
    /// 인스턴스 초기화 진행
    /// </summary>
    /// <param name="maxDegreeOfParallelism">최대 동시성 수준 허용 갯수</param>
    public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
    }

    /// <summary>
    /// 인스턴스 초기화 진행
    /// </summary>
    /// <param name="maxDegreeOfParallelism">최대 동시성 수준 허용 갯수</param>
    /// <param name="ExecuteType">Task수행 활성화 타입</param>
    public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelismLimitedConcurr

ncyLevelTaskExecuteType ExecuteType)         : this(maxDegreeOfParallelism)     {         this.ExecuteType = ExecuteType;     }

    /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>     /// <param name="task">큐에 대기할 Task입니다.</param>     protected sealed override void QueueTask(System.Threading.Tasks.Task task)     {         // 처리할 작업 목록에 추가.         // tasks가 처리중이거나 준비가 되지 않았을 때 대기 함         lock (_tasks)         {             _tasks.AddLast(task);             // MaximumConcurrencyLevel의 숫자보다 작을 때만 실행             // 동시성 수준을 제어하는 Scheduler 클래스 이므로 이곳에서 동시성 수준을 체크한다.             if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)             {                 ++_delegatesQueuedOrRunning;                 NotifyThreadPoolOfPendingWork();             }         }     }     /// <summary>     /// 작업이 스케줄러에 대한 실행 할 필요가있는 스레드를 알려줍니다.     /// </summary>     private void NotifyThreadPoolOfPendingWork()     {         switch (ExecuteType)         {             case LimitedConcurrencyLevelTaskExecuteType.Thread:                 #region Thread로 실행                 // 쓰레드에서 Task가 실행이 되도록 한다.                 Thread thread = new Thread(new ThreadStart(() =>                 {                     NotifyThreadPoolOfPendingWorking();                 }));                 thread.Start();                 #endregion                 break;             case LimitedConcurrencyLevelTaskExecuteType.ThreadPool:                 #region ThreadPool.UnsafeQueueUserWorkItem로 실행                 // ThreadPool에서 실행이 되도록 한다.                 // 실행 주기는 ThreadPool이 CPU 환경에 맞게 동시 실행을 컨트롤 한다.                 // http://msdn.microsoft.com/ko-kr/library/system.threading.threadpool_methods(v=vs.110).aspx                 ThreadPool.UnsafeQueueUserWorkItem(_ =>                 {                     NotifyThreadPoolOfPendingWorking();                 }, null);                 #endregion                 break;         }     }     /// <summary>     /// 작업 실행을 시작 하도록 합니다.     /// </summary>     /// <remarks>     /// 실행을 Thread로 할것이지 ThreadPool에서 실행할 것인지 테스트를 위해 만듬.     /// </remarks>     private void NotifyThreadPoolOfPendingWorking()     {         // 현재 스레드가 작업 항목을 활성화 하도록 함         // 스레드에 작업 활성화를 할 수 있도록 한다.         // ThreadStatic으로 선언되었기에 쓰레드마다 별도의 값으로 접근 한다.         _currentThreadIsProcessingItems = true;         try         {             // 대기열에 사용 가능한 모든 항목을 처리합니다.             while (true)             {                 System.Threading.Tasks.Task item;                 lock (_tasks)                 {                     // 처리 할 항목이 더 있을 경우,                     // 처리가 완료되면 루프를 나간다.                     if (_tasks.Count == 0)                     {                         --_delegatesQueuedOrRunning;                         break;                     }                     // 큐에서 다음 항목을 가져 오기.                     item = _tasks.First.Value;                     _tasks.RemoveFirst();                 }                 // 큐에서 찾아낸 작업을 실행                 base.TryExecuteTask(item);             }         }         // 현재 스레드에서 처리 항목을 완료         finally { _currentThreadIsProcessingItems = false; }     }     /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>     /// <param name="task">실행할 타스크</param>     /// <param name="taskWasPreviouslyQueued">작업이 이전에 큐에 대기되었는지 여부를 나타내는 부울입니다.이 매개 변수가 True이면 작업이 이전에 큐에 대기된 것일 수 있습니다. False이면 작업이 큐에 대기되지 않은 것입니다. 작업을 큐에 대기하지 않고 인라인으로 실행하려면 이 호출을 수행합니다.</param>     /// <returns>작업이 인라인으로 실행되었는지 여부를 나타내는 부울 값입니다. 성공적인 실행 시 True, 그 이외에 false</returns> /// <remarks>재진입으로 인한 오류를 방지하기 위해 작업 인라이닝은 관련된 스레드의 로컬 큐에서 대기 중인 대상이 있는 경우에만 발생합니다.</remarks>     protected sealed override bool TryExecuteTaskInline(System.Threading.Tasks.Task taskbool taskWasPreviouslyQueued)     {         //쓰레드에서 처리가 되고 있으면 별도 실해을 지정하지 않는다.         //중복 실행이 되지 않도록 해야 한다.         if (!_currentThreadIsProcessingItemsreturn false;         // 작업이 이전에 큐에 대기된 것이면 제거         if (taskWasPreviouslyQueuedTryDequeue(task);         // 한번더 실행을 시도 한다.         return base.TryExecuteTask(task);     }     /// <summary>이전에 이 스케줄러의 큐에 대기된 Task를 큐에서 제거하려고 합니다</summary>     /// <param name="task">큐에서 제거할 Task입니다.</param>     /// <returns>task 인수가 큐에서 제거되었는지 여부를 나타내는 부울입니다.</returns>     protected sealed override bool TryDequeue(System.Threading.Tasks.Task task)     {         lock (_tasksreturn _tasks.Remove(task);     }     /// <summary>이 TaskScheduler가 지원할 수 있는 최대 동시성 수준을 나타냅니다.</summary>     public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }     /// <summary>디버거를 지원하기 위해 현재 스케줄러의 큐에 대기되어 실행을 기다리고 있는 Task 인스턴스의 열거 가능한 형식을 생성합니다.</summary>     /// <returns>디버거가 현재 이 스케줄러의 큐에 대기된 작업을 트래버스할 수 있도록 허용하는 열거 가능한 형식입니다.</returns>     protected sealed override IEnumerable<System.Threading.Tasks.Task> GetScheduledTasks()     {         bool lockTaken = false;         try         {             Monitor.TryEnter(_tasksref lockTaken);             if (lockTakenreturn _tasks.ToArray();             else throw new NotSupportedException();         }         finally         {             if (lockTakenMonitor.Exit(_tasks);         }     } } /// <summary> /// 절대적인 우위를 보장하는 방법이 아니므로 두개 타입을 비교해서 항상 최선의 방법을 찾도로 해야 한다. /// </summary> public enum LimitedConcurrencyLevelTaskExecuteType {     /// <summary>     /// Thread로 Task를 실행 하도록 함.     /// ( CPU 경합이 적고, 대기 시간이 긴 로직에서 보다 많은 동시 수행을 진행 하도록 함 )     /// 절대적인 우위를 보장하는 방법이 아니므로 두개 타입을 비교해서 항상 최선의 방법을 찾도로 해야 한다.     /// </summary>     Thread,     /// <summary>     /// ThreadPool에서 Task를 실행 하도록 함.     /// </summary>     ThreadPool }

[코드3] Thread와 ThreadPool을 선택적으로 수행 하도록 수정한 코드


 위와 같이 소스를 수정하게 되었더니 동시 실행되는 Task가 Core 갯수의 제한 보다는 동시성 수준에 맞게 수행이 되었다. 하지만 일반적인 시나리오에서는 기본 스케줄러를 이용해 수행이 더 효율적이며 LimitedConcurrencyLevelTaskSchedler은 Cpu 경합 보다는 I/O 작업과 같은 대기 시간이 특히 오래 걸리는 작업에 대해 충분히 테스트를 거처 실 업무에 적용해 봐야 할 것이다. 이 코드는 모든 환경에서 우수한 성능으로 수행할 것이라고 보장하지 않는다. 이제 "코드4"을 통해 확인해 보자

/// <summary>
/// LimitedConcurrencyLevelTaskScheduler로 테스트
/// </summary>
public void LimitedConcurrencyLevelTaskScheduler_TestMethod()
{
    // 시간을 재기 위해서 사용
    Stopwatch sw = new Stopwatch();
    sw.Start();
 
    //var limitedScheduler = new LimitedConcurrencyLevelTaskScheduler(5, LimitedConcurrencyLevelTaskExecuteType.Thread);
    var limitedScheduler = new LimitedConcurrencyLevelTaskScheduler(5);
    limitedScheduler.ExecuteType = LimitedConcurrencyLevelTaskExecuteType.Thread;
 
    // 커스터마이징 된 LimitedConcurrencyLevelTaskScheduler을 이용해 TaskFactory를 생성 하도록 한다.
    var factory = new TaskFactory(limitedScheduler);
    var tasks = new List<System.Threading.Tasks.Task>();
 
    for (int j = 1j <= 20000j++)
    {
        var task = factory.StartNew(() =>
        {
            for (int i = 0i < 5i++)
            {
                var a = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("{0} on thread {1}"iThread.CurrentThread.ManagedThreadId);
            }
        });
 
        tasks.Add(task);
    }
 
    // 모두 완료가 될 때까지 대기
    System.Threading.Tasks.Task.WaitAll(tasks.ToArray());
 
    sw.Stop();
    Console.WriteLine(sw.ElapsedMilliseconds + "ms");
}

[코드4] LimitedConcurrencyLevelTaskScheduler 테스트 코드


 지금까지 TaskScheduler을 커스터마이징 하는것에 대해서 알아 보았다. 그렇지만 모두 이해를 하는데에는 많이 모자랄 것이라 예상한다. 그건 아마도 필자의 실력이 아직 미진하여 정답을 제대로 알려 줄 수 있는 역량이 높지 않기 때문이리라 생각 한다. 그래서 좀더 깊이 있고 많이 알고 싶으신 분은 MS에서 제공하는 예제를 좀더 공부해 보면 어떨까 합니다. http://code.msdn.microsoft.com/windowsdesktop/Samples-for-Parallel-b4b76364

이 페이지의 샘플 소스를 받아 분석하고 자신의 자산으로 만들 수 있는 기회를 가졌으면 합니다. 지금까지 읽어 주셔서 감사합니다. 꼭 당부를 드리자면 직접 실행해 보고 디버깅을 해 보는 것이 눈으로 확인하는 것 보다 더 많은 기회와 가치를 제공해 준다는 것을 알려 드립니다.


소스 코드 자체에 주석과 직관적인 코딩으로 충분히 파악이 가능할 것으로 예상하므로 별도의 설명을 생략하도록 하겠습니다. 포스트의 내용이 장황한 설명 보다는 주석과 소스코드 자체 만으로도 이해할 수 있도록 하기 위해 노력하였습니다.. 실제 개발에서도 필요한 소스는 단순히 Copy & Paste 만으로도 사용할 수 있습니다. 그리고 주석을 이용해 nDoc이나 별도의 자동 Document 제작 유틸로 API 문서를 만드는 데에도 도움이 되었으면 한다. 
※ DOC에 대한 프로그램 정보 Util link

ing™       


+ Recent posts