C# TaskFactory and TaskScheduler

http://msdn.microsoft.com/ko-kr/library/dd321418.aspx

http://code.msdn.microsoft.com/ParExtSamples

http://msdn.microsoft.com/ko-kr/library/dd997402.aspx

http://channel9.msdn.com/Events/TechDays/Techdays-2012-the-Netherlands/2287


 이전 포스트에서 "new TaskFactory(new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);"와 같은 형식으로 TaskScheduler를 배타적 실행이 되도록 하여 Task.Yield()를 테스트를 하였다. ( 2013/03/11 - [.Net Framework] - [C# Snippet]Task-based Asynchronous Pattern Task.Yield ) 이번에는 이 작업 스케줄러에 대해서 커스터마이징을 하는 방법에 대해서 알아 보고 Task 수행을 마음대로 요리할 수 있는 방법에 대해서 알아 보고자 한다.


 기본 TaskScheduler은 CPU Core의 갯수에 맞춰서 동시 실행이 되도록 되어 있습니다. 이 스케줄러는 일반 적인 케이스(I/O, CPU, Network, ...)에서 노말하게 동시 실행이 되도록 배분을 하고 있다. 그렇지만 Network 대기 시간같은 응답 대기에서는 경합에 의한 CPU 손실보다는  보다 많은 실행을 동시에 수행하는 것이 더 중요할 때가 있다.  보다 많은 동시 실행을 요구하는 시나리오에서 기본 스케줄러가 이닌 커스터마이징 된 스케줄러를 통해서 동시 실행 갯수를 통제 할 수 있을 것이다. ( ThreadPool.QueueUserWorkItem(new WaitCallback(_ => { })); 와 같은 비슷한 패턴으로 TaskFactory로 생성된 타스크가 수행된다. 하지만 경합 관리에서는 다른 패턴으로 관리가 되고 있다. http://www.codethinked.com/net-40-and-systemthreadingtasks)


 참고로 ThreadPool과 TaskPool의 스케줄링에 의한 실행 결과를 "그림1"을 통해 확인해 보자

[그림1] Thread and Task 실행 비교

http://www.codethinked.com/net-40-and-systemthreadingtasks

( Thread는 ThreadPool.QueueUserWorkItem을 통해 실행 됨 )


 이와 같이 스케줄링에 의해서 총 실행 결과에 영향을 미칠 수가 있다. 이제 본격적으로 TaskScheduler을 좀더 파헤처 보도록 하자. "코드1"에서는 기본적으로 제공하는 스케줄러를 가져오는 방법이다.

TaskScheduler scheduler = TaskScheduler.Default;

[코드1] 기본 제공되는 Scheduler


 이 스케줄러는 Task.Factory와 같은 형식으로 스케줄리이 되는 기본 스케줄러다. 이제는 MyTaskScheduler를 만들어 수행 순서를 마음대로 조절하는 Scheduler를 만들어 볼 것이다.

/// <summary>
/// TaskScheduler을 상속 받아 구현 한다.
/// </summary>
public class MyTaskScheduler : TaskScheduler
{
    protected override System.Collections.Generic.IEnumerable<Task> GetScheduledTasks()
    {
        throw new NotImplementedException();
    }
 
    protected override void QueueTask(Task task)
    {
        throw new NotImplementedException();
    }
 
    /// <remarks>재진입으로 인한 오류를 방지하기 위해 작업 인라이닝은 관련된 스레드의 로컬 큐에서 대기 중인 대상이 있는 경우에만 발생합니다.</remarks>
    protected override bool TryExecuteTaskInline(Task taskbool taskWasPreviouslyQueued)
    {
        throw new NotImplementedException();
    }
}

[코드2] TaskScheduler abstract class를 상속 코드


 "코드2"는 추상 TaskScheduler을 상속받은 모습이다. 이제 해당 메소드를 하나씩 채워 나가면 나만의 Scheduler이 만들어 지는 것이다. 생각보다 간단해 보이지 않는가? 이제 간단하게 완성된 MyTaskScheduler를 살펴 보자.

public class MyTaskScheduler : TaskScheduler
{
    /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>
    /// <param name="task">큐에 대기할 Task입니다.</param>
    protected override void QueueTask(System.Threading.Tasks.Task task)
    {
        TryExecuteTask(task);
    }
 
    /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>
    /// <param name="task">실행할 타스크</param>
    /// <param name="taskWasPreviouslyQueued">작업이 이전에 큐에 대기되었는지 여부를 나타내는 부울입니다.이 매개 변수가 True이면 작업이 이전에 큐에 대기된 것일 수 있습니다. False이면 작업이 큐에 대기되지 않은 것입니다. 작업을 큐에 대기하지 않고 인라인으로 실행하려면 이 호출을 수행합니다.</param>
    /// <returns>작업이 인라인으로 실행되었는지 여부를 나타내는 부울 값입니다. 성공적인 실행 시 True, 그 이외에 false</returns>
    /// <remarks>재진입으로 인한 오류를 방지하기 위해 작업 인라이닝은 관련된 스레드의 로컬 큐에서 대기 중인 대상이 있는 경우에만 발생합니다.</remarks>
    protected override bool TryExecuteTaskInline(System.Threading.Tasks.Task taskbool taskWasPreviouslyQueued)
    {
        return TryExecuteTask(task);
    }
 
    /// <summary>디버거를 지원하기 위해 현재 스케줄러의 큐에 대기되어 실행을 기다리고 있는 Task 인스턴스의 열거 가능한 형식을 생성합니다.</summary>
    /// <returns>디버거가 현재 이 스케줄러의 큐에 대기된 작업을 트래버스할 수 있도록 허용하는 열거 가능한 형식입니다.</returns>
    protected override IEnumerable<System.Threading.Tasks.Task> GetScheduledTasks()
    {
        return Enumerable.Empty<System.Threading.Tasks.Task>();
    }
 
    /// <summary>이 TaskScheduler가 지원할 수 있는 최대 동시성 수준을 나타냅니다.</summary>
    public override int MaximumConcurrencyLevel { get { return 1; } }
}

[코드3] MyTaskScheduler


 MyTaskScheduler는 QueueTask로 들어온 Task를 TryExecuteTask를 통해서 바로 실행이 되도록 하였기에 TryExecutTaskInline로 호출이 되지 않도록 되어 있다. 만약 여기에서 실행이 되지 않으면 TryExecuteTaskInline로 다시 시도하게 된다. 지금 구현된 MyTaskScheduler은 기본적으로 동시 실행이 1개만 되도록 설계 및 구현 되었으며 다음에 좀더 확장하여 살펴 보도록 하겠다. 참고로 TaskSchedler.Default의 MaximumConcurrencyLevel은 2147483647이다. 이제 "코드4"를 통해서 실행하는 코드를 살펴 보자

/// <summary>
/// 커스터 마이징된 MyScheduler로 테스트
/// </summary>
public void MyScheduler_TestMethod()
{
    // 시간을 재기 위해서 사용
    Stopwatch sw = new Stopwatch();
    sw.Start();
 
    TaskScheduler myScheduler = new MyTaskScheduler();
    // 기본 스케줄러
    // myScheduler = TaskScheduler.Default;
    // 동시 실행 갯수 확인
    // Console.WriteLine(myScheduler.MaximumConcurrencyLevel);
 
    // 커스터마이징 된 MyScheduler을 이용해 TaskFactory를 생성 하도록 한다.
    var factory = new TaskFactory(myScheduler);
    var tasks = new List<System.Threading.Tasks.Task>();
 
    // 동시 실행이 되도록 수행
    for (int j = 1j <= 200j++)
    {
        var task = factory.StartNew(() =>
        {
            for (int i = 0i < 500i++)
            {
                var a = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("{0} on thread {1}"iThread.CurrentThread.ManagedThreadId);
            }
        });
 
        tasks.Add(task);
    }
 
    // 모두 완료가 될 때까지 대기
    System.Threading.Tasks.Task.WaitAll(tasks.ToArray());
 
    sw.Stop();
    Console.WriteLine(sw.ElapsedMilliseconds);
}

[코드4] MyScheduler 테스트 코드


 위 코드를 통해 결과 화면을 "그림1"에서 확인해 보자.


[그림1] MyTaskScheduler 방식으로 실행 결과 화면


 위 결과를 보면 여러 타스크가 실행이 되었지만 하나의 쓰레드 ID에서 실행이 된 것을 확인 할 수 있다. 지금까지 간단하가 MyScheduler은 TaskScheduler에서 제공하는 기본만을 가지고 기본 개념을 이해하기 위해 연습을 한 것이며 다음 포스트에서 좀더 디테일한 작업을 진행해 보도록 하겠다.



소스 코드 자체에 주석과 직관적인 코딩으로 충분히 파악이 가능할 것으로 예상하므로 별도의 설명을 생략하도록 하겠습니다. 포스트의 내용이 장황한 설명 보다는 주석과 소스코드 자체 만으로도 이해할 수 있도록 하기 위해 노력하였습니다.. 실제 개발에서도 필요한 소스는 단순히 Copy & Paste 만으로도 사용할 수 있습니다. 그리고 주석을 이용해 nDoc이나 별도의 자동 Document 제작 유틸로 API 문서를 만드는 데에도 도움이 되었으면 한다. 
※ DOC에 대한 프로그램 정보 Util link

ing™       


+ Recent posts