C# Task의 TaskCreationOptions별 실행 비교와
 ThreadPool의 관계
(SetMinThreads SetMaxThreads)

http://msdn.microsoft.com/ko-kr/library/system.threading.threadpool.getmaxthreads.aspx



 이번 포스트에서는 "Thread를 컨트롤"의 라디오 버튼을 이용해서 SetMinThreads와 SetMaxThreads를 통해 제한된 상황을 만들어서 컨트롤 하는 시나리오를 검토해 보도록 하겠습니다. 지난 시간에 포스트 할때 배포 되었던 소스에서 약간더 수정한 소스를 다시 올려 드리며 추가된 화면 컨트롤에 대한 설명을 더 진행하도록 하겠다.


[그림1] 새로 추가된 테스트 화면


 "그림1"에서와 같이 추가된 컨트롤에 대한 설명을 "표1"에 설명 한다.


 컨트롤 명

 설명 

 Available Thread Count

 ThreadPool에서 사용 가능한 쓰레드 수 
 Available IOC Thread Count

 ThreadPool에서 I/O 관련 사용 가능한 쓰레드 수 

[표1] 추가된 컨트롤에 대한 설명


추가된 기능은 실시간으로 ThreadPool에서 사용가능한 쓰레드 수를 화면에 보여주도록 하여 Thread 컨트롤에서 어떻게 동작하는지 보다 정확히 알 수 있도록 하였다. 

 이제 "시나리오2"처럼 실행을 해보자.


시나리오2

 1.  "Thread를 컨트롤" 라디오 버튼을 클릭

 2. 기본 세팅되어 있는 Min Worker : 4, Max Worker : 4를 확인 한다.

 3. "Set" 버튼을 클릭

 4. "Async Processing" 버튼 클릭

 5. 동시 실행 갯수 확인

 6. MaxWorker을 6으로 수정

 7. "Clear" 버튼 클릭

 8. "Set" 버튼 클릭

 9. "Async Processing"버튼 클릭

10. 동시 실행 갯수 확인



이제 시나리오 2와 같이 실행하면 "Thread"컨트롤 라디오 버튼을 클릭하면 임의의 값으로 ThreadPool을 설정할 수 있으며 설정된 정보와 같이 Task 비 동기 실행이 되는 것을 확인 할 수 있다.


[그림1] MaxWorker을 4로 세팅하고 수행하는 화면



[그림2] MaxWorker을 6로 세팅하고 수행하는 화면


 "그림1"과 "그림2"에서 확인된 것과 같이 MaxWorker로 설정된 값을 기준으로 초기 동시 실행 갯수가 제한되는 것을 확인할 수 있다. 이제 대략적인 프로그램에 대한 설명을 마친다. 이제 전체적인 개념과 프로그램 화면에 대한 사용 방법 및 개념에 대해서 어느정도 이해를 할 수 있을 정도일 것으로 예상하고 이제 소스 코드에 대해 알아 보도록 하자. 다운받을 수 있게 소스 코드를 올려 놓을 것이니 포스트에서는 핵심 코드에 대해 부가적인 설명이 필요한 부분에 대해서만 진행 하겠다. 다시 한번 더 부탁드리자면 직접 디버깅을 통해 실행 하면서 체험해 보는것이 가장 좋고 소스에 같이 주석처리된 코멘트를 잘 읽어 간다면 보다 쉽게 체득할 수 있을거라 장담한다. 이제 코드를 살펴 보도록 하자.


다운받기

TaskTest_Next.zip

<RadioButton VerticalAlignment="Center" x:Name="rdoCustomerThreadControl" Content="Thread를 컨트롤" HorizontalAlignment="Center" Margin="5, 0, 0, 0"></RadioButton>
<StackPanel Orientation="Horizontal" Margin="20, 0, 0, 0" IsEnabled="{Binding ElementName=rdoCustomerThreadControl, Path=IsChecked, Mode=TwoWay}">

[코드1] 메인 화면의 Xaml중에서 Binding을 통해 값 연결


 "코드1"에서는 IsEnabled="{Binding ...}을 통해 radCustomerThreadControl의 값이 실시간으로 변홤에 따라서 IsEnabled의 값이 변경이 되도록 Xaml단에서 연결 시켜 놓는 작업이다. 이 작업은 UI단에서의 작업이므로 전체 로직에서 차지하는 비중을 차지 하지는 않는다. 다만 Xaml을 하는 장점을 한번 보여드리고자 하였다.


// 일정 간격마다 ThreadPool을 가져와서 화면에 보여준다.
Task.Factory.StartNew(() => {
    while (true)
    {
        //Min thread work
        int minWorkerminIOC;
        ThreadPool.GetMinThreads(out minWorkerout minIOC);
        // 가져온 값으로 화면에 보여준다.
        tbMinThreadCount.Dispatcher.BeginInvoke(new Action(() => { tbMinThreadCount.Text = minWorker.ToString(); }), null);
 
        //Max thread work
        int maxWorkermaxIOC;
        ThreadPool.GetMaxThreads(out maxWorkerout maxIOC);
        // 가져온 값으로 화면에 보여준다.
        tbMaxThreadCount.Dispatcher.BeginInvoke(new Action(() => { tbMaxThreadCount.Text = maxWorker.ToString(); }), null);
 
        // 특정 시간에 스레드 풀에 있는 실제 스레드 수를 확인
        int availableWorkThreadsavailableCompletionPortThreads;
        ThreadPool.GetAvailableThreads(out availableWorkThreadsout availableCompletionPortThreads);
 
        tbAvailableThreadCount.Dispatcher.BeginInvoke(new Action(() => {
            tbAvailableThreadCount.Text = availableWorkThreads.ToString();
            tbAvailableCompletionThreadCount.Text = availableCompletionPortThreads.ToString();
        }));
 
        Thread.Sleep(1000);
    }
});

[코드2] ThreadPool의 상태를 화면에 적용 하는 코드


 "코드2"는 ThreadPool의 상태를 가져와서 텍스트 박스에 할당하는 작업을 비 동기, 주기적으로 갱신하도록 하였다. 이 작업은 실행되는 풀의 내부 상태를 바로 알 수 있도록 하기 위한것이고 동시 실행갯수에 따라서 변경되는 것을 확인할 수 있다. 


// 스레드 작성 및 소멸을 관리하기 위한 알고리즘으로 전환하기 전에 새 요청에 따라 스레드 풀이 생성하는 스레드의 최소 수를 설정합니다.
// minWorker : 스레드 풀에서 필요할 때 만드는 작업자 I/O 스레드의 최소 개수입니다. 
// minIOC : 스레드 풀에서 필요할 때 만드는 비동기 I/O 스레드의 최소 개수입니다. File.BeginWrite(,,,) 와 같이 파일 관련 비동기 함수를 실해할 때 MinIOC를 쓰레드에서 실행한다. // http://msdn.microsoft.com/ko-kr/library/system.threading.threadpool.getmaxthreads.aspx 확인 가능
ThreadPool.SetMinThreads(minWorkerminIOC);
 
 
// 동시에 활성 상태가 될 수 있는 스레드 풀에 대한 요청 수를 설정합니다. 해당 수를 넘는 모든 요청은 스레드 풀 스레드가 사용 가능해질 때까지 큐에 대기 상태로 남아 있습니다.
// maxWorker : 스레드 풀에 있는 최대 작업자 스레드 수입니다. 
// minIOC : 스레드 풀에서 필요할 때 만드는 비동기 I/O 스레드의 최소 개수입니다. File.BeginWrite(,,,) 와 같이 파일 관련 비동기 함수를 실해할 때 MinIOC를 쓰레드에서 실행한다. // http://msdn.microsoft.com/ko-kr/library/system.threading.threadpool.getmaxthreads.aspx 확인 가능
ThreadPool.SetMaxThreads(maxWorkermaxIOC);

[코드3] ThreadPool에 Thread의 활동 제한을 설정 하는 코드


 "코드3"은 자동으로 변경되는 ThreadPool의 활성화 작업을 제한하는 코드로서 최저와 최고치를 설정하여 활성화 되는 쓰레드의 갯수를 제한한다. 이 작업은 "Thread를 컨트롤" 버튼을 눌러 활성화 시켰을 때에만 수행 하도록 되었다.


Task.Factory.StartNew(() =>
{
    for (int i = 1i <= counti++)
    {
        this.Dispatcher.BeginInvoke(new Action(() =>
        {
            var stackPanel = GetProgressBar();
            // 제공된 컨트롤을 리스트 객채에 넣는다.
            lstTaskResult.Items.Add(stackPanel);
        }));
    }
});

[코드4] 진행바를 ListItem 객체에 추가하는 코드


 GetProgressBar()에서 프로그래밍 적으로 프로그래스바를 화면에 보일 수 있도록 구성한 다음 진행바가 수행하는 작업을 연결 시켜 비 동기로 수행이 되도록 세팅된 StackPanel을 받아 ListItem 객체에 자식 컨트롤로 추가하는 코드다.


// Processing 버튼을 눌렀을 때 비 동기 Task가 활성화 되도록 처리 함.
Task.Factory.StartNew(() => {
 
    // Task를 활성화 시킴
    tasks.ForEach(task => task.Start());
 
    // 모두 완료가 될때까지 대기
    Task.WaitAll(tasks.ToArray());
 
    // 모든 Task가 수행을 마쳐 더 이상 관리가 필요 없어져서 관리에서 제거
    tasks.Clear();
});

[코드5] 진행바에 연결된 Task를 활성화 시키는 코드


 ForeEach의 Linq 구문을 통해 쉽게 수행이 되도록 하였으며 Task.WaitAll로 모두 완료가 될때가지 대기하도록 하였다. 완료가 되면 더 이상 사용하지 않는 Task이므로 Clear을 통해 모두 초기화 시켜주는 작업을 하였다. 


 이번 포스트는 소스코드를 포함하여 포스팅되었으며 전체 소스 설명 보다는 중요한 부분에 대해서 간략하게 설명하게 되었다. 자세한 설명은 소스를 받아 살펴 볼 수 있을 것이다.


 이 프로그램을 사용하여 Task의 비 동기 수행에 대한 특성을 좀더 쉽게 알 수 있었으면 하는 바램으로 이 글을 올리게 되었다.



Tip !

 아래와 같이 Framework에 따라 최대 허용 쓰레드 수가 다르다. ThreadPool.SetMaxThread를 통해 세팅할 수 있다.


 최대 쓰레드 수

 Platform 환경 

 1023

 32비트 .Net Framework 4

 32768  64비트 .Net Framework 4 
 250 per Core

 .Net Framework 3.5 

 25 per Core

 .Net Framework 2 




소스 코드 자체에 주석과 직관적인 코딩으로 충분히 파악이 가능할 것으로 예상하므로 별도의 설명을 생략하도록 하겠습니다. 포스트의 내용이 장황한 설명 보다는 주석과 소스코드 자체 만으로도 이해할 수 있도록 하기 위해 노력하였습니다.. 실제 개발에서도 필요한 소스는 단순히 Copy & Paste 만으로도 사용할 수 있습니다. 그리고 주석을 이용해 nDoc이나 별도의 자동 Document 제작 유틸로 API 문서를 만드는 데에도 도움이 되었으면 한다. 
※ DOC에 대한 프로그램 정보 Util link

ing™       


C# Task의 TaskCreationOptions별 실행 비교와
 ThreadPool의 관계
(SetMinThreads SetMaxThreads)



 이번에는 Task가 내부적으로 어떤 방식으로 동작하고 수행되는지에 대해서 알아가는 시간을 가져 보고자 한다. 지금까지 흔히 Task.Factory.Start()를 통해서 타스크를 생성하고 비 동기로 수행이 되도록 작업을 수행하여 왔지만 깊이 있게 다뤄보지는 안았었다. 그래서 비주얼 적으로 확인하기 위해 WPF 프로젝트를 만들어 확인 할 것이다. 아래 "그림1"은 WPF를 통해 Task 수행결과를 확인할 수 있는 화면을 먼저 보도록 하자.


[그림1] WPF를 통해서 Task 수행 결과를 확인하는 화면


[이번에는 프로젝트를 압축해서 다운 받을 수 있게 올려 두었으니 바로 다운받아 확인할 수 있을 것이다.

TaskTest.zip (클릭하면 다운로드 됨)]


 우선 소스에 대한 설명 보다는 프로그램 화면을 확인하면서 설명하면 소스의 전체적인 흐름을 알 수 있을 것이고 그것을 알면 코드를 보다 쉽게 이해할 수 있으리라는 생각이다. 그럼 지금부터 잘 따라워 주면 좋겠다. 아래 "표1"은 버튼에 대한 설명 및 선택 인자에 대한 간단한 화면 설명부터 시작하도록 하겠다.


 Set

 ProgressBar를 ListItem에 추가하고 Task를 생성하여 추가된 Progress에 연결하여 비 동기로 진행바를 변경 할수 있도록 세팅한다.

 Clear

 추가된 컨트롤을 ListItem에서 모두 지운다.

 다른 변수에 대해서도 초기화를 수행 한다.

 Async Processing

 Set에서 추가된 컨트롤에 연결된 Task를 활성화 시켜 비 동기 수행 작업으로 진행바를 증가 시키도록 한다. 

 Async Cancel

 수행되고 있는 Task를 취소 한다  

 Min Thread Count   취소 동시 실행 갯수 
 Max Thread Count 

 최대 동시 실행 갯수 

 Thread를 컨트롤

 Radio 버튼으로 되어 있어서 한번 선택이 되면 취소할 수 없도록 하였다.
 선택되면 TextBox를 사용할 수 있도록 활성화 된다. 

 MinWorker  Min Thread Count를 해당 숫자로 세팅한다. 
 MinIOC  Min Thread IOC Count를 해당 숫자로 세팅한다.   
 MaxWorker

 Max Thread Count를 해당 숫자로 세팅한다. 

 MaxIOC

 Max Thread IOC Count를 해당 숫자로 세팅한다. 

 Slide Control

 Set 버튼을 눌렀을 때 한번에 추가될 컨트롤 갯수 선택 할 수 있음.

 ComboBox

 선택된 값으로 Task생성 시 TaskCreationOptions으로 세팅한다.


 None
 기본 동작이 사용되도록 지정합니다.
 PreferFairness
 가능한 한 공정한 방식, 즉 일찍 예약된 작업은 일찍 실행되고 나중에 예약된 작업은 나중에 실행될 수 있는 방식으로 작업을 예약하는 TaskScheduler에 대한 힌트입니다.
 LongRunning
 작업이 장기 실행되는 정교하지 않은 작업이 되도록 지정합니다.초과 구독을 보장할 수 있는 TaskScheduler에 대한 힌트를 제공합니다.
 AttachedToParent
 작업이 작업 계층 구조의 부모에 연결되도록 지정합니다.
 DenyChildAttach
 지정 하는 InvalidOperationException 만들어진된 작업에는 자식 작업을 첨부 하려고 시도 하는 경우에 throw 됩니다.
 HideScheduler
 앰비언트 스케줄러를 현재 스케줄러에서 만들어진된 작업으로 표시 되지 않습니다.즉, StartNew 또는 Continuewith와 같은 생성된 작업을 수행 하는 작업을 볼 수 있도록 Default 현재 스케줄러로 합니다.


[표1] 화면 행위 단위 설명표


 이제 표를 확인했으니 버튼을 누르면 어떻게 동작하는지 대충 알거라 생각한다. 그렇다면 이제 다운 받은 압축 파일을 풀어서 실행해 보자. 아무 버튼이나 눌러봐도 컴퓨터에 이상이 있지는 안을것이기에 걱정하지 말고 마구마구 눌러 보자. 만약 "Async Processing"을 눌렀다면 진행바가 움직이면서 작업이 수행되는 것을 확인할 수 있을 것이다. 몇번의 클릭 만으로도 전체적인 흐름을 간략하게나마 체험적으로 알 수 있을 것이다. 그렇다면 필자의 의도대로 아래와 같은 시나리오로 한번 따라서 실행해 보기를 바란다. 만약 제대로 따라 했다면 뭔가 이상한 점을 발견할 수 있을 것이다. 


시나리오 :
 1. "None" 상태의 콤보박스에서 "Set"버튼을 누르면 10개의 진행바가 추가되며(10개 추가됨) 
 2. "Async Processing"을 누르면 진행바가 움직인다. 작업이 완료 후
 3. "Clear" 버튼을 누르면 초기화가 되고 다시 한번 더
 4. "Set"을 누르고
 5. "Async Processing" 버튼을 누른다.
 

[시나리오1] Min, Max Thread 활성화 변경 확인 시나리오


 위와 같은 시나리오와 같은 방법으로 확인해 보면 처음에는 몇개의 진행바만(테스트 환경의 CPU Core마다 다를 수 있다, 4 Core면 네개의 진행바가 변경되는 것을 확인할 수 있을 것이다. 그렇지만 실제로는 3개의 진행바만 움직이는데 그 이유는 하나의 Task가 내부적으로 계속 활성화 되어 있어서 발생하는 현상이다. (활성화된 Task는 Min Thread Count와 Max Thread Count의 값을 지속적으로 화면에 갱신하여 변화되는 값을 보여주는 작업 진행하고 있다) 3개의 진행바가 변경이 되고 있는 상태에서 완료되지 않은 다른 진행바가 순차적으로 작업이 시작되어 값이 변경 된다. 이건 내부적으로 Thread.Sleep 때문에 일어나는 현상이기도 하다. 그러면서 최종적으로는 10개의 진행바가 모두 변경(내부적으로 Task가 실행이 되고 있다.)이 되는 것을 확인할 수 있다. 이와 같이 10개의 Task가 활성화 되어 있는 상태는 ThreadPool이 내부에 관리되고 있는 동시 실행 가능한 갯수를 10개로 세팅된 상태로 변경이 된다.  이런 상태에서 "시나리오1"의 3, 4, 5를 차례로 수행하면 첫번째 실행된 행동 패턴과는 다르게 10개의 진행바가 한번에 진행되는 것을 확인 할 수 있다.


 결과적으로 비 동기 Task 수행도 내부적으로는 ThreadPool에서 관리를 받으며 수행이 된다는 것을 확인 할 수 있다. 조금더 자세한 사항은 Task.Factory를 커스터마이징 하는 포스트에서도 확인할 수 있을 것이다. 


[그림2] PreferFaimess Async Processing 한번 실행 후 Clear -> Set -> Async Processing 실행 결과 화면


 "그림2"에서와 같이 동시 실행 갯수가 10개로 같이 시작 하는것을 확인 할 수 있다. 그렇지만 이런 유형은 Thread.Sleep(1)의 구문으로 인해 발생하는 현상이다. Sleep 없이 비 동기를 실행 하면 절대적인 CPU Core수의 제한을 받게 된다. 그에 대한 확인으로 Sleep대신에 Task.SpinWait(500000)으로 수정하고 비 동기로 실행하면 위와 같은 현상과는 다르게 나타난다.


 지금까지 전체적인 흐름과 하나의 시나리오에 대해서 설명을 하게 되었고 그 이외의 다른 시나리오와 소스에 대한 설명은 다음 포스트에 이어서 하도록 하겠다.


( 무엇보다 이번 포스트는 올려 놓은 소스를 다운 받아서 직접 테스트와 디버깅을 통해 몸소 체험과 분석을 통해 체득하는 과정이 필수라고 알려주고 싶다. 눈으로만 보는것과 직접 체험해 보는건 하늘과 땅 차이만큼 많은 깨우침의 차이를 가져올거라 믿고 있다. )


소스 코드 자체에 주석과 직관적인 코딩으로 충분히 파악이 가능할 것으로 예상하므로 별도의 설명을 생략하도록 하겠습니다. 포스트의 내용이 장황한 설명 보다는 주석과 소스코드 자체 만으로도 이해할 수 있도록 하기 위해 노력하였습니다.. 실제 개발에서도 필요한 소스는 단순히 Copy & Paste 만으로도 사용할 수 있습니다. 그리고 주석을 이용해 nDoc이나 별도의 자동 Document 제작 유틸로 API 문서를 만드는 데에도 도움이 되었으면 한다. 
※ DOC에 대한 프로그램 정보 Util link

ing™       


C# TaskFactory and TaskScheduler

http://msdn.microsoft.com/ko-kr/library/dd321418.aspx

http://code.msdn.microsoft.com/ParExtSamples

http://msdn.microsoft.com/ko-kr/library/dd997402.aspx

http://channel9.msdn.com/Events/TechDays/Techdays-2012-the-Netherlands/2287


 이전 포스트에서 MyTaskScheduler을 직접 만들어 보았다. 그러나 동시성 수준이 1이라서 비 동기 효율이 제대로 나오지 않는 구조적인 문제가 있었다. 실제로 테스트를 해보면 Default Scheduler 보다 훨씬 안 좋은 성능을 보여주고 있다. 그리하여 이번에는 동시성 수준을 마음대로 컨트롤 할 수 있는 LimitedConcurrencyLevelTaskScheduler을 만들어 보도록 하겠다.

/// <summary> /// TaskScheduler을 상속 받아 구현 한다. /// Provides a task scheduler that ensures a maximum concurrency level while /// running on top of the ThreadPool. /// </summary> public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler {     /// <summary>     /// 현재 쓰레드가 작업 리스트를 처리하고 있는지 여부 판단     /// </summary>     /// <remarks>     /// ThreadStaticAttribute 로 표시된 static 필드는 스레드 간에 공유되지 않습니다.     /// 각 실행 스레드에는 필드에 대한 별도의 인스턴스가 있으며 해당 필드에 대한 값을 독립적으로 설정하고 가져옵니다.     /// 필드를 서로 다른 스레드에서 액세스하면 해당 필드에는 다른 값이 들어가게 됩니다.     /// </remarks>     [ThreadStatic]     private static bool _currentThreadIsProcessingItems;     /// <summary>실행될 타스크 리스트</summary>     private readonly LinkedList<System.Threading.Tasks.Task> _tasks = new LinkedList<System.Threading.Tasks.Task>(); // protected by lock(_tasks)     /// <summary>현재 스케줄러에서 최대로 허용된 동시성 제어 수준</summary>     private readonly int _maxDegreeOfParallelism;     /// <summary>실제로 수행되고 있는 동시성 수준 숫자</summary>     private int _delegatesQueuedOrRunning = 0// protected by lock(_tasks)     /// <summary>     /// 인스턴스 초기화 진행     /// </summary>     /// <param name="maxDegreeOfParallelism">최대 동시성 수준 허용 갯수</param>     public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)     {         if (maxDegreeOfParallelism < 1throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");         _maxDegreeOfParallelism = maxDegreeOfParallelism;     }     /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>     /// <param name="task">큐에 대기할 Task입니다.</param>     protected sealed override void QueueTask(System.Threading.Tasks.Task task)     {         // 처리할 작업 목록에 추가.         // tasks가 처리중이거나 준비가 되지 않았을 때 대기 함         lock (_tasks)         {             _tasks.AddLast(task);             // MaximumConcurrencyLevel의 숫자보다 작을 때만 실행             // 동시성 수준을 제어하는 Scheduler 클래스 이므로 이곳에서 동시성 수준을 체크한다.             if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)             {                 ++_delegatesQueuedOrRunning;                 NotifyThreadPoolOfPendingWork();             }         }     }     /// <summary>     /// 작업이 스케줄러에 대한 실행 할 필요가있는 스레드를 알려줍니다.     /// </summary>     private void NotifyThreadPoolOfPendingWork()     {         // ThreadPool에서 실행이 되도록 한다.         // 실행 주기는 ThreadPool이 CPU 환경에 맞게 동시 실행을 컨트롤 한다.         // http://msdn.microsoft.com/ko-kr/library/system.threading.threadpool_methods(v=vs.110).aspx         ThreadPool.UnsafeQueueUserWorkItem(_ =>         {

        // 현재 스레드가 작업 항목을 활성화 하도록 함
        // 스레드에 작업 활성화를 할 수 있도록 한다.
        // ThreadStatic으로 선언되었기에 쓰레드마다 별도의 값으로 접근 한다.
        _currentThreadIsProcessingItems = true;
        try
        {
            // 대기열에 사용 가능한 모든 항목을 처리합니다.
            while (true)
            {
                System.Threading.Tasks.Task item;
                lock (_tasks)
                {
                    // 처리 할 항목이 더 있을 경우,
                    // 처리가 완료되면 루프를 나간다.
                    if (_tasks.Count == 0)
                    {
                        --_delegatesQueuedOrRunning;
                        break;
                    }
 
                    // 큐에서 다음 항목을 가져 오기.
                    item = _tasks.First.Value;
                    _tasks.RemoveFirst();
                }
 
                // 큐에서 찾아낸 작업을 실행
                base.TryExecuteTask(item);
            }
        }
        // 현재 스레드에서 처리 항목을 완료
        finally { _currentThreadIsProcessingItems = false; }
                    }, null);     }     /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>     /// <param name="task">실행할 타스크</param>     /// <param name="taskWasPreviouslyQueued">작업이 이전에 큐에 대기되었는지 여부를 나타내는 부울입니다.이 매개 변수가 True이면 작업이 이전에 큐에 대기된 것일 수 있습니다. False이면 작업이 큐에 대기되지 않은 것입니다. 작업을 큐에 대기하지 않고 인라인으로 실행하려면 이 호출을 수행합니다.</param>     /// <returns>작업이 인라인으로 실행되었는지 여부를 나타내는 부울 값입니다. 성공적인 실행 시 True, 그 이외에 false</returns> /// <remarks>재진입으로 인한 오류를 방지하기 위해 작업 인라이닝은 관련된 스레드의 로컬 큐에서 대기 중인 대상이 있는 경우에만 발생합니다.</remarks>     protected sealed override bool TryExecuteTaskInline(System.Threading.Tasks.Task taskbool taskWasPreviouslyQueued)     {         //쓰레드에서 처리가 되고 있으면 별도 실해을 지정하지 않는다.         //중복 실행이 되지 않도록 해야 한다.         if (!_currentThreadIsProcessingItemsreturn false;         // 작업이 이전에 큐에 대기된 것이면 제거         if (taskWasPreviouslyQueuedTryDequeue(task);         // 한번더 실행을 시도 한다.         return base.TryExecuteTask(task);     }     /// <summary>이전에 이 스케줄러의 큐에 대기된 Task를 큐에서 제거하려고 합니다</summary>     /// <param name="task">큐에서 제거할 Task입니다.</param>     /// <returns>task 인수가 큐에서 제거되었는지 여부를 나타내는 부울입니다.</returns>     protected sealed override bool TryDequeue(System.Threading.Tasks.Task task)     {         lock (_tasksreturn _tasks.Remove(task);     }     /// <summary>이 TaskScheduler가 지원할 수 있는 최대 동시성 수준을 나타냅니다.</summary>     public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }     /// <summary>디버거를 지원하기 위해 현재 스케줄러의 큐에 대기되어 실행을 기다리고 있는 Task 인스턴스의 열거 가능한 형식을 생성합니다.</summary>     /// <returns>디버거가 현재 이 스케줄러의 큐에 대기된 작업을 트래버스할 수 있도록 허용하는 열거 가능한 형식입니다.</returns>     protected sealed override IEnumerable<System.Threading.Tasks.Task> GetScheduledTasks()     {         bool lockTaken = false;         try         {             Monitor.TryEnter(_tasksref lockTaken);             if (lockTakenreturn _tasks.ToArray();             else throw new NotSupportedException();         }         finally         {             if (lockTakenMonitor.Exit(_tasks);         }     } }

[코드1] LimitedConcurrencyLevelTaskScheduler 전체 코드


 위와 같이 동시성 수준을 제어 할 수 있는 스케줄러를 만들어 보았다. 처음 인스턴스를 시킬때 넣는 동시성 수준 갯수를 세팅하면 한번에 실행되는 타스크의 갯수를 제어할 수가 있다. 전체적인 흐름은 QueueTask를 통해 넘어온 Task를 곧바로 실행하지 않고 Queue에 넣어 둔다. 동시성 수준을 통과한 상태에서 NotifyThreadPoolOfPendingWork에서 ThreadPool에서 각각의 Task를 실행이 활성화 되도록 한다. 그렇다면 이제 한번 실행하여 결과를 보도록 하자.


/// <summary>
/// LimitedConcurrencyLevelTaskScheduler로 테스트
/// </summary>
public void LimitedConcurrencyLevelTaskScheduler_TestMethod()
{
    // 시간을 재기 위해서 사용
    Stopwatch sw = new Stopwatch();
    sw.Start();
 
    var limitedScheduler = new LimitedConcurrencyLevelTaskScheduler(5);
 
    // 커스터마이징 된 LimitedConcurrencyLevelTaskScheduler을 이용해 TaskFactory를 생성 하도록 한다.
    var factory = new TaskFactory(limitedScheduler);
    var tasks = new List<System.Threading.Tasks.Task>();
 
    for (int j = 1j <= 20000j++)
    {
        var task = factory.StartNew(() =>
        {
            for (int i = 0i < 5i++)
            {
                var a = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("{0} on thread {1}"iThread.CurrentThread.ManagedThreadId);
            }
        });
 
        tasks.Add(task);
    }
 
    // 모두 완료가 될 때까지 대기
    System.Threading.Tasks.Task.WaitAll(tasks.ToArray());
 
    sw.Stop();
    Console.WriteLine(sw.ElapsedMilliseconds + "ms");
}

[코드2] LimitedConcurrencyLevelTaskScheduler 테스트 코드



[그림1] LimitedConcurrencyLeveTaskScheduler 실행 결과 화면


 "그림1"에서와 같이 여러 쓰레드 ID에서 각각의 Task가 수행이 된 것을 확인할 수 있다. 그렇지만 이 작업은 Default Scheduler 보다 작업 시간이 길게 걸린다. 뭔가가 문제가 있는 것일까? 예상 기대치 보다 좋지 않다란 생각을 하게 되었다. 그래서 Default Scheduler의 기본 MaximumConcurrencyLevel을 확인해보니 2147483647로 확인이 되었다. 내가 세팅한 값보다 엄청 많은 동시성 수준이다. 그리고 기본적으로 ThreadPool을 통해서 수행하다 보니 CPU Core의 절대적인 숫자에 제한을 받는다. ( ThreadPool Click new ) 그러므로 아무리 동시성 수준을 높여도 Core 갯수 이상은 동시 실행이 되지 않는다. 이 요건은 처음 포스트 당시 언급 했던 대기 시간이 많은 수행에 대해서 특별한 Scheduler를 만들려고 하는 계획과는 차이가 있게 되었다.  그래서 ThreadPool 대신이 Thread를 통해서 실행이 되도록 하였으며 프로퍼티를 통해 Thread와 ThreadPool을 선택하여 수행 할 수 있도록 수정 하게 되었다. "코드3"를 확인해 보자


/// <summary>
/// TaskScheduler을 상속 받아 구현 한다.
/// Provides a task scheduler that ensures a maximum concurrency level while
/// running on top of the ThreadPool.
/// </summary>
public class LimitedConcurrencyLevelTaskScheduler : TaskScheduler
{
    /// <summary>
    /// 현재 쓰레드가 작업 리스트를 처리하고 있는지 여부 판단
    /// </summary>
    /// <remarks>
    /// ThreadStaticAttribute 로 표시된 static 필드는 스레드 간에 공유되지 않습니다.
    /// 각 실행 스레드에는 필드에 대한 별도의 인스턴스가 있으며 해당 필드에 대한 값을 독립적으로 설정하고 가져옵니다.
    /// 필드를 서로 다른 스레드에서 액세스하면 해당 필드에는 다른 값이 들어가게 됩니다.
    /// </remarks>
    [ThreadStatic]
    private static bool _currentThreadIsProcessingItems;
    /// <summary>실행될 타스크 리스트</summary>
    private readonly LinkedList<System.Threading.Tasks.Task> _tasks = new LinkedList<System.Threading.Tasks.Task>(); // protected by lock(_tasks)
    /// <summary>현재 스케줄러에서 최대로 허용된 동시성 제어 수준</summary>
    private readonly int _maxDegreeOfParallelism;
    /// <summary>실제로 수행되고 있는 동시성 수준 숫자</summary>
    private int _delegatesQueuedOrRunning = 0// protected by lock(_tasks)
 
    /// <summary>
    /// Task 활성화 실행 타입을 설정 (ThreadPool이 기본 값)
    /// </summary>
    private LimitedConcurrencyLevelTaskExecuteType ExecuteType = LimitedConcurrencyLevelTaskExecuteType.ThreadPool;
 
    /// <summary>
    /// 인스턴스 초기화 진행
    /// </summary>
    /// <param name="maxDegreeOfParallelism">최대 동시성 수준 허용 갯수</param>
    public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelism)
    {
        if (maxDegreeOfParallelism < 1throw new ArgumentOutOfRangeException("maxDegreeOfParallelism");
        _maxDegreeOfParallelism = maxDegreeOfParallelism;
    }

    /// <summary>
    /// 인스턴스 초기화 진행
    /// </summary>
    /// <param name="maxDegreeOfParallelism">최대 동시성 수준 허용 갯수</param>
    /// <param name="ExecuteType">Task수행 활성화 타입</param>
    public LimitedConcurrencyLevelTaskScheduler(int maxDegreeOfParallelismLimitedConcurr

ncyLevelTaskExecuteType ExecuteType)         : this(maxDegreeOfParallelism)     {         this.ExecuteType = ExecuteType;     }

    /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>     /// <param name="task">큐에 대기할 Task입니다.</param>     protected sealed override void QueueTask(System.Threading.Tasks.Task task)     {         // 처리할 작업 목록에 추가.         // tasks가 처리중이거나 준비가 되지 않았을 때 대기 함         lock (_tasks)         {             _tasks.AddLast(task);             // MaximumConcurrencyLevel의 숫자보다 작을 때만 실행             // 동시성 수준을 제어하는 Scheduler 클래스 이므로 이곳에서 동시성 수준을 체크한다.             if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)             {                 ++_delegatesQueuedOrRunning;                 NotifyThreadPoolOfPendingWork();             }         }     }     /// <summary>     /// 작업이 스케줄러에 대한 실행 할 필요가있는 스레드를 알려줍니다.     /// </summary>     private void NotifyThreadPoolOfPendingWork()     {         switch (ExecuteType)         {             case LimitedConcurrencyLevelTaskExecuteType.Thread:                 #region Thread로 실행                 // 쓰레드에서 Task가 실행이 되도록 한다.                 Thread thread = new Thread(new ThreadStart(() =>                 {                     NotifyThreadPoolOfPendingWorking();                 }));                 thread.Start();                 #endregion                 break;             case LimitedConcurrencyLevelTaskExecuteType.ThreadPool:                 #region ThreadPool.UnsafeQueueUserWorkItem로 실행                 // ThreadPool에서 실행이 되도록 한다.                 // 실행 주기는 ThreadPool이 CPU 환경에 맞게 동시 실행을 컨트롤 한다.                 // http://msdn.microsoft.com/ko-kr/library/system.threading.threadpool_methods(v=vs.110).aspx                 ThreadPool.UnsafeQueueUserWorkItem(_ =>                 {                     NotifyThreadPoolOfPendingWorking();                 }, null);                 #endregion                 break;         }     }     /// <summary>     /// 작업 실행을 시작 하도록 합니다.     /// </summary>     /// <remarks>     /// 실행을 Thread로 할것이지 ThreadPool에서 실행할 것인지 테스트를 위해 만듬.     /// </remarks>     private void NotifyThreadPoolOfPendingWorking()     {         // 현재 스레드가 작업 항목을 활성화 하도록 함         // 스레드에 작업 활성화를 할 수 있도록 한다.         // ThreadStatic으로 선언되었기에 쓰레드마다 별도의 값으로 접근 한다.         _currentThreadIsProcessingItems = true;         try         {             // 대기열에 사용 가능한 모든 항목을 처리합니다.             while (true)             {                 System.Threading.Tasks.Task item;                 lock (_tasks)                 {                     // 처리 할 항목이 더 있을 경우,                     // 처리가 완료되면 루프를 나간다.                     if (_tasks.Count == 0)                     {                         --_delegatesQueuedOrRunning;                         break;                     }                     // 큐에서 다음 항목을 가져 오기.                     item = _tasks.First.Value;                     _tasks.RemoveFirst();                 }                 // 큐에서 찾아낸 작업을 실행                 base.TryExecuteTask(item);             }         }         // 현재 스레드에서 처리 항목을 완료         finally { _currentThreadIsProcessingItems = false; }     }     /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>     /// <param name="task">실행할 타스크</param>     /// <param name="taskWasPreviouslyQueued">작업이 이전에 큐에 대기되었는지 여부를 나타내는 부울입니다.이 매개 변수가 True이면 작업이 이전에 큐에 대기된 것일 수 있습니다. False이면 작업이 큐에 대기되지 않은 것입니다. 작업을 큐에 대기하지 않고 인라인으로 실행하려면 이 호출을 수행합니다.</param>     /// <returns>작업이 인라인으로 실행되었는지 여부를 나타내는 부울 값입니다. 성공적인 실행 시 True, 그 이외에 false</returns> /// <remarks>재진입으로 인한 오류를 방지하기 위해 작업 인라이닝은 관련된 스레드의 로컬 큐에서 대기 중인 대상이 있는 경우에만 발생합니다.</remarks>     protected sealed override bool TryExecuteTaskInline(System.Threading.Tasks.Task taskbool taskWasPreviouslyQueued)     {         //쓰레드에서 처리가 되고 있으면 별도 실해을 지정하지 않는다.         //중복 실행이 되지 않도록 해야 한다.         if (!_currentThreadIsProcessingItemsreturn false;         // 작업이 이전에 큐에 대기된 것이면 제거         if (taskWasPreviouslyQueuedTryDequeue(task);         // 한번더 실행을 시도 한다.         return base.TryExecuteTask(task);     }     /// <summary>이전에 이 스케줄러의 큐에 대기된 Task를 큐에서 제거하려고 합니다</summary>     /// <param name="task">큐에서 제거할 Task입니다.</param>     /// <returns>task 인수가 큐에서 제거되었는지 여부를 나타내는 부울입니다.</returns>     protected sealed override bool TryDequeue(System.Threading.Tasks.Task task)     {         lock (_tasksreturn _tasks.Remove(task);     }     /// <summary>이 TaskScheduler가 지원할 수 있는 최대 동시성 수준을 나타냅니다.</summary>     public sealed override int MaximumConcurrencyLevel { get { return _maxDegreeOfParallelism; } }     /// <summary>디버거를 지원하기 위해 현재 스케줄러의 큐에 대기되어 실행을 기다리고 있는 Task 인스턴스의 열거 가능한 형식을 생성합니다.</summary>     /// <returns>디버거가 현재 이 스케줄러의 큐에 대기된 작업을 트래버스할 수 있도록 허용하는 열거 가능한 형식입니다.</returns>     protected sealed override IEnumerable<System.Threading.Tasks.Task> GetScheduledTasks()     {         bool lockTaken = false;         try         {             Monitor.TryEnter(_tasksref lockTaken);             if (lockTakenreturn _tasks.ToArray();             else throw new NotSupportedException();         }         finally         {             if (lockTakenMonitor.Exit(_tasks);         }     } } /// <summary> /// 절대적인 우위를 보장하는 방법이 아니므로 두개 타입을 비교해서 항상 최선의 방법을 찾도로 해야 한다. /// </summary> public enum LimitedConcurrencyLevelTaskExecuteType {     /// <summary>     /// Thread로 Task를 실행 하도록 함.     /// ( CPU 경합이 적고, 대기 시간이 긴 로직에서 보다 많은 동시 수행을 진행 하도록 함 )     /// 절대적인 우위를 보장하는 방법이 아니므로 두개 타입을 비교해서 항상 최선의 방법을 찾도로 해야 한다.     /// </summary>     Thread,     /// <summary>     /// ThreadPool에서 Task를 실행 하도록 함.     /// </summary>     ThreadPool }

[코드3] Thread와 ThreadPool을 선택적으로 수행 하도록 수정한 코드


 위와 같이 소스를 수정하게 되었더니 동시 실행되는 Task가 Core 갯수의 제한 보다는 동시성 수준에 맞게 수행이 되었다. 하지만 일반적인 시나리오에서는 기본 스케줄러를 이용해 수행이 더 효율적이며 LimitedConcurrencyLevelTaskSchedler은 Cpu 경합 보다는 I/O 작업과 같은 대기 시간이 특히 오래 걸리는 작업에 대해 충분히 테스트를 거처 실 업무에 적용해 봐야 할 것이다. 이 코드는 모든 환경에서 우수한 성능으로 수행할 것이라고 보장하지 않는다. 이제 "코드4"을 통해 확인해 보자

/// <summary>
/// LimitedConcurrencyLevelTaskScheduler로 테스트
/// </summary>
public void LimitedConcurrencyLevelTaskScheduler_TestMethod()
{
    // 시간을 재기 위해서 사용
    Stopwatch sw = new Stopwatch();
    sw.Start();
 
    //var limitedScheduler = new LimitedConcurrencyLevelTaskScheduler(5, LimitedConcurrencyLevelTaskExecuteType.Thread);
    var limitedScheduler = new LimitedConcurrencyLevelTaskScheduler(5);
    limitedScheduler.ExecuteType = LimitedConcurrencyLevelTaskExecuteType.Thread;
 
    // 커스터마이징 된 LimitedConcurrencyLevelTaskScheduler을 이용해 TaskFactory를 생성 하도록 한다.
    var factory = new TaskFactory(limitedScheduler);
    var tasks = new List<System.Threading.Tasks.Task>();
 
    for (int j = 1j <= 20000j++)
    {
        var task = factory.StartNew(() =>
        {
            for (int i = 0i < 5i++)
            {
                var a = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("{0} on thread {1}"iThread.CurrentThread.ManagedThreadId);
            }
        });
 
        tasks.Add(task);
    }
 
    // 모두 완료가 될 때까지 대기
    System.Threading.Tasks.Task.WaitAll(tasks.ToArray());
 
    sw.Stop();
    Console.WriteLine(sw.ElapsedMilliseconds + "ms");
}

[코드4] LimitedConcurrencyLevelTaskScheduler 테스트 코드


 지금까지 TaskScheduler을 커스터마이징 하는것에 대해서 알아 보았다. 그렇지만 모두 이해를 하는데에는 많이 모자랄 것이라 예상한다. 그건 아마도 필자의 실력이 아직 미진하여 정답을 제대로 알려 줄 수 있는 역량이 높지 않기 때문이리라 생각 한다. 그래서 좀더 깊이 있고 많이 알고 싶으신 분은 MS에서 제공하는 예제를 좀더 공부해 보면 어떨까 합니다. http://code.msdn.microsoft.com/windowsdesktop/Samples-for-Parallel-b4b76364

이 페이지의 샘플 소스를 받아 분석하고 자신의 자산으로 만들 수 있는 기회를 가졌으면 합니다. 지금까지 읽어 주셔서 감사합니다. 꼭 당부를 드리자면 직접 실행해 보고 디버깅을 해 보는 것이 눈으로 확인하는 것 보다 더 많은 기회와 가치를 제공해 준다는 것을 알려 드립니다.


소스 코드 자체에 주석과 직관적인 코딩으로 충분히 파악이 가능할 것으로 예상하므로 별도의 설명을 생략하도록 하겠습니다. 포스트의 내용이 장황한 설명 보다는 주석과 소스코드 자체 만으로도 이해할 수 있도록 하기 위해 노력하였습니다.. 실제 개발에서도 필요한 소스는 단순히 Copy & Paste 만으로도 사용할 수 있습니다. 그리고 주석을 이용해 nDoc이나 별도의 자동 Document 제작 유틸로 API 문서를 만드는 데에도 도움이 되었으면 한다. 
※ DOC에 대한 프로그램 정보 Util link

ing™       



C# TaskFactory and TaskScheduler

http://msdn.microsoft.com/ko-kr/library/dd321418.aspx

http://code.msdn.microsoft.com/ParExtSamples

http://msdn.microsoft.com/ko-kr/library/dd997402.aspx

http://channel9.msdn.com/Events/TechDays/Techdays-2012-the-Netherlands/2287


 이전 포스트에서 "new TaskFactory(new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler);"와 같은 형식으로 TaskScheduler를 배타적 실행이 되도록 하여 Task.Yield()를 테스트를 하였다. ( 2013/03/11 - [.Net Framework] - [C# Snippet]Task-based Asynchronous Pattern Task.Yield ) 이번에는 이 작업 스케줄러에 대해서 커스터마이징을 하는 방법에 대해서 알아 보고 Task 수행을 마음대로 요리할 수 있는 방법에 대해서 알아 보고자 한다.


 기본 TaskScheduler은 CPU Core의 갯수에 맞춰서 동시 실행이 되도록 되어 있습니다. 이 스케줄러는 일반 적인 케이스(I/O, CPU, Network, ...)에서 노말하게 동시 실행이 되도록 배분을 하고 있다. 그렇지만 Network 대기 시간같은 응답 대기에서는 경합에 의한 CPU 손실보다는  보다 많은 실행을 동시에 수행하는 것이 더 중요할 때가 있다.  보다 많은 동시 실행을 요구하는 시나리오에서 기본 스케줄러가 이닌 커스터마이징 된 스케줄러를 통해서 동시 실행 갯수를 통제 할 수 있을 것이다. ( ThreadPool.QueueUserWorkItem(new WaitCallback(_ => { })); 와 같은 비슷한 패턴으로 TaskFactory로 생성된 타스크가 수행된다. 하지만 경합 관리에서는 다른 패턴으로 관리가 되고 있다. http://www.codethinked.com/net-40-and-systemthreadingtasks)


 참고로 ThreadPool과 TaskPool의 스케줄링에 의한 실행 결과를 "그림1"을 통해 확인해 보자

[그림1] Thread and Task 실행 비교

http://www.codethinked.com/net-40-and-systemthreadingtasks

( Thread는 ThreadPool.QueueUserWorkItem을 통해 실행 됨 )


 이와 같이 스케줄링에 의해서 총 실행 결과에 영향을 미칠 수가 있다. 이제 본격적으로 TaskScheduler을 좀더 파헤처 보도록 하자. "코드1"에서는 기본적으로 제공하는 스케줄러를 가져오는 방법이다.

TaskScheduler scheduler = TaskScheduler.Default;

[코드1] 기본 제공되는 Scheduler


 이 스케줄러는 Task.Factory와 같은 형식으로 스케줄리이 되는 기본 스케줄러다. 이제는 MyTaskScheduler를 만들어 수행 순서를 마음대로 조절하는 Scheduler를 만들어 볼 것이다.

/// <summary>
/// TaskScheduler을 상속 받아 구현 한다.
/// </summary>
public class MyTaskScheduler : TaskScheduler
{
    protected override System.Collections.Generic.IEnumerable<Task> GetScheduledTasks()
    {
        throw new NotImplementedException();
    }
 
    protected override void QueueTask(Task task)
    {
        throw new NotImplementedException();
    }
 
    /// <remarks>재진입으로 인한 오류를 방지하기 위해 작업 인라이닝은 관련된 스레드의 로컬 큐에서 대기 중인 대상이 있는 경우에만 발생합니다.</remarks>
    protected override bool TryExecuteTaskInline(Task taskbool taskWasPreviouslyQueued)
    {
        throw new NotImplementedException();
    }
}

[코드2] TaskScheduler abstract class를 상속 코드


 "코드2"는 추상 TaskScheduler을 상속받은 모습이다. 이제 해당 메소드를 하나씩 채워 나가면 나만의 Scheduler이 만들어 지는 것이다. 생각보다 간단해 보이지 않는가? 이제 간단하게 완성된 MyTaskScheduler를 살펴 보자.

public class MyTaskScheduler : TaskScheduler
{
    /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>
    /// <param name="task">큐에 대기할 Task입니다.</param>
    protected override void QueueTask(System.Threading.Tasks.Task task)
    {
        TryExecuteTask(task);
    }
 
    /// <summary>연결된 쓰레드에서 동기 Task 제공해 준다</summary>
    /// <param name="task">실행할 타스크</param>
    /// <param name="taskWasPreviouslyQueued">작업이 이전에 큐에 대기되었는지 여부를 나타내는 부울입니다.이 매개 변수가 True이면 작업이 이전에 큐에 대기된 것일 수 있습니다. False이면 작업이 큐에 대기되지 않은 것입니다. 작업을 큐에 대기하지 않고 인라인으로 실행하려면 이 호출을 수행합니다.</param>
    /// <returns>작업이 인라인으로 실행되었는지 여부를 나타내는 부울 값입니다. 성공적인 실행 시 True, 그 이외에 false</returns>
    /// <remarks>재진입으로 인한 오류를 방지하기 위해 작업 인라이닝은 관련된 스레드의 로컬 큐에서 대기 중인 대상이 있는 경우에만 발생합니다.</remarks>
    protected override bool TryExecuteTaskInline(System.Threading.Tasks.Task taskbool taskWasPreviouslyQueued)
    {
        return TryExecuteTask(task);
    }
 
    /// <summary>디버거를 지원하기 위해 현재 스케줄러의 큐에 대기되어 실행을 기다리고 있는 Task 인스턴스의 열거 가능한 형식을 생성합니다.</summary>
    /// <returns>디버거가 현재 이 스케줄러의 큐에 대기된 작업을 트래버스할 수 있도록 허용하는 열거 가능한 형식입니다.</returns>
    protected override IEnumerable<System.Threading.Tasks.Task> GetScheduledTasks()
    {
        return Enumerable.Empty<System.Threading.Tasks.Task>();
    }
 
    /// <summary>이 TaskScheduler가 지원할 수 있는 최대 동시성 수준을 나타냅니다.</summary>
    public override int MaximumConcurrencyLevel { get { return 1; } }
}

[코드3] MyTaskScheduler


 MyTaskScheduler는 QueueTask로 들어온 Task를 TryExecuteTask를 통해서 바로 실행이 되도록 하였기에 TryExecutTaskInline로 호출이 되지 않도록 되어 있다. 만약 여기에서 실행이 되지 않으면 TryExecuteTaskInline로 다시 시도하게 된다. 지금 구현된 MyTaskScheduler은 기본적으로 동시 실행이 1개만 되도록 설계 및 구현 되었으며 다음에 좀더 확장하여 살펴 보도록 하겠다. 참고로 TaskSchedler.Default의 MaximumConcurrencyLevel은 2147483647이다. 이제 "코드4"를 통해서 실행하는 코드를 살펴 보자

/// <summary>
/// 커스터 마이징된 MyScheduler로 테스트
/// </summary>
public void MyScheduler_TestMethod()
{
    // 시간을 재기 위해서 사용
    Stopwatch sw = new Stopwatch();
    sw.Start();
 
    TaskScheduler myScheduler = new MyTaskScheduler();
    // 기본 스케줄러
    // myScheduler = TaskScheduler.Default;
    // 동시 실행 갯수 확인
    // Console.WriteLine(myScheduler.MaximumConcurrencyLevel);
 
    // 커스터마이징 된 MyScheduler을 이용해 TaskFactory를 생성 하도록 한다.
    var factory = new TaskFactory(myScheduler);
    var tasks = new List<System.Threading.Tasks.Task>();
 
    // 동시 실행이 되도록 수행
    for (int j = 1j <= 200j++)
    {
        var task = factory.StartNew(() =>
        {
            for (int i = 0i < 500i++)
            {
                var a = Thread.CurrentThread.ManagedThreadId;
                Console.WriteLine("{0} on thread {1}"iThread.CurrentThread.ManagedThreadId);
            }
        });
 
        tasks.Add(task);
    }
 
    // 모두 완료가 될 때까지 대기
    System.Threading.Tasks.Task.WaitAll(tasks.ToArray());
 
    sw.Stop();
    Console.WriteLine(sw.ElapsedMilliseconds);
}

[코드4] MyScheduler 테스트 코드


 위 코드를 통해 결과 화면을 "그림1"에서 확인해 보자.


[그림1] MyTaskScheduler 방식으로 실행 결과 화면


 위 결과를 보면 여러 타스크가 실행이 되었지만 하나의 쓰레드 ID에서 실행이 된 것을 확인 할 수 있다. 지금까지 간단하가 MyScheduler은 TaskScheduler에서 제공하는 기본만을 가지고 기본 개념을 이해하기 위해 연습을 한 것이며 다음 포스트에서 좀더 디테일한 작업을 진행해 보도록 하겠다.



소스 코드 자체에 주석과 직관적인 코딩으로 충분히 파악이 가능할 것으로 예상하므로 별도의 설명을 생략하도록 하겠습니다. 포스트의 내용이 장황한 설명 보다는 주석과 소스코드 자체 만으로도 이해할 수 있도록 하기 위해 노력하였습니다.. 실제 개발에서도 필요한 소스는 단순히 Copy & Paste 만으로도 사용할 수 있습니다. 그리고 주석을 이용해 nDoc이나 별도의 자동 Document 제작 유틸로 API 문서를 만드는 데에도 도움이 되었으면 한다. 
※ DOC에 대한 프로그램 정보 Util link

ing™       


+ Recent posts