.net core Task的学习理解(上) 电脑版发表于:2021/8/18 12:06 ![.netcore](https://img.tnblog.net/arcimg/hb/c857299a86d84ee7b26d181a31e58234.jpg ".netcore") >#.net core Task的学习理解(上) [TOC] Thread 线程的生命周期 ------------ ![](https://img.tnblog.net/arcimg/hb/19fe6096041643069902db120116d43b.png) tn2>计算机系统在某一个时刻,当只有一个CPU工作时,它只执行一个进程中的一个线程,但是用户在使用计算机时回打开多个进行,这样就涉及到了线程的不同状态,一个进程可以创建多个线程。 ![](https://img.tnblog.net/arcimg/hb/5926f2d27a034b0fbcf10992754c5f89.png) tn2>当一个cpu从一个线程调用另一个线程的时候,需要将Thread1线程的状态保存到PCB1中,然后加载PCB2中Thread2的状态并调用Thread2的线程,当时间片用尽的时候再保存状态到PCB2中。... tn>当线程有很多的时候,CPU会不停的切换上下文,此时就会非常消耗CPU性能。所以线程池就出现了。 ThreadPool 线程池 ------------ tn2>线程池里面通过设置好线程的数量,可以将空闲的线程进行回收利用。可以通过`preferLocal:false`把任务放到线程池的(global queue)全局队列中,利用空闲的线程执行该任务;也可以通过`preferLocal:true`将放入线程中本地队列进行执行。 ![](https://img.tnblog.net/arcimg/hb/8944d003846047559446e1b7dab3c00f.png) tn2>线程池内部有两种队列,global queue和每个worker thread绑定的local queue。 Work item 的存储类型是Object,实际被 worker thread 执行的时候会判断其类型执行对应的入口方法。 work item 类型可能是Task也可能是IThreadPoolWorkItem。 ![](https://img.tnblog.net/arcimg/hb/f182bf48951c4665b7c2e11d5a8c34bb.png) tn2>当全局任务队列里面的任务也没有的时候,算法将会把其他有任务的线程中的任务,分配给这些空闲的线程去执行。其目的是每个线程都有事干。 ThreadPoolTaskScheduler ------------ tn2>我们先举个例子。Task.Run是我们常用的调用方式,这里我们通过委托的方式去获取它默认的调度器与委托,可以看到它默认的调度器是ThreadPoolTaskScheduler。 ```csharp static void Main(string[] args) { Console.WriteLine(Thread.CurrentThread.ManagedThreadId); Task.Run(() => { Console.WriteLine(Thread.CurrentThread.ManagedThreadId); Console.WriteLine(TaskScheduler.Current.GetType()); }); Console.ReadLine(); } ``` ![](https://img.tnblog.net/arcimg/hb/bdf04a571c3c47e9946061a59e50593b.png) tn2>ThreadPoolTaskScheduler是在ThreadPool之上的一层,它确定一般模式下会将该任务(Task)对象分配给线程池(ThreadPool)的全局队列中,然后将其分配到哪个线程(Thread)的本地队列中进行执行。如果设置为LongRunning模式将不会把任务分配到线程池中,执行由下图所示。 ![](https://img.tnblog.net/arcimg/hb/f22482ca373a43e593abfab50efba7fc.png) tn>Task并不是线程,而是一个可以封装执行过程的一个对象实体。比如上一个案例中我们就是封装了一个委托。 线程与协程 ------------ >### 并行计算的协程 ![](https://img.tnblog.net/arcimg/hb/c0e3a859a5584aaa8c18fbddca7e1d1f.png) tn2>可以多个任务通过不同的线程执行进行计算。 比如下列例子:通过task1与task2不同的线程计算最后求积。 ```csharp static async Task Main(string[] args) { Task<int> task1 = Task.Run<int>(()=> { Console.WriteLine($"task1 finish [Thread]:{Thread.CurrentThread.ManagedThreadId}"); return 1; }); Task<int> task2 = Task.Run<int>(()=> { Console.WriteLine($"task2 finish [Thread]:{Thread.CurrentThread.ManagedThreadId}"); return 2; }); Text((await task1) * (await task2)); Console.ReadLine(); } static void Text(int i) { Console.WriteLine(i); } ``` ![](https://img.tnblog.net/arcimg/hb/d2f9fc9616c8475c95c51b4519f7a681.png) ```bash # 这样也可以 await Task.WhenAll(task1, task2); ``` tn2>在实际运用场景中我们可以计算工资啊,账单结算啊.... tn>如果不支持Main的异步,需要在配置文件中添加这几行。 ```bash <PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'"> <LangVersion>7.1</LangVersion> </PropertyGroup> ``` >### 协程与异步IO tn2>Task1发起请求然后自己的工作完成了,接着由IO等待线程接力,获取完所有IO后,由Task2进行处理。 ![](https://img.tnblog.net/arcimg/hb/eeeabd75919f43d487f782a059d9b466.png) tn2>我们通过请求百度之前,发现是由线程1处理该过程,IO获取完毕后是由线程8进行处理的。 ![](https://img.tnblog.net/arcimg/hb/d58668af936a4b28be2b13311cb7d379.png) ```bash static async Task Main(string[] args) { await downloadtask(); Console.ReadLine(); } static async Task downloadtask() { using (var client = new HttpClient()) { Console.WriteLine($"Before IO [Thread]:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine($"Before IO [Thread]:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine($"Before IO [Thread]:{Thread.CurrentThread.ManagedThreadId}"); var result = await client.GetStringAsync("https://www.baidu.com/"); Console.WriteLine($"After IO [Thread]:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine($"After IO [Thread]:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine($"After IO [Thread]:{Thread.CurrentThread.ManagedThreadId}"); Console.WriteLine($"After IO [Thread]:{Thread.CurrentThread.ManagedThreadId}"); } } ``` tn>`await`与线程的切换没有半点关系。 >### 本地队列与全局队列 tn2>我们通过编写如下代码,将任务放到本地队列中,并且执行5次,看看执行结果。 ```csharp for (int i = 0; i < 5; i++) { int b = i; Task.Run(() => Console.WriteLine($"Task{b} ThreadId:{Thread.CurrentThread.ManagedThreadId}")) .ContinueWith(_ => Console.WriteLine($"continuationAction{b} ThreadId:{Thread.CurrentThread.ManagedThreadId}")); Task.Delay(2000); } ``` ![](https://img.tnblog.net/arcimg/hb/233cd26217d64ae4a8586d116e14a18c.png) tn2>我发现Task.Run与ContinueWith的线程Id是同一个线程,在运行Task4的时候由于线程7空闲了,直接将Task4的任务执行了。 接着我们修改部分代码看看结果。 ```csharp Task.Run(() => Console.WriteLine($"Task1 ThreadId:{Thread.CurrentThread.ManagedThreadId}")) .ContinueWith(_ => Console.WriteLine($"continuationAction1 ThreadId:{Thread.CurrentThread.ManagedThreadId}")); Task.Run(() => Console.WriteLine($"Task2 ThreadId:{Thread.CurrentThread.ManagedThreadId}")) .ContinueWith(_ => Console.WriteLine($"continuationAction2 ThreadId:{Thread.CurrentThread.ManagedThreadId}"), TaskContinuationOptions.RunContinuationsAsynchronously); Console.ReadLine(); ``` ![](https://img.tnblog.net/arcimg/hb/c61a42525b514ecaa7564b01f121f670.png) tn2>很奇怪为什么不一样?这是因为Task1刚执行完,看到Task2还在执行,线程4就没事做了就去task2后面的continuationAction2任务做了。线程6发现自己没事做然后就去task1里面的continuationAction1拿过来做了。 >### 自定义TaskScheduler tn2>并不是所有的Task运行时都会到线程池(ThreadPool)里面去,这取决于它的TaskScheduler是哪一个;要想指定哪一个TaskScheduler需要通过TaskFactory来进行指定。下面我们通过编写自定义的TaskSchedule来进行测试。 ```csharp class CustomTaskScheduler : TaskScheduler { /// <summary> /// 定义一个线程安全的Task集合 /// </summary> private BlockingCollection<Task> _queue = new BlockingCollection<Task>(); public CustomTaskScheduler() => new Thread(() => { while (true) { // 从队列中取出一个 var task = _queue.Take(); // 进行执行 TryExecuteTask(task); Console.WriteLine($"task {task.Id} Executed"); } }) { // 定义为后台 IsBackground = true }.Start(); /// <summary> /// 获取所有Task /// </summary> /// <returns></returns> protected override IEnumerable<Task> GetScheduledTasks() => _queue.ToArray(); /// <summary> /// 添加Task /// </summary> /// <param name="task"></param> protected override void QueueTask(Task task) => _queue.Add(task); /// <summary> /// 确定提供的Task是否可以同步执行 /// </summary> /// <param name="task"></param> /// <param name="taskWasPreviouslyQueued"></param> /// <returns></returns> protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false; } ``` ```csharp static void Main(string[] args) { var taskFactory = new TaskFactory(new CustomTaskScheduler()); taskFactory.StartNew(() => Console.WriteLine($"task {Task.CurrentId}" + $"threadId:{Thread.CurrentThread.ManagedThreadId}")); taskFactory.StartNew(() => Console.WriteLine($"task {Task.CurrentId}" + $"threadId:{Thread.CurrentThread.ManagedThreadId}")); Console.ReadLine(); } ``` ![](https://img.tnblog.net/arcimg/hb/d3e93218bfd1433b870ca6f38ff0e869.png) 有限元状态机(finite-state machine) ------------ tn2>有限元状态机主要解决下列几个问题: 1.原来的线程去了哪里? 2.await表达式的返回值是怎么回事? 3.await之前的变量是如何被await之后的代码获取的? 4.await之后的任务在哪执行? 5.为什么有时候线程没有发生切换? ![](https://img.tnblog.net/arcimg/hb/fc306b2b6d9e42bb84d090dd7f956959.png) tn2>FooAsync里面的`await Task.Delay(100);`会编译成如下代码。而`FooStateMachine`就是状态机。(3)关于它的变量被编译成字段。并且初始化状态机状态为`-1`,接着开始执行状态机里面的内容。而MoveNext方法里面的内容就是执行的主要部分,(4)其中需要执行的代码也在该方法里面。 ![](https://img.tnblog.net/arcimg/hb/dc09c3b0fe144f34ae8efbe90f4830bf.png) tn2>接着我们执行MoveNext里面的内容。一开始状态是`-1`所以我们执行`_state!=0`这个判断下面的内容,然后通过需要执行的任务(Task)中获取`GetAwaiter`去进行判断是否执行完成。 如果执行完成将直接到`GetResult`方法获取结果然后返回该结果,执行结束。 如果taskAwaiter没有执行完成,状态将会改为`0`,并且通过异步的方式等待完成,封装并且将再次回调MoveNext方法(想在什么地方回调就在什么地方回调)。是`AwaitUnsafeOnCompleted`的主要工作。 当第二次进行执行的时候会执行else下面的内容,(2)通过taskAwaiter去获取结果,不管返回值需不需要结果都会去调用`GetResult`方法。 最后将通过`SetResult`放上结果值进行返回。 ![](https://img.tnblog.net/arcimg/hb/0588cd26af894701b62f440ca027d19e.png) tn2>整个执行流程图 ![](https://img.tnblog.net/arcimg/hb/3e6b86d7c6d1499281f1dd22213ded82.png) Task内部结构 ------------ ![](https://img.tnblog.net/arcimg/hb/a2e6c7b620254ef188dc3bbf65feafef.png) tn2>其核心的接口有两个`INotifyCompletion`与`ICriticalNotifyCompletion`。其中OnCompleted方法用来传递执行上下文,当实现`ICriticalNotifyCompletion`接口时就会去调用UnsafeOnCompleted这个方法。 自定义Awaiter ------------ tn2>可以按照该结构去进行自定义一个Awaiter ![](https://img.tnblog.net/arcimg/hb/5d1249cd7cc24c9990c8ffbb4f819a74.png) ```csharp public class CustomAwaiter<T> : ICriticalNotifyCompletion { private Action _continuation; private T _result; private Exception _exception; public bool IsCompleted { get; private set; } public void OnCompleted(Action continuation) { Console.WriteLine("CustomAwaiter.OnCompleted"+$"continuation:{continuation.Target.GetType().Name}.{continuation.Method}"); _continuation = continuation; } public void UnsafeOnCompleted(Action continuation) { Console.WriteLine("CustomAwaiter.UnsafeOnCompleted" + $"continuation:{continuation.Target.GetType().Name}.{continuation.Method}"); _continuation = continuation; } public T GetResult() { Console.WriteLine("CustomAwaiter.GetResult"); if (_exception != null) throw _exception; return _result; } public void SetResult(T result) { Console.WriteLine("CustomAwaiter.SetResult"); IsCompleted = true; _result = result; _continuation.Invoke(); } public void SetException(Exception exception) { Console.WriteLine("CustomAwaiter.SetException"); IsCompleted = true; _exception = exception; _continuation.Invoke(); } } ``` ```csharp public class CustomAwaitable<T> { private CustomAwaiter<T> _awaiter = new CustomAwaiter<T>(); public CustomAwaiter<T> GetAwaiter() { Console.WriteLine("CustomAwaitable.GetAwaiter"); return _awaiter; } } ``` tn2>测试执行代码 ```csharp static async Task Main(string[] args) { var foo = await Foo(); Console.WriteLine(foo); } static CustomAwaitable<string> Foo() { Console.WriteLine("Foo"); var awaitable = new CustomAwaitable<string>(); var awaiter = awaitable.GetAwaiter(); new Thread(() => { Thread.Sleep(100); awaiter.SetResult("Hello World!"); }) { IsBackground = false }.Start(); return awaitable; } ``` tn2>如果运气好,执行如下 ![](https://img.tnblog.net/arcimg/hb/4317c602579e4e84a6d6daa4b5307549.png) tn2>运气不好,就会报错。报错原因是因为`SetResult`在`UnsafeOnCompleted`回调未完成的之前执行了,`_continuation`字段的委托并没有MoveNext方法。(大佬建议不使用) ![](https://img.tnblog.net/arcimg/hb/2a1ba010a8a847c28522d5b8715bfecc.png) tn2>我做了一些修改可能会导致性能不太佳,但勉勉强强能避开这个问题。 ```csharp public class CustomAwaiter<T> : ICriticalNotifyCompletion { private Action _continuation; private T _result; private Exception _exception; AutoResetEvent autoEvent = new AutoResetEvent(false); public bool IsCompleted { get; private set; } public void OnCompleted(Action continuation) { Console.WriteLine("CustomAwaiter.OnCompleted"+$"continuation:{continuation.Target.GetType().Name}.{continuation.Method}"); _continuation = continuation; } public void UnsafeOnCompleted(Action continuation) { Console.WriteLine("CustomAwaiter.UnsafeOnCompleted" + $"continuation:{continuation.Target.GetType().Name}.{continuation.Method} {Thread.CurrentThread.ManagedThreadId}"); _continuation = continuation; autoEvent.Set(); } public T GetResult() { Console.WriteLine("CustomAwaiter.GetResult"); if (_exception != null) throw _exception; return _result; } public void SetResult(T result) { Console.WriteLine($"CustomAwaiter.SetResult {Thread.CurrentThread.ManagedThreadId}"); IsCompleted = true; _result = result; if (_continuation == null) { autoEvent.WaitOne(); } _continuation.Invoke(); } public void SetException(Exception exception) { Console.WriteLine("CustomAwaiter.SetException"); IsCompleted = true; _exception = exception; _continuation.Invoke(); } } ``` tn>当我们运行的程序可能会报错时,Task的异常信息里不会有GetResult的StackFrame,这里我们自己的程序是会报错的。 因为在Task的GetResult的方法里面会添加`StackTraceHidden`特性。 ![](https://img.tnblog.net/arcimg/hb/bb23181f42b448f3818f1658ac190bbe.png) ```csharp static async Task Main(string[] args) { var foo = await Foo(); Console.WriteLine(foo); } static CustomAwaitable<string> Foo() { Console.WriteLine("Foo"); var awaitable = new CustomAwaitable<string>(); var awaiter = awaitable.GetAwaiter(); new Thread(() => { try { Thread.Sleep(100); Console.WriteLine($"child Thread {Thread.CurrentThread.ManagedThreadId}"); throw new Exception("Hello Bug!"); } catch (Exception ex) { awaiter.SetException(ex); } }) { IsBackground = false }.Start(); return awaitable; } ``` ![](https://img.tnblog.net/arcimg/hb/0701b2738c3f4702b61ef0f79e34bd51.png) async的支持 ------------ tn2>虽然实现了await的方法但是并没有实现async的支持,所以这里async并不会认识我们定义的`CustomAwaiter`,相比async Task方法的编译结果,我们少了个`AsyncTaskMethodBuilder`。 ![](https://img.tnblog.net/arcimg/hb/448cb91ac67f4453b3908af4682628a2.png) ![](https://img.tnblog.net/arcimg/hb/40e69b1782ed40538ca82952bb8cc5f7.png) AsyncTaskMethodBuilder ------------ tn2>关于AsyncMethodBuilder的结构如下图所示。 ![](https://img.tnblog.net/arcimg/hb/2f72f6ab85724ef28869e7bc489029b8.png) tn2>如果要我们去实现async的支持的话,需要我们去定义自己的`AsyncMethodBuilder`。 ```csharp public struct CustomAsyncMethodBuilder<T> { private CustomAwaiter<T> _awaiter; private CustomAwaitable<T> _awaitable; /// <summary> /// 创建CustomAsyncMethodBuilder /// </summary> /// <returns></returns> public static CustomAsyncMethodBuilder<T> Create() { Console.WriteLine("CustomAsyncMethodBuilder.Create"); var awaitable = new CustomAwaitable<T>(); var builder = new CustomAsyncMethodBuilder<T> { _awaitable = awaitable, _awaiter = awaitable.GetAwaiter() }; return builder; } /// <summary> /// CustomAsyncMethodBuilder 的 任务 /// </summary> public CustomAwaitable<T> Task { get { Console.WriteLine("CustomAsyncMethodBuilder.Task"); return _awaitable; } } public void SetException(Exception exception) { Console.WriteLine("CustomAsyncMethodBuilder.SetException"); _awaiter.SetException(exception); } public void SetResult(T result) { Console.WriteLine("CustomAsyncMethodBuilder.SetResult"); _awaiter.SetResult(result); } /// <summary> /// 调用OnCompleted方法 /// </summary> /// <typeparam name="TAwaiter"></typeparam> /// <typeparam name="TStateMachine"></typeparam> /// <param name="awaiter"></param> /// <param name="stateMachine"></param> public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter,ref TStateMachine stateMachine) where TAwaiter : INotifyCompletion where TStateMachine : IAsyncStateMachine { Console.WriteLine("CustomAsyncMethodBuilder.AwaitOnCompleted"); _awaiter.OnCompleted(stateMachine.MoveNext); } /// <summary> /// 调用AwaitUnsafeOnCompleted方法 /// </summary> /// <typeparam name="TAwaiter"></typeparam> /// <typeparam name="TStateMachine"></typeparam> /// <param name="awaiter"></param> /// <param name="stateMachine"></param> /// 将类型或成员标识为安全关键型,并可由透明代码安全访问 [SecuritySafeCritical] public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine) where TAwaiter : INotifyCompletion where TStateMachine : IAsyncStateMachine { Console.WriteLine("CustomAsyncMethodBuilder.AwaitUnsafeOnCompleted"); // var iscompleted = bool.Parse(awaiter.GetType().GetProperty("IsCompleted").GetValue(awaiter).ToString()); _awaiter.UnsafeOnCompleted(stateMachine.MoveNext); } public void Start<TStateMachine>(ref TStateMachine stateMachine) where TStateMachine:IAsyncStateMachine { Console.WriteLine("CustomAsyncMethodBuilder.Start"); stateMachine.MoveNext(); } public void SetStateMachine(IAsyncStateMachine stateMachine) { Console.WriteLine("CustomAsyncMethodBuilder.SetStateMachine"); } } ``` tn2>最后还需要在`CustomAwaitable`上添加自定义的`CustomAsyncMethodBuilder`特性 ```csharp [AsyncMethodBuilder(typeof(CustomAsyncMethodBuilder<>))] public class CustomAwaitable<T> { private CustomAwaiter<T> _awaiter = new CustomAwaiter<T>(); public CustomAwaiter<T> GetAwaiter() { Console.WriteLine("CustomAwaitable.GetAwaiter"); return _awaiter; } } ``` tn2>最后`CustomAwaiter`也做了一些修改 ```csharp public class CustomAwaiter<T> : ICriticalNotifyCompletion { private Action _continuation; private T _result; private Exception _exception; // AutoResetEvent autoEvent = new AutoResetEvent(false); public bool IsCompleted { get; private set; } public void OnCompleted(Action continuation) { Console.WriteLine("CustomAwaiter.OnCompleted"+$"continuation:{continuation.Target.GetType().Name}.{continuation.Method}"); _continuation = continuation; } public void UnsafeOnCompleted(Action continuation) { Console.WriteLine("CustomAwaiter.UnsafeOnCompleted" + $"continuation:{continuation.Target.GetType().Name}.{continuation.Method} {Thread.CurrentThread.ManagedThreadId}"); _continuation = continuation; _continuation.Invoke(); // autoEvent.Set(); } public T GetResult() { Console.WriteLine("CustomAwaiter.GetResult"); if (_exception != null) throw _exception; return _result; } public void SetResult(T result) { Console.WriteLine($"CustomAwaiter.SetResult {Thread.CurrentThread.ManagedThreadId}"); IsCompleted = true; _result = result; } public void SetException(Exception exception) { Console.WriteLine("CustomAwaiter.SetException"); IsCompleted = true; _exception = exception; } } ``` tn2>接着我们测试一下 ```csharp static async Task Main(string[] args) { var boo = await Boo(); Console.WriteLine(boo); Console.ReadLine(); } static async CustomAwaitable<string> Boo() { await Task.Run(() => Console.WriteLine("Finish")); string str = "bob"; return str; } ``` ![](https://img.tnblog.net/arcimg/hb/9df40436f032403fafc7b5e6fe76b939.png)