Appearance
异步编程
更新: 7/30/2025 字数: 0 字 时长: 0 分钟
异步编程的核心思想是“不要等待”。它是一种程序设计范式。通过允许程序在等待期间去处理其他任务,异步编程能够极大地提高应用的响应速度和资源利用效率,是构建流畅用户界面和高性能服务的基石。
现代C#异步编程选择 Task
,在99%的情况下都不需要使用 Thread
。
推荐视频:C#异步编程的入门概念及核心理念
与多线程的区别
需要明确的是,异步编程与多线程是不同的概念:
- 单线程同样可以异步
- 在 C# 中, 异步默认借助线程池。
- 多线程经常要求阻塞,而异步要求不阻塞。
多线程是实现异步的一种方式,但异步本身是一个更宽广、着眼于程序流程设计的编程模型。
此外,多线程和异步的适用场景不同:
- 多线程:适用于 CPU 密集型任务和长期运行的任务,提供了更底层的控制,但不易传参和返回值且书写代码较为繁琐。线程的创建与销毁开销较大。
- 异步:适用于 IO 密集型任务和大量短暂的小任务,且可以避免线程阻塞,挺高响应能力。
异步任务( Task
)
Task
是对于异步任务的抽象,它代表一个可能在未来某个时间点完成的操作。异步任务(默认)会借助线程池在其他线程上运行,在获取结果后回到之前的状态。
Task
是一个包含了异步任务各种状态的引用类型。它可以表示任务的执行状态、结果以及异常信息。
输入
c#
var task = new Task<string>(() =>
{
Thread.Sleep(1000);
return "done";
});
Console.WriteLine(task.Status);
task.Start();
Console.WriteLine(task.Status);
Thread.Sleep(1000);
Console.WriteLine(task.Status);
Thread.Sleep(1000);
Console.WriteLine(task.Status);
Console.WriteLine($"task result: {task.Result}");
输出
powershell
Created
WaitingToRun
Running
RanToCompletion
task result: done
返回值为 Task
的方法没有返回值,返回值为 Task<T>
有类型为 T
的返回值。
INFO
异步任务默认为后台线程。
异步方法( async
和 await
)
将方法标记为 async
后,可以在方法中使用 await
关键字。await
关键字会等待异步任务的结束,并获得结果。
c#
async Task Main()
{
await FooAsync();
}
Task FooAsync()
{
return Task.Delay(1000);
}
值得注意的是,await
会等待异步任务结束,再返回到调用处继续执行,但这不相当于阻塞线程。await
执行结束后,会使用一个新的空闲线程继续执行后续代码。但如果 await
的任务已经完成,则不会切换线程,直接继续执行后续代码。
输入
c#
// --- 注意:这是一个控制台应用程序 ---
// 主方法开始,打印当前线程ID
Helper.PrintThreadId("Befofore");
// 等待异步方法完成
await FooAsync();
// 异步方法完成后,打印当前线程ID
Helper.PrintThreadId("After");
async Task FooAsync()
{
// 异步方法开始,打印当前线程ID
Helper.PrintThreadId("Befofore");
await Task.Delay(1000);
// 延迟结束后,打印当前线程ID
Helper.PrintThreadId("After");
}
// 辅助类,用于打印线程ID和调用信息
class Helper
{
private static int _index = 1;
// 打印当前线程ID和调用方法名
public static void PrintThreadId(string? message = null, [CallerMemberName] string? name = null)
{
var title = $"{_index}: {name}";
if (!string.IsNullOrEmpty(message))
title += $" @ {message}";
// 输出:线程ID + 序号 + 方法名 + 消息
Console.WriteLine(Environment.CurrentManagedThreadId + "\t" + title);
Interlocked.Increment(ref _index);
}
}
输出:
powershell
1 1: <Main>$ @ Befofore
1 2: <Main>$ @ Befofore
10 3: <Main>$ @ After
10 4: <Main>$ @ After
通过输出可以看到,原先的线程(id = 1
)在 await
之后被挂起,新的线程(id = 10
)被用来继续执行后续代码。当然如果不是控制台应用程序,可能最后会返回到原先的线程,即主线程(ex:id = 1
)。
如果一个调用一个异步方法而不 await
,当异步方法结束时,会由原本的线程继续执行后续代码,而不是使用新的线程。
代码示例
输入:
c#
FooASync();
Console.WriteLine("Out " + Environment.CurrentManagedThreadId);
Thread.Sleep(1000);
Console.WriteLine(Environment.CurrentManagedThreadId);
async Task FooASync()
{
Console.WriteLine("Inner " + Environment.CurrentManagedThreadId);
await Task.Delay(500);
Console.WriteLine("Inner " + Environment.CurrentManagedThreadId);
}
输出:
powershell
Inner 1
Out 1
Inner 4
1
如果修改第一行为 await FooASync();
,又会有不一样的结果:
powershell
Inner 1
Inner 10
Out 10
10
可以观察到打印第二个 Inner 时,线程 id 还是 10,这是因为在 await Task.Delay(500)
之后,线程10又变回了线程池的空闲线程。
很有趣吧(`・ω・´)!
async
+ await
会将方法包装成状态机, await
类似于检查点。底层会调用 MoveNext
方法切换状态。
TIP
.NET 命名规范建议在所有异步方法名后添加 Async
后缀,以便于区分同步和异步方法。
但在接口方法中,我们无法为其添加 async
修饰符。
c#
public interface IFoo
{
// 接口方法中无法添加 async 修饰符
Task<int> GetValueAsync();
}
public class Demo : IFoo
{
public async Task<int> GetValueAsync()
{
// 或者: return 42 也是可以通过编译的,这是一个小小的语法糖
return await Task.FromResult(42);
}
}
当需要需要向一个事件注册异步方法的时候(ex: Action
),如果事件要求注册的委托无返回值,可以使用 async void
。在平时使用中,async void
应该尽量避免,因为它无法被 await
等待,也因而异常无法被捕获。
输入:
c#
try
{
await GetValueAsync();
}
catch (Exception e)
{
Console.WriteLine(e.Message);
}
async Task<int> GetValueAsync()
{
await Task.Delay(1);
throw new Exception("发生错误");
}
输出:
powershell
发生错误
如果不进行 await
,则异常会在其线程被抛出,无法被捕获。 await
会自动拨开外部的聚合异常(AggregateException
),抛出原始异常。对于 async void
方法中可能会产生的异常,一定要谨慎处理。
异步的传染性(Contagious)
这个词听起来可能有点像负面的东西,但实际上它描述的是 async/await
设计模式的一个核心特征。
简单来说,一旦你决定在一个方法中使用 await
来进行异步等待,那么这个方法本身也必须被标记为 async
。接着,任何调用这个 async 方法并希望正确处理其结果的方法,最好也变成 async
方法来 await
它。
这个过程会像链式反应一样,从最底层的异步操作(例如文件读写、网络请求)开始,一直向上传播,直到调用堆栈的最高层。
不要强行“切断”传染(危险操作)
有些开发者不想让 async
一路传染上去,于是他们尝试使用一些方法来“同步地”等待一个异步任务完成。最常见的就是 Task.Wait()
和 Task.Result
。
这个操作其实很危险:
- 阻塞线程:
task.Wait()
和task.Result
会阻塞当前线程,直到异步任务完成。这完全违背了异步编程的初衷(即“不阻塞线程”)。如果你在 Unity 的主线程上这么做,你的游戏会直接卡死,和使用Thread.Sleep
的效果一样。 - 引发死锁 (Deadlock):这是一个非常经典且致命的问题。
- 想象一下在 Unity 主线程调用
async
方法:ProcessDataAsync().Result
。 - 主线程执行到
.Result
时,它会停下来等待ProcessDataAsync
完成。 ProcessDataAsync
内部的await
执行后,它需要回到原来的“上下文”(即主线程)来继续执行await
后面的代码。- 但此时主线程正被
.Result
阻塞着,它无法继续执行任何代码。 - 结果:
ProcessDataAsync
在等待主线程,而主线程在等待ProcessDataAsync
。双方都在等待对方,程序就永远卡住了——这就是死锁。
- 想象一下在 Unity 主线程调用
c#
async Task ProcessDataAsync()
{
// 模拟异步操作
await Task.Delay(1000);
Console.WriteLine("Data processed.");
}
void Main()
{
// 这会导致死锁
var result = ProcessDataAsync().Result;
Console.WriteLine(result);
}
结论:拥抱“传染性”
“异步的传染性”不是一个缺陷,而是 async/await 模式确保其非阻塞优势能够贯穿整个调用链的内在机制。
最佳实践是:
Async all the way up. (让异步一路到底)
重要思想:不阻塞
在这部分你也应该观察到了阻塞的问题,你不应该在异步方法使用任何方式阻塞当前线程。 await
可以释放当前线程,而阻塞则会导致线程无法继续执行其他任务,如果线程池里的线程被大量阻塞,可能会导致线程池耗尽。
导致卡死的代码示例:
c#
int HeavyJob()
{
// 这是一个耗时的同步操作
Thread.Sleep(5000);
return 42;
}
private void Button_Click(object sender, EventArgs e)
{
// 这会阻塞 UI 线程,导致界面卡死
var result = HeavyJob();
MessageBox.Show($"Result: {result}");
}
正确的做法:
c#
async Task<int> HeavyJob()
{
// 这是一个耗时的同步操作
Thread.Sleep(5000);
await Task.Delay(5000);
return 42;
}
private async void Button_Click(object sender, EventArgs e)
{
var result = await HeavyJob();
MessageBox.Show($"Result: {result}");
}
常见阻塞情景:
Task.Wait()
或Task.Result
:这些方法会阻塞当前线程,直到任务完成。但如果任务已经完成,则不会阻塞。Task.Delay()
vsThread.Sleep()
:Thread.Sleep()
会阻塞当前线程,而Task.Delay()
则不会。- IO 等操作的同步方法:一般都能寻找到这些方法的异步版本,如果实在找不到可以使用
Task.Run(() => )
进行包装。
同步上下文
你可以把 SynchronizationContext
想象成一个 “任务调度员”。
它的核心职责是提供一种机制,让你能够将一段代码(一个委托或一个工作项)排队,并确保它在 “正确” 的线程上执行。
这里的“正确”是一个关键词,它到底指哪个线程,完全取决于当前的运行环境:
- 在UI应用中 (如WPF, WinForms, Unity的主线程):“正确”的线程就是那个唯一的、负责更新UI界面的UI线程。
- 在ASP.NET Core应用中:“正确”的线程是处理当前HTTP请求的线程。
- 在控制台应用或普通后台线程中:默认情况下没有特定的“调度员”,任务会被安排到线程池 (Thread Pool) 中的任意一个可用线程上执行。
所以,SynchronizationContext
本身是一个抽象基类,不同的应用程序框架会提供自己的具体实现,来定义它们的“正确”线程是哪个以及如何向其调度任务。
一般只有 UI 线会采取这种策略程。Unity 有默认的同步上下文。
为什么需要同步上下文?
最主要的原因是线程安全,尤其是对于UI操作。
几乎所有的GUI框架(包括Unity的UI系统)都遵循一个严格的规则:只有创建UI控件的那个线程(即UI线程),才能修改这些控件。
可以通过调用 Task.ConfigureAwait()
配置任务在 await
方法结束是否会到原来的线程,默认为 true
。
c#
await Task.Delay(1000).ConfigureAwait(false);
TaskScheduler
也可以用来配置同步上下文。此外,还可以控制 Task
的调度方式和运行线程。这里只简单提及。
- 线程池线程:Default
- 当前线程:CurrentThread
- 单线程上下文:STAThread
- 长时间运行线程:LongRunning
一发即忘
一发即忘 “Fire-and-Forget” 是一种编程模式,指你启动一个操作(通常是异步任务),但不使用 await
或阻塞的方式去等待它的结束,无法观察任务的状态(是否完成,报错等)。例如:
输入
c#
class Program
{
static void Main()
{
// 使用弃元来压制警告
_ = FooAsync();
Console.WriteLine("Hello, World!");
Console.ReadLine();
}
static async Task FooAsync()
{
await Task.Delay(1);
GetSomeResult();
Console.WriteLine("Done");
}
static void GetSomeResult() => throw new Exception();
}
输出
powershell
Hello, World!
可见我们无法观察到 FooAsync
中的异常,编译器也无法帮助我们查出抛出异常之后的代码能否运行。
如果我们替换为 async void
,程序在运行时便会抛出异常,但无法捕获。例如:
输入
c#
class Program
{
static void Main()
{
try
{
FooAsync();
Console.WriteLine("Hello, World!");
Console.ReadLine();
}
catch (Exception e)
{
Console.WriteLine("Caught exception");
}
}
static async void FooAsync()
{
await Task.Delay(1);
GetSomeResult();
Console.WriteLine("Done");
}
static void GetSomeResult() => throw new Exception();
}
输出
powershell
Unhandled exception. Hello, World!
System.Exception: Exception of type 'System.Exception' was thrown.
at Program.GetSomeResult() in H:\Learn_Project\CSharp\AsynchronousProgramming\AsynchronousProgramming\AsyStatus.cs:line 27
at Program.FooAsync() in H:\Learn_Project\CSharp\AsynchronousProgramming\AsynchronousProgramming\AsyStatus.cs:line 23
at System.Threading.Tasks.Task.<>c.<ThrowAsync>b__128_1(Object state)
at System.Threading.ThreadPoolWorkQueue.Dispatch()
at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
为何会产生这种差异
当一个异步方法声明为 async Task
时,它会返回一个 Task
对象。这个 Task
对象代表了整个异步操作。
- 异常捕获: 在这个方法内部抛出的任何异常都会被捕获并存储在返回的
Task
对象中。这个Task
的状态会变为 Faulted (故障)。 - 异常传播: 这个异常不会立即导致程序崩溃。只有当代码
await
这个Task
时,或者显式地访问其Exception
属性时,异常才会被重新抛出。
当一个异步方法声明为 async void
时,它不会返回 Task
。
- 异常无法捕获: 由于没有
Task
对象返回,在这个方法中抛出的异常无法被常规的try-catch
块捕获(除非try-catch
就在throw
语句的周围)。 - 异常传播:
async void
方法中未处理的异常会直接被抛到启动它的同步上下文(SynchronizationContext
)上。在控制台应用程序中,默认没有同步上下文,所以异常会被直接抛到线程池上。
创建异步任务
Task.Run
c#
var res = await Task.Run(HeavyJob);
int HeavyJob()
{
Thread.Sleep(5000);
return 42;
}
Task.Factory.StartNew
相当于 Task.Run
的完整版,提供了更多的配置选项,比如 TaskCreationOptions
和 TaskScheduler
。
c#
var res = await Task.Factory.StartNew(HeavyJob,
CancellationToken.None,
TaskCreationOptions.None,
TaskScheduler.Default);
new Task
+ Task.Start()
这种方式没什么必要,一般没有情况需要创建一个 Task
但是不立刻启动。
同时开启多个异步任务
c#
// 创建一个包含1到10的数组
var inputs = Enumerable.Range(1, 10);
var tasks = new List<Task<int>>();
// 为每个输入值创建一个异步任务,但不等待它们完成
foreach (var input in inputs)
{
// 注意:这里没有使用 await,所以所有任务会同时开始执行
tasks.Add(HeavyJobAsync(input));
}
// 等待所有任务完成,并获取所有结果
// Task.WhenAll 会并行等待所有任务,而不是顺序等待
var results = await Task.WhenAll(tasks);
// 模拟耗时的异步操作
async Task<int> HeavyJobAsync(int input)
{
await Task.Delay(1000);
return input * input;
}
除了 Task.WhenAll
,还有 Task.WhenAny
来等待任意一个任务完成。
取消异步任务
通过 CancellationTokenSource
+ CancellationToken
可以实现异步任务的取消:
输入
c#
CancellationTokenSource cts = new CancellationTokenSource();
try
{
await Task.WhenAll(
Task.Delay(10000, cts.Token),
Task.Run(() => cts.Cancel())
);
}
catch (TaskCanceledException e)
{
Console.WriteLine(e.Message);
}
finally
{
cts.Dispose();
}
输出
powershell
A task was canceled.
INFO
CTS
实现了 IDisposable
接口,使用后需要释放。
可以通过传入 TimeSpan
或 CancellationToken
可以设置取消的时间点:
c#
// 或者:var cts = new CancellationTokenSource(5000);
var cts1 = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var cts2 = new CancellationTokenSource();
cts2.CancelAfter(5000);
TIP
推荐所有异步方法都传入 CancellationToken
参数。
编写支持传入 Token 的方法
通过在声明的异步方法中加入 CancellationToken
参数,并在方法体内 await
的异步方法中传入 CancellationToken
,可以使我们行云流水地编写可取消的异步方法:
c#
async Task FooAsync(CancellationToken cancellationToken)
{
await Task.Delay(5000, cancellationToken);
var client = new HttpClient();
await client.GetStreamAsync("`123123", cancellationToken);
}
但有时候,我们调用这些异步任务但完全没有取消的需求,如果每次都传入 CancellationToken
,会显得很繁琐。
通常来说,有两种解决方案:
- 方法重载c#
async Task FooAsync(CancellationToken cancellationToken) { await Task.Delay(5000, cancellationToken); var client = new HttpClient(); await client.GetStreamAsync("`123123", cancellationToken); } async Task FooAsync() => await FooAsync(CancellationToken.None);
- 默认参数c#
async Task FooAsync(CancellationToken cancellationToken = default) { await Task.Delay(5000, cancellationToken); var client = new HttpClient(); await client.GetStreamAsync("`123123", cancellationToken); }
在同步方法的上下文使用 Token
有时候,我们有一个耗时的同步方法。这个同步方法不容易被改写为异步方法,我们先使用 Task.Run
包装它,至少保证线程的不阻塞。在这种情况下,为这种操作添加取消功能,关键在于让那个耗时的同步方法能够“感知”到外部的取消请求。
方案一:修改同步方法
这是最理想、最有效的方案。你需要稍微修改一下那个耗时的同步方法,让它能接收一个 CancellationToken
作为参数。
c#
Task FooAsync(CancellationToken cancellationToken)
{
return Task.Run(() =>
{
// 在同步方法中检查取消请求
cancellationToken.ThrowIfCancellationRequested();
while(true)
{
// 在每次循环中检查取消请求
cancellationToken.ThrowIfCancellationRequested();
// 模拟耗时的同步操作
Thread.Sleep(5000);
}
});
}
方案二:在同步方法内部周期性地检查 Token
一般来说,不存在真的无法修改的同步方法,但如果你确实需要在一个无法修改的同步方法中使用取消功能,可以在调用这个同步方法的代码中周期性地检查 CancellationToken
。
c#
async Task BarAsync(CancellationToken cancellationToken)
{
await Task.Run(() =>
{
cancellationToken.ThrowIfCancellationRequested();
Thread.Sleep(5000); // 模拟耗时的同步操作
cancellationToken.ThrowIfCancellationRequested();
// ..
});
}
值得注意的是,直接给 Task.Run()
传入 CancellationToken
相当于在任务开始的时候检查 Token
c#
Task.Run(() =>
{
// ..
}, cancellationToken);
// 相当于:
Task.Run(() =>
{
cancellationToken.ThrowIfCancellationRequested();
// ..
});
任务取消的对策
- 抛出异常(推荐)c#
cancellationToken.ThrowIfCancellationRequested();
- 提前返回
- 可以通过
Task.FromCanceled<>()
返回一个比较有意义的Task
:
c#if (cancellationToken.IsCancellationRequested) return Task.FromCanceled<int>(cancellationToken);
- 可以通过
- 善后
try
,catch
,finally
cancellationToken.Register()
注册取消时的回调:
c#cancellationToken.Register(() => { // 取消时的清理工作 Console.WriteLine("Task was cancelled."); });
TIP
cancellationToken.Register()
后注册的委托先调用。
TaskCanceledException
& OperationCanceledException
OperationCanceledException
其实是 TaskCanceledException
的父类。OperationCanceledException
位于 System.Threading
中,而Task
位于命名空间 System.Threading.Tasks
。
其实,CancellationToken
也位于 System.Threading
命名空间中。似乎是因为 CancellationToken
其实不只能用于 Task
异步编程中,在一般多线程编程中也开用来取消线程。
方法 ThrowIfCancellationRequested()
会抛出 OperationCanceledException
,在日常的使用中,我们也直接抛出 OperationCanceledException
即可。
超时机制
有时需要预防程序因等待一个永远不会完成的任务而无限期卡死。
较早的方案中通过 Task.WhenAny()
和 Task.Delay()
实现超时机制:
输入
c#
using var cts = new CancellationTokenSource();
var fooTask = FooAsync(cts.Token);
// 同时等待目标任务和超时任务,看哪个先完成
var completedTask = await Task.WhenAny(fooTask, Task.Delay(1000));
// 如果完成的不是目标任务,说明超时了
if (completedTask != fooTask)
{
cts.Cancel();
// 等待目标任务真正结束(处理取消异常)
await fooTask;
}
/// <summary>
/// 模拟一个耗时的异步操作
/// </summary>
async Task FooAsync(CancellationToken token)
{
try
{
Console.WriteLine("Starting FooAsync...");
await Task.Delay(5000, token);
Console.WriteLine("Completed FooAsync...");
}
catch (OperationCanceledException)
{
// 捕获取消异常并处理
Console.WriteLine("Timeout");
}
}
输出
powershell
Starting FooAsync...
Timeout
在 .NET 6 以后,可以使用更为现代的方法 - 通过调用 Task.WaitAsync()
来实现超时机制。 Task.WaitAsync()
接受一个 TimeSpan
参数,在超时后会抛出 TimeoutException
。
输入
c#
using var cts = new CancellationTokenSource();
try
{
// 使用 WaitAsync 设置2秒超时,超时会抛出 TimeoutException
await FooAsync(cts.Token).WaitAsync(TimeSpan.FromSeconds(2));
}
catch (TimeoutException)
{
// 捕获超时异常
Console.WriteLine("Timeout");
cts.Cancel();
}
/// <summary>
/// 模拟一个耗时的异步操作
/// </summary>
async Task FooAsync(CancellationToken token)
{
try
{
Console.WriteLine("Starting FooAsync...");
await Task.Delay(5000, token);
Console.WriteLine("Completed FooAsync...");
}
catch (OperationCanceledException)
{
// 捕获取消异常并处理
Console.WriteLine("Task was canceled.");
}
}
更为基础的方法:CancellationTokenSource.CancelAfter()
此外还有一个更为经典基础方法,使用 CancellationTokenSource.CancelAfter()
。它的核心思想是让 CancellationTokenSource
自身成为一个计时器,在指定时间后自动发出取消信号。
你会注意到这种方法难以区分任务超时与任务取消,目前还是更加推荐使用 Task.WaitAsync()
方法。
c#
using var cts = new CancellationTokenSource();
try
{
// 直接让 CancellationTokenSource 在2秒后发出取消信号
cts.CancelAfter(TimeSpan.FromSeconds(2));
await FooAsync(cts.Token);
}
catch (OperationCanceledException)
{
// 当 CancelAfter 触发时,await 的任务会抛出此异常
Console.WriteLine("Task was canceled due to the timeout.");
}
/// <summary>
/// 模拟一个耗时的异步操作
/// </summary>
async Task FooAsync(CancellationToken token)
{
// 这个方法与前面的例子完全相同
try
{
Console.WriteLine("Starting FooAsync...");
await Task.Delay(5000, token);
Console.WriteLine("Completed FooAsync...");
}
catch (OperationCanceledException)
{
Console.WriteLine("Inner OperationCanceledException caught.");
// 重新抛出,让调用者知道任务已被取消
throw;
}
}
同步机制
之前多线程介绍的传统的同步方法(lock
, Mutex
, Semaphore
, EventWaitHandle
)都不适用于异步编程,因为它们都会导致阻塞,而异步编程最重要的就是不阻塞。
SemaphoreSlim
原生方法几乎只有一个适用的:SemaphoreSlim
。
c#
var inputs = Enumerable.Range(1, 10);
// 创建信号量,最多允许1个线程同时访问(相当于互斥锁)
var sem = new SemaphoreSlim(1, 1);
var tasks = new List<Task<int>>();
foreach (var input in inputs)
{
tasks.Add(HeavyJobAsync(input));
}
// 等待所有任务完成
var results = await Task.WhenAll(tasks);
// 模拟耗时的异步操作
async Task<int> HeavyJobAsync(int input)
{
// 异步等待获取信号量许可
await sem.WaitAsync();
try
{
await Task.Delay(1000);
return input * input;
}
finally
{
// 释放信号量许可
sem.Release();
}
}
TIP
maxCount
的核心作用是为信号量的计数值设置一个不可逾越的上限。
如果代码有bug,意外地调用 Release()
的次数比调用 WaitAsync()
的次数还多,会发生什么?
如果没有 maxCount
的限制:信号量的计数值会无限增长。通过 maxCount
的限制,当你调用 Release()
导致计数值试图超过 maxCount
时,SemaphoreSlim
会立即抛出一个SemaphoreFullException
异常。
此外,还可以是实现“懒加载资源池”等高级模式,此处先不展开。
第三方库还提供了更多的异步同步机制,比如 AsyncLock
、AsyncManualResetEvent
等。
AsyncLock
来自 Nito.AsyncEx
库的 AsyncLock
是一个异步版本的互斥锁。它允许多个异步任务安全地访问共享资源。例如:
c#
private readonly AsyncLock _lock = new();
public async Task DoJobAsync()
{
using (await _lock.LockAsync())
{
}
}
AsyncLock
结束了 using
的语法糖会自动释放锁,并没有实际释放什么资源。此外,AsyncLock
还可以接受一个 CancellationToken
,允许在等待锁的过程中取消操作。
AsyncLock
还保留一个同步的方法:
c#
using (_lock.Lock()) {}
AsyncAutoResetEvent
Nito.AsyncEx
和 Microsoft.VisualStudio.Threading
都提供了 AsyncAutoResetEvent
。如果我们不希望使用功能过于强大的 SemaphoreSlim
,我们只希望有一个线程可以访问共享资源,可以使用 AsyncAutoResetEvent
。例如:
c#
// 创建一个异步自动重置事件,默认未设置信号
var signal = new AsyncAutoResetEvent(false);
var setter = Task.Run((() =>
{
Thread.Sleep(1000);
// 设置信号,允许等待的任务继续执行
signal.Set();
}));
var waiter = Task.Run(async () =>
{
// 等待信号
await signal.WaitAsync();
Console.WriteLine("Signal received!");
});
await Task.WhenAll(setter, waiter);
简单来说,AsyncAutoResetEvent
中的 "AutoReset"(自动重置) 指的是:门默认是关闭,当 event.Set()
被调用的时候门会打开,排在门前的第一个任务(await event.WaitAsync()
)会被允许通过。当第一个任务通过后,门会自动关闭,等待下一个任务。
原生的 TaskCompletionSource
AsyncAutoResetEvent
可以被重复利用,但有时我们需要一个一次性的信号,此时可以考虑使用 TaskCompletionSource
。TaskCompletionSource
可以用来创建一个可以手动设置结果的任务,也有泛型版本。例如:
c#
var tcs = new TaskCompletionSource();
var setter = Task.Run((() =>
{
Thread.Sleep(1000);
// 设置结果,允许等待的任务继续执行
tcs.SetResult();
}));
var waiter = Task.Run(async () =>
{
// 等待任务完成
await tcs.Task;
Console.WriteLine("Signal received!");
});
await Task.WhenAll(setter, waiter);
如果有多个地方能够设置结果,继续使用 cs.SetResult()
会抛出异常。可以使用 TrySetResult()
来避免异常。例如:
c#
if (!tcs.TrySetResult())
{
Console.WriteLine("Task was already completed.");
}
传统的 await
在异步编程中,传统的 await
关键字也可以用来实现同步机制。它会暂停当前方法的执行,直到被等待的任务完成。这种方式虽然简单,但并不推荐用于复杂的同步场景,因为它可能导致性能问题和死锁。
c#
var setter = Task.Run((() =>
{
Thread.Sleep(1000);
}));
var waiter = Task.Run(async () =>
{
// 等待任务完成
await setter;
Console.WriteLine("Signal received!");
});
在传统的多线程编程中,我们很难观察到 Thread
的状态,但在异步编程中,我们可以通过 Task
的状态来观察任务的执行情况。这也是异步编程的一个重要优势,这个同步技巧的核心。
TIP
由于我们可以直接通过 Task
的状态来观察任务的状态,.NET 原生只提供了 SemaphoreSlim
和 TaskCompletionSource
。
如果想用工具包的话,建议在你希望锁能 AutoRest 的情况并且只有开关两种状态(AsyncAutoResetEvent
),或者希望一把一个锁同时用在同步和异步方法里(AsyncLock
)这两种需求下使用。否则平时原生的方法很多时候已经足够使用。
异步任务的通信 - Channel
集合
在多线程中的阻塞集合(BlockingCollection
)不可以使用,因为它们是阻塞的,违背了异步编程不阻塞的原则。
在 Channel 出现之前,要实现生产者-消费者模式,开发者通常需要手动组合一些同步原语,比如:
- 用
ConcurrentQueue
作为缓冲区。 - 用
SemaphoreSlim
来限制队列大小(实现“背压”)和在队列为空时通知消费者等待。 - 用大量的
lock
和复杂的逻辑来确保线程安全。 这样的代码不仅复杂、容易出错,而且性能调优也很困难。
Channel
的出现就是为了解决这个问题,它提供了:
- 简洁的 API:将所有复杂的同步逻辑都封装好了。
- 异步优先:专为
async/await
设计,不会阻塞线程,而是异步地等待,资源利用率极高。 - 高性能:底层实现经过了高度优化。
可以把 Channel
当做一个自带信号量的 ConcurrentQueue
。
获取 Chanel
实例
通过 Channel
的工厂方法获得一个无边界的 Channel
实例:
c#
var chanel = Channel.CreateUnbounded<int>();
也可以以同样的方式获得一个有边界的 Channel
实例,需要传递一个容量参数:
c#
// 容量为10
var chanel = Channel.CreateBounded<int>(10);
自定义
通过工厂方法创建的 Channel
可以接受一个 Option
参数,可以进行更多设置,BoundedChannelOptions
相比 UnboundedChannelOptions
提供了更多的配置选项。例如:
c#
var option = new BoundedChannelOptions(10)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = true,
SingleWriter = true
};
配置选项:
Capacity
:容量。FullMode
:枚举,表示当Channel
满时的行为。Wait
:等待直到有空间可用。DropWrite
:丢弃写入操作。DropOldest
:丢弃最旧的元素。DropNewest
:丢弃最新的元素。
SingleReader
:布尔值,表示是否只允许一个读取器。SingleWriter
:布尔值,表示是否只允许一个写入器。AllowSynchronousContinuations
:一般不常用,只有在边界情况,优化到无可优化的时候可以考虑,存在难以预料的风险。当Channel
为空时,消费者可以直接尝试从生产者获取元素。
写入与读取
Chanel
内部维护了一个 ConcurrentQueue
,可以通过 Writer
和 Reader
进行写入和读取。
以下介绍一个生产者-消费者模式(Producer-Consumer Pattern)的示例。在这个示例中,生产者通过 writer.WriteAsync()
方法将数据写入 Channel
,而消费者通过 reader.ReadAsync()
方法从 Channel
中读取数据:
c#
var chanel = Channel.CreateUnbounded<int>();
// 启动生产者和消费者
var sender = SendAsync(1, chanel.Writer);
var receiver = ReceiveAsync(2, chanel.Reader);
// 等待发送任务完成
await sender;
// 标记 Channel 写入完成,不再接收新数据
chanel.Writer.Complete();
Console.WriteLine("Channel writer completed.");
// 等待接收任务完成
await receiver;
// 生产者:发送数据到 Channel
async Task SendAsync(int id, ChannelWriter<int> writer)
{
try
{
for (int i = 1; i < 21; i++)
{
// 异步写入数据到 Channel
await writer.WriteAsync(i);
Console.WriteLine($"Thread {id} sent {i}");
await Task.Delay(20);
}
}
catch (OperationCanceledException)
{
Console.WriteLine($"Thread {id} was canceled.");
}
}
// 消费者:从 Channel 读取数据
async Task ReceiveAsync(int id, ChannelReader<int> reader)
{
try
{
// 持续读取直到 Channel 完成
while (!reader.Completion.IsCompleted)
{
// 异步读取数据
int result = await reader.ReadAsync();
Console.WriteLine($"Thread {id} received {result}");
await Task.Delay(20);
}
Console.WriteLine($"Thread {id} completed reading.");
}
catch (ChannelClosedException)
{
// Channel 被关闭时抛出此异常
Console.WriteLine($"Thread {id} was canceled. Because the channel was closed.");
}
}
当 writer
完成写入后,调用 chanel.Writer.Complete()
来标记 Channel
不再接收新数据。当 writer
关闭,Channel
为空的时候,Channel
就会自动关闭。如果此时再尝试读取数据,就会抛出 ChannelClosedException
。
INFO
即使在 while
循环里检查了 reader
的关闭,依旧需要在读取数据时使用 try-catch
来捕获 ChannelClosedException
异常。这是因为 while
里对 reader
的检查和 writer
的关闭可能会刚好错开,导致在 ReadAsync()
时 Channel
已经被关闭。
C# 8.0 带来了一个新的方法:reader.reader.ReadAllAsync
,可以代替以前的 reader.ReadAsync()
。
这个方法会返回一个 IAsyncEnumerable
。想要遍历实现了这个接口的类,需要在 foreach
先加上 await
关键词,例如:
c#
async Task ReceiveAsync(int id, ChannelReader<int> reader)
{
await foreach (var num in reader.ReadAllAsync())
{
Console.WriteLine($"Thread {id} received {num}");
}
}
通过这种方法甚至不需要产生去捕获异常。 其源代码也很简洁优雅。
ReadAllAsync()
源码
c#
public virtual async IAsyncEnumerable<T> ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false))
{
while (TryRead(out T? item))
{
yield return item;
}
}
}
在同步方法中调用异步方法
总的原则是:“异步应该贯穿到底”(Async all the way down)。然而,在现实世界中,我们总会遇到一些无法遵循这个理想原则的情况。常见的情形有:
- 在构造函数里异步
- 接口规定的方法是同步的
- 兼容旧代码或第三方库
阻塞
阻塞的方法很简单,但可能导致死锁。
c#
FooAsync().Wait();
var message = GetMessageAsync().result;
使用 GetAwaiter().GetResult()
相比上面两种方法更安全一些,因为它不会抛出 AggregateException
,而是直接抛出原始异常。
c#
FooAsync().GetAwaiter().GetResult();
TIP
如果一定要在同步方法中阻塞地调用异步方法,GetAwaiter().GetResult()
应该是最优的方案了。
一发即忘
通过一发即忘的方式来调用异步方法,会导致无法捕获异常。之前已经介绍过一发即忘(Fire-and-Forget),这里不再赘述。
既然阻塞的方式不行,那也只能通过一发即忘的方式来调用异步方法。接下来介绍的方案会基于一发即忘,并尝试解决其缺陷。
扩展方法
这个方案来自 Brian Lagunas,也是 Prism 框架的负责人。
假设这么一个场景,我们需要在一个类的构造函数里去调用异步方法,如果此时用到阻塞的方法就不太合适,这会导致每次类的实例化都要卡几秒钟。
我们需要一种机制,让我们可以在一发即忘的情况下,得知异步方法的执行结果与是否发生异常。为此,我们可以设计一个方法 SafeFireAndForget
, 它接受一个异步任务和两个回调函数:一个用于任务完成时的处理,另一个用于异常处理。
c#
class Program
{
public static void Main(string[] args)
{
// 创建 DataModel 实例,构造函数会触发异步数据加载
var dataModel = new DataModel();
Console.WriteLine("Data loading started...");
// 模拟主线程继续执行其他任务
Thread.Sleep(2000);
// 检查数据是否已加载完成
var data = dataModel.Data;
Console.WriteLine($"Data loaded: {data?.Count ?? 0} items");
}
}
class DataModel
{
public List<int>? Data {get; private set;}
public bool IsDataLoaded { get; private set; }
public DataModel()
{
// 在构造函数中使用 SafeFireAndForget 启动异步数据加载
// 传入完成回调和错误处理回调
SafeFireAndForget(
LoadDataAsync(),
() => IsDataLoaded = true,
e => Console.WriteLine("Error loading data: " + e.Message)
);
}
/// <summary>
/// 安全的一发即忘方法:执行异步任务但不等待结果,同时提供异常处理
/// </summary>
/// <param name="task">要执行的异步任务</param>
/// <param name="onCompleted">任务完成时的回调</param>
/// <param name="onError">发生异常时的回调</param>
static async void SafeFireAndForget(Task task, Action? onCompleted = null, Action<Exception>? onError = null)
{
try
{
// 等待任务完成
await task;
// 任务成功完成,调用完成回调
onCompleted?.Invoke();
}
catch (Exception e)
{
// 捕获异常并调用错误处理回调
onError?.Invoke(e);
}
}
/// <summary>
/// 模拟异步加载数据的方法
/// </summary>
private async Task LoadDataAsync()
{
await Task.Delay(1000);
Data = Enumerable.Range(1, 10).ToList();
}
}
通过给 Task
添加扩展方法,我们能更优雅的实现和这个功能:
c#
class DataModel
{
public List<int>? Data {get; private set;}
public bool IsDataLoaded { get; private set; }
public DataModel()
{
// 使用扩展方法启动异步数据加载,提供完成和错误回调
LoadDataAsync().Await(() => IsDataLoaded = true,
e => Console.WriteLine("Error loading data: " + e.Message));
}
private async Task LoadDataAsync()
{
await Task.Delay(1000);
Console.WriteLine(Environment.CurrentManagedThreadId);
throw new Exception("Failed to load data");
Data = Enumerable.Range(1, 10).ToList();
}
}
/// <summary>
/// Task 扩展方法类,提供一发即忘功能
/// </summary>
public static class TaskExtensions
{
/// <summary>
/// 异步等待任务完成的扩展方法,支持完成和错误回调
/// 方法名也可叫做 Forget,都是业界约定俗成的名称
/// </summary>
/// <param name="task">要执行的异步任务</param>
/// <param name="onCompleted">任务成功完成时的回调</param>
/// <param name="onError">任务发生异常时的回调</param>
public static async void Await(this Task task, Action? onCompleted = null, Action<Exception>? onError = null)
{
try
{
await task;
onCompleted?.Invoke();
}
catch (Exception e)
{
onError?.Invoke(e);
}
}
}
ContinueWith()
接下来的方案来自 SingletonSean,这种方法使用了原生的 Task.ContinueWith
方法来处理异步任务的结果和异常:
c#
class DataModel
{
public List<int>? Data {get; private set;}
public bool IsDataLoaded { get; private set; }
public DataModel()
{
LoadDataAsync().ContinueWith(OnDataLoaded);
}
private bool OnDataLoaded(Task task)
{
if (task.IsFaulted)
{
Console.WriteLine("Error loading data: " + task.Exception?.GetBaseException().Message);
return false;
}
return IsDataLoaded = true;
}
private async Task LoadDataAsync()
{
await Task.Delay(1000);
Data = Enumerable.Range(1, 10).ToList();
}
}
WARNING
这种方法明显更加简洁,且使用原生方法避免了自己造轮子。但这并不意味着这种方法优于 Brian 的方案。
ContinueWith
会将你的委托包装成一个Task
,这会导致一些额外的开销。ContinueWith
的TaskScheduler
的是TaskScheduler.Current
而不是TaskScheduler.Default
。这意味着如果你在 UI 线程上调用ContinueWith
,它会在 UI 线程上执行回调,这可能导致死锁或其他问题。
异步工厂
最后的方案来自 Nick Chapsas。
c#
class MyService
{
private MyService()
{
}
private async Task InitData()
{
await Task.Delay(1000);
}
public static async Task<MyService> CreateAsync()
{
var service = new MyService();
await service.InitData();
return service;
}
}
这个方法也非常简单,但也存在一些弊端:
- 难以实现单例。
- 没有办法把这个类注册给一个 IOC 容器。
补充
推荐视频:C#如何在一个同步方法中调用异步方法
- 如果想实现单例 -
AsyncLazy
:来自Nito.AsyncEx
工具包和Microsoft.VisualStudio.Threading
工具包。 JoinableTaskFactory
:来自Microsoft.VisualStudio.Threading
工具包。- 注册 IOC 容器 -
Microsoft.Extensions.DependencyInjection
工具包。 - Unit Test
INFO
就结论来看,最好还是能异步就异步,能贯穿到底就贯穿到底。