Skip to content

异步编程

更新: 7/30/2025 字数: 0 字 时长: 0 分钟

异步编程的核心思想是“不要等待”。它是一种程序设计范式。通过允许程序在等待期间去处理其他任务,异步编程能够极大地提高应用的响应速度和资源利用效率,是构建流畅用户界面和高性能服务的基石。

现代C#异步编程选择 Task,在99%的情况下都不需要使用 Thread

推荐视频:C#异步编程的入门概念及核心理念

与多线程的区别

需要明确的是,异步编程与多线程是不同的概念:

  1. 单线程同样可以异步
  2. 在 C# 中, 异步默认借助线程池。
  3. 多线程经常要求阻塞,而异步要求不阻塞。

多线程是实现异步的一种方式,但异步本身是一个更宽广、着眼于程序流程设计的编程模型。

此外,多线程和异步的适用场景不同:

  • 多线程:适用于 CPU 密集型任务和长期运行的任务,提供了更底层的控制,但不易传参和返回值且书写代码较为繁琐。线程的创建与销毁开销较大。
  • 异步:适用于 IO 密集型任务和大量短暂的小任务,且可以避免线程阻塞,挺高响应能力。

异步任务( Task

Task 是对于异步任务的抽象,它代表一个可能在未来某个时间点完成的操作。异步任务(默认)会借助线程池在其他线程上运行,在获取结果后回到之前的状态。

Task 是一个包含了异步任务各种状态的引用类型。它可以表示任务的执行状态、结果以及异常信息。

输入

c#
var task = new Task<string>(() =>
{
	Thread.Sleep(1000);
	return "done";
});

Console.WriteLine(task.Status);
task.Start();
Console.WriteLine(task.Status);

Thread.Sleep(1000);
Console.WriteLine(task.Status);

Thread.Sleep(1000);
Console.WriteLine(task.Status);
Console.WriteLine($"task result: {task.Result}");

输出

powershell
Created
WaitingToRun
Running
RanToCompletion
task result: done

返回值为 Task 的方法没有返回值,返回值为 Task<T> 有类型为 T 的返回值。

INFO

异步任务默认为后台线程。

异步方法( asyncawait

将方法标记为 async 后,可以在方法中使用 await 关键字。await 关键字会等待异步任务的结束,并获得结果。

c#
async Task Main() 
{
    await FooAsync();
}

Task FooAsync() 
{
    return Task.Delay(1000);
}

值得注意的是,await 会等待异步任务结束,再返回到调用处继续执行,但这不相当于阻塞线程。await 执行结束后,会使用一个新的空闲线程继续执行后续代码。但如果 await 的任务已经完成,则不会切换线程,直接继续执行后续代码。

输入

c#
// --- 注意:这是一个控制台应用程序 ---

// 主方法开始,打印当前线程ID
Helper.PrintThreadId("Befofore");
// 等待异步方法完成
await FooAsync();
// 异步方法完成后,打印当前线程ID
Helper.PrintThreadId("After");

async Task FooAsync()
{
	// 异步方法开始,打印当前线程ID
	Helper.PrintThreadId("Befofore");
	await Task.Delay(1000);
	// 延迟结束后,打印当前线程ID
	Helper.PrintThreadId("After");
}

// 辅助类,用于打印线程ID和调用信息
class Helper
{
	private static int _index = 1;

	// 打印当前线程ID和调用方法名
	public static void PrintThreadId(string? message = null, [CallerMemberName] string? name = null)
	{
		var title = $"{_index}: {name}";
		if (!string.IsNullOrEmpty(message))
			title += $" @ {message}";
		// 输出:线程ID + 序号 + 方法名 + 消息
		Console.WriteLine(Environment.CurrentManagedThreadId + "\t" + title);
		Interlocked.Increment(ref _index);
	}
}

输出:

powershell
1 1: <Main>$ @ Befofore
1 2: <Main>$ @ Befofore
10 3: <Main>$ @ After
10 4: <Main>$ @ After

通过输出可以看到,原先的线程(id = 1)在 await 之后被挂起,新的线程(id = 10)被用来继续执行后续代码。当然如果不是控制台应用程序,可能最后会返回到原先的线程,即主线程(ex:id = 1)。

如果一个调用一个异步方法而不 await,当异步方法结束时,会由原本的线程继续执行后续代码,而不是使用新的线程。

代码示例

输入:

c#
FooASync();
Console.WriteLine("Out " + Environment.CurrentManagedThreadId);
Thread.Sleep(1000);

Console.WriteLine(Environment.CurrentManagedThreadId);

async Task FooASync()
{
	Console.WriteLine("Inner " + Environment.CurrentManagedThreadId);
	await Task.Delay(500);
	Console.WriteLine("Inner " + Environment.CurrentManagedThreadId);
}

输出:

powershell
Inner 1
Out 1
Inner 4
1

如果修改第一行为 await FooASync();,又会有不一样的结果:

powershell
Inner 1
Inner 10
Out 10
10

可以观察到打印第二个 Inner 时,线程 id 还是 10,这是因为在 await Task.Delay(500) 之后,线程10又变回了线程池的空闲线程。

很有趣吧(`・ω・´)!

async + await 会将方法包装成状态机, await 类似于检查点。底层会调用 MoveNext 方法切换状态。

TIP

.NET 命名规范建议在所有异步方法名后添加 Async 后缀,以便于区分同步和异步方法。

但在接口方法中,我们无法为其添加 async 修饰符。

c#
public interface IFoo
{
	// 接口方法中无法添加 async 修饰符
	Task<int> GetValueAsync();
}

public class Demo : IFoo 
{
	public async Task<int> GetValueAsync()
	{
		// 或者: return 42 也是可以通过编译的,这是一个小小的语法糖
		return await Task.FromResult(42);
	}
}

当需要需要向一个事件注册异步方法的时候(ex: Action),如果事件要求注册的委托无返回值,可以使用 async void。在平时使用中,async void 应该尽量避免,因为它无法被 await 等待,也因而异常无法被捕获。

输入:

c#
try
{
	await GetValueAsync();
}
catch (Exception e)
{
	Console.WriteLine(e.Message);
}

async Task<int> GetValueAsync()
{
	await Task.Delay(1);
	throw new Exception("发生错误");
}

输出:

powershell
发生错误

如果不进行 await,则异常会在其线程被抛出,无法被捕获。 await 会自动拨开外部的聚合异常(AggregateException),抛出原始异常。对于 async void 方法中可能会产生的异常,一定要谨慎处理。

异步的传染性(Contagious)

这个词听起来可能有点像负面的东西,但实际上它描述的是 async/await 设计模式的一个核心特征。

简单来说,一旦你决定在一个方法中使用 await 来进行异步等待,那么这个方法本身也必须被标记为 async 。接着,任何调用这个 async 方法并希望正确处理其结果的方法,最好也变成 async 方法来 await 它。

这个过程会像链式反应一样,从最底层的异步操作(例如文件读写、网络请求)开始,一直向上传播,直到调用堆栈的最高层。

不要强行“切断”传染(危险操作)

有些开发者不想让 async 一路传染上去,于是他们尝试使用一些方法来“同步地”等待一个异步任务完成。最常见的就是 Task.Wait()Task.Result

这个操作其实很危险:

  1. 阻塞线程task.Wait()task.Result 会阻塞当前线程,直到异步任务完成。这完全违背了异步编程的初衷(即“不阻塞线程”)。如果你在 Unity 的主线程上这么做,你的游戏会直接卡死,和使用 Thread.Sleep 的效果一样。
  2. 引发死锁 (Deadlock):这是一个非常经典且致命的问题。
    • 想象一下在 Unity 主线程调用 async 方法:ProcessDataAsync().Result
    • 主线程执行到 .Result 时,它会停下来等待 ProcessDataAsync 完成。
    • ProcessDataAsync 内部的 await 执行后,它需要回到原来的“上下文”(即主线程)来继续执行 await 后面的代码。
    • 但此时主线程正被 .Result 阻塞着,它无法继续执行任何代码。
    • 结果:ProcessDataAsync 在等待主线程,而主线程在等待 ProcessDataAsync。双方都在等待对方,程序就永远卡住了——这就是死锁。
c#
async Task ProcessDataAsync()
{
	// 模拟异步操作
	await Task.Delay(1000);
	Console.WriteLine("Data processed.");
}

void Main()
{
	// 这会导致死锁
	var result = ProcessDataAsync().Result;
	Console.WriteLine(result);
}

结论:拥抱“传染性”

“异步的传染性”不是一个缺陷,而是 async/await 模式确保其非阻塞优势能够贯穿整个调用链的内在机制。

最佳实践是:

Async all the way up. (让异步一路到底)

重要思想:不阻塞

在这部分你也应该观察到了阻塞的问题,你不应该在异步方法使用任何方式阻塞当前线程。 await 可以释放当前线程,而阻塞则会导致线程无法继续执行其他任务,如果线程池里的线程被大量阻塞,可能会导致线程池耗尽。

导致卡死的代码示例:

c#
int HeavyJob()
{
	// 这是一个耗时的同步操作
	Thread.Sleep(5000);
	return 42;
}

private void Button_Click(object sender, EventArgs e)
{
	// 这会阻塞 UI 线程,导致界面卡死
	var result = HeavyJob();
	MessageBox.Show($"Result: {result}");
}

正确的做法:

c#
async Task<int> HeavyJob()
{
	// 这是一个耗时的同步操作
	Thread.Sleep(5000);
	await Task.Delay(5000);
	return 42;
}

private async void Button_Click(object sender, EventArgs e)
{
	var result = await HeavyJob();
	MessageBox.Show($"Result: {result}");
}

常见阻塞情景:

  • Task.Wait()Task.Result:这些方法会阻塞当前线程,直到任务完成。但如果任务已经完成,则不会阻塞。
  • Task.Delay() vs Thread.Sleep()Thread.Sleep() 会阻塞当前线程,而 Task.Delay() 则不会。
  • IO 等操作的同步方法:一般都能寻找到这些方法的异步版本,如果实在找不到可以使用 Task.Run(() => )进行包装。

同步上下文

你可以把 SynchronizationContext 想象成一个 “任务调度员”

它的核心职责是提供一种机制,让你能够将一段代码(一个委托或一个工作项)排队,并确保它在 “正确” 的线程上执行。

这里的“正确”是一个关键词,它到底指哪个线程,完全取决于当前的运行环境:

  • 在UI应用中 (如WPF, WinForms, Unity的主线程):“正确”的线程就是那个唯一的、负责更新UI界面的UI线程。
  • 在ASP.NET Core应用中:“正确”的线程是处理当前HTTP请求的线程。
  • 在控制台应用或普通后台线程中:默认情况下没有特定的“调度员”,任务会被安排到线程池 (Thread Pool) 中的任意一个可用线程上执行。

所以,SynchronizationContext 本身是一个抽象基类,不同的应用程序框架会提供自己的具体实现,来定义它们的“正确”线程是哪个以及如何向其调度任务。

一般只有 UI 线会采取这种策略程。Unity 有默认的同步上下文。

为什么需要同步上下文?

最主要的原因是线程安全,尤其是对于UI操作。

几乎所有的GUI框架(包括Unity的UI系统)都遵循一个严格的规则:只有创建UI控件的那个线程(即UI线程),才能修改这些控件。

可以通过调用 Task.ConfigureAwait() 配置任务在 await 方法结束是否会到原来的线程,默认为 true

c#
await Task.Delay(1000).ConfigureAwait(false);

TaskScheduler 也可以用来配置同步上下文。此外,还可以控制 Task 的调度方式和运行线程。这里只简单提及。

  1. 线程池线程:Default
  2. 当前线程:CurrentThread
  3. 单线程上下文:STAThread
  4. 长时间运行线程:LongRunning

一发即忘

一发即忘 “Fire-and-Forget” 是一种编程模式,指你启动一个操作(通常是异步任务),但不使用 await 或阻塞的方式去等待它的结束,无法观察任务的状态(是否完成,报错等)。例如:

输入

c#
class Program
{
	static void Main()
	{
		// 使用弃元来压制警告
		_ = FooAsync();
		Console.WriteLine("Hello, World!");
		Console.ReadLine();
	}
	
	static async Task FooAsync()
	{
		await Task.Delay(1);
		GetSomeResult();
		Console.WriteLine("Done");
	}

	static void GetSomeResult() => throw new Exception();
}

输出

powershell
Hello, World!

可见我们无法观察到 FooAsync 中的异常,编译器也无法帮助我们查出抛出异常之后的代码能否运行。

如果我们替换为 async void,程序在运行时便会抛出异常,但无法捕获。例如:

输入

c#
class Program
{
	static void Main()
	{
		try
		{
			FooAsync();
			Console.WriteLine("Hello, World!");
			Console.ReadLine();
		}
		catch (Exception e)
		{
			Console.WriteLine("Caught exception");
		}
	}
	
	static async void FooAsync()
	{
		await Task.Delay(1);
		GetSomeResult();
		Console.WriteLine("Done");
	}

	static void GetSomeResult() => throw new Exception();
}

输出

powershell
Unhandled exception. Hello, World!
System.Exception: Exception of type 'System.Exception' was thrown.
   at Program.GetSomeResult() in H:\Learn_Project\CSharp\AsynchronousProgramming\AsynchronousProgramming\AsyStatus.cs:line 27
   at Program.FooAsync() in H:\Learn_Project\CSharp\AsynchronousProgramming\AsynchronousProgramming\AsyStatus.cs:line 23
   at System.Threading.Tasks.Task.<>c.<ThrowAsync>b__128_1(Object state)
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading.PortableThreadPool.WorkerThread.WorkerThreadStart()
为何会产生这种差异

当一个异步方法声明为 async Task 时,它会返回一个 Task 对象。这个 Task 对象代表了整个异步操作。

  • 异常捕获: 在这个方法内部抛出的任何异常都会被捕获并存储在返回的 Task 对象中。这个 Task 的状态会变为 Faulted (故障)。
  • 异常传播: 这个异常不会立即导致程序崩溃。只有当代码 await 这个 Task 时,或者显式地访问其 Exception 属性时,异常才会被重新抛出。

当一个异步方法声明为 async void 时,它不会返回 Task

  • 异常无法捕获: 由于没有 Task 对象返回,在这个方法中抛出的异常无法被常规的 try-catch 块捕获(除非 try-catch 就在 throw 语句的周围)。
  • 异常传播: async void 方法中未处理的异常会直接被抛到启动它的同步上下文(SynchronizationContext)上。在控制台应用程序中,默认没有同步上下文,所以异常会被直接抛到线程池上。

创建异步任务

Task.Run

c#
var res = await Task.Run(HeavyJob); 

int HeavyJob() 
{
	Thread.Sleep(5000); 
	return 42;
}

Task.Factory.StartNew

相当于 Task.Run 的完整版,提供了更多的配置选项,比如 TaskCreationOptionsTaskScheduler

c#
var res = await Task.Factory.StartNew(HeavyJob, 
	CancellationToken.None, 
	TaskCreationOptions.None, 
	TaskScheduler.Default);

new Task + Task.Start()

这种方式没什么必要,一般没有情况需要创建一个 Task 但是不立刻启动。

同时开启多个异步任务

c#
// 创建一个包含1到10的数组
var inputs = Enumerable.Range(1, 10);

var tasks = new List<Task<int>>();

// 为每个输入值创建一个异步任务,但不等待它们完成
foreach (var input in inputs)
{
	// 注意:这里没有使用 await,所以所有任务会同时开始执行
	tasks.Add(HeavyJobAsync(input));
}

// 等待所有任务完成,并获取所有结果
// Task.WhenAll 会并行等待所有任务,而不是顺序等待
var results = await Task.WhenAll(tasks);

// 模拟耗时的异步操作
async Task<int> HeavyJobAsync(int input)
{
	await Task.Delay(1000); 
	return input * input;
}

除了 Task.WhenAll,还有 Task.WhenAny 来等待任意一个任务完成。

取消异步任务

通过 CancellationTokenSource + CancellationToken 可以实现异步任务的取消:

输入

c#
CancellationTokenSource cts = new CancellationTokenSource();

try
{
	await Task.WhenAll(
		Task.Delay(10000, cts.Token), 
		Task.Run(() => cts.Cancel())
	);
}
catch (TaskCanceledException e)
{
	Console.WriteLine(e.Message);
}
finally
{
	cts.Dispose();
}

输出

powershell
A task was canceled.

INFO

CTS 实现了 IDisposable 接口,使用后需要释放。

可以通过传入 TimeSpanCancellationToken 可以设置取消的时间点:

c#
// 或者:var cts = new CancellationTokenSource(5000);
var cts1 = new CancellationTokenSource(TimeSpan.FromSeconds(5));

var cts2 = new CancellationTokenSource();
cts2.CancelAfter(5000);

TIP

推荐所有异步方法都传入 CancellationToken 参数。

编写支持传入 Token 的方法

通过在声明的异步方法中加入 CancellationToken 参数,并在方法体内 await 的异步方法中传入 CancellationToken,可以使我们行云流水地编写可取消的异步方法:

c#
async Task FooAsync(CancellationToken cancellationToken)
{
	await Task.Delay(5000, cancellationToken);
	var client = new HttpClient();
	await client.GetStreamAsync("`123123", cancellationToken);
}

但有时候,我们调用这些异步任务但完全没有取消的需求,如果每次都传入 CancellationToken,会显得很繁琐。

通常来说,有两种解决方案:

  1. 方法重载
    c#
    async Task FooAsync(CancellationToken cancellationToken)
    {
    	await Task.Delay(5000, cancellationToken);
    	var client = new HttpClient();
    	await client.GetStreamAsync("`123123", cancellationToken);
    }
    
    async Task FooAsync() => await FooAsync(CancellationToken.None);
  2. 默认参数
    c#
    async Task FooAsync(CancellationToken cancellationToken = default)
    {
    	await Task.Delay(5000, cancellationToken);
    	var client = new HttpClient();
    	await client.GetStreamAsync("`123123", cancellationToken);
    }

在同步方法的上下文使用 Token

有时候,我们有一个耗时的同步方法。这个同步方法不容易被改写为异步方法,我们先使用 Task.Run 包装它,至少保证线程的不阻塞。在这种情况下,为这种操作添加取消功能,关键在于让那个耗时的同步方法能够“感知”到外部的取消请求。

方案一:修改同步方法

这是最理想、最有效的方案。你需要稍微修改一下那个耗时的同步方法,让它能接收一个 CancellationToken 作为参数。

c#
Task FooAsync(CancellationToken cancellationToken)
{
	return Task.Run(() => 
	{
		// 在同步方法中检查取消请求
		cancellationToken.ThrowIfCancellationRequested();

		while(true)
		{
			// 在每次循环中检查取消请求
			cancellationToken.ThrowIfCancellationRequested();

			// 模拟耗时的同步操作
			Thread.Sleep(5000);
		}
	});
}

方案二:在同步方法内部周期性地检查 Token

一般来说,不存在真的无法修改的同步方法,但如果你确实需要在一个无法修改的同步方法中使用取消功能,可以在调用这个同步方法的代码中周期性地检查 CancellationToken

c#
async Task BarAsync(CancellationToken cancellationToken)
{
	await Task.Run(() => 
	{
		
		cancellationToken.ThrowIfCancellationRequested();
		
		Thread.Sleep(5000); // 模拟耗时的同步操作

		cancellationToken.ThrowIfCancellationRequested();

		// ..
	});
}

值得注意的是,直接给 Task.Run() 传入 CancellationToken 相当于在任务开始的时候检查 Token

c#
Task.Run(() => 
{
	// ..
}, cancellationToken);

// 相当于:
Task.Run(() => 
{
	cancellationToken.ThrowIfCancellationRequested();
	// ..
});

任务取消的对策

  1. 抛出异常(推荐)
    c#
    cancellationToken.ThrowIfCancellationRequested();
  2. 提前返回
    • 可以通过 Task.FromCanceled<>() 返回一个比较有意义的 Task
    c#
    if (cancellationToken.IsCancellationRequested)
    	return Task.FromCanceled<int>(cancellationToken);
  3. 善后
    • try, catch, finally
    • cancellationToken.Register() 注册取消时的回调:
    c#
    cancellationToken.Register(() => 
    {
    	// 取消时的清理工作
    	Console.WriteLine("Task was cancelled.");
    });

TIP

cancellationToken.Register() 后注册的委托先调用。

TaskCanceledException & OperationCanceledException

OperationCanceledException 其实是 TaskCanceledException 的父类。OperationCanceledException 位于 System.Threading 中,而Task 位于命名空间 System.Threading.Tasks

其实,CancellationToken 也位于 System.Threading 命名空间中。似乎是因为 CancellationToken 其实不只能用于 Task 异步编程中,在一般多线程编程中也开用来取消线程。

方法 ThrowIfCancellationRequested() 会抛出 OperationCanceledException在日常的使用中,我们也直接抛出 OperationCanceledException 即可

超时机制

有时需要预防程序因等待一个永远不会完成的任务而无限期卡死。

较早的方案中通过 Task.WhenAny()Task.Delay() 实现超时机制:

输入

c#
using var cts = new CancellationTokenSource();
var fooTask = FooAsync(cts.Token);

// 同时等待目标任务和超时任务,看哪个先完成
var completedTask = await Task.WhenAny(fooTask, Task.Delay(1000));
// 如果完成的不是目标任务,说明超时了
if (completedTask != fooTask)
{
	cts.Cancel();
	// 等待目标任务真正结束(处理取消异常)
	await fooTask;
}

/// <summary>
/// 模拟一个耗时的异步操作
/// </summary>
async Task FooAsync(CancellationToken token)
{
	try
	{
		Console.WriteLine("Starting FooAsync...");
		await Task.Delay(5000, token);
		Console.WriteLine("Completed FooAsync...");
	}
	catch (OperationCanceledException)
	{
		// 捕获取消异常并处理
		Console.WriteLine("Timeout");
	}
}

输出

powershell
Starting FooAsync...
Timeout

在 .NET 6 以后,可以使用更为现代的方法 - 通过调用 Task.WaitAsync() 来实现超时机制。 Task.WaitAsync() 接受一个 TimeSpan 参数,在超时后会抛出 TimeoutException

输入

c#
using var cts = new CancellationTokenSource();

try
{
	// 使用 WaitAsync 设置2秒超时,超时会抛出 TimeoutException
	await FooAsync(cts.Token).WaitAsync(TimeSpan.FromSeconds(2)); 
}
catch (TimeoutException)
{
	// 捕获超时异常
	Console.WriteLine("Timeout");
	cts.Cancel();
}

/// <summary>
/// 模拟一个耗时的异步操作
/// </summary>
async Task FooAsync(CancellationToken token)
{
	try
	{
		Console.WriteLine("Starting FooAsync...");
		await Task.Delay(5000, token);
		Console.WriteLine("Completed FooAsync...");
	}
	catch (OperationCanceledException)
	{
		// 捕获取消异常并处理
		Console.WriteLine("Task was canceled.");
	}
}
更为基础的方法:CancellationTokenSource.CancelAfter()

此外还有一个更为经典基础方法,使用 CancellationTokenSource.CancelAfter()。它的核心思想是让 CancellationTokenSource 自身成为一个计时器,在指定时间后自动发出取消信号。

你会注意到这种方法难以区分任务超时与任务取消,目前还是更加推荐使用 Task.WaitAsync() 方法

c#
using var cts = new CancellationTokenSource();

try
{
    // 直接让 CancellationTokenSource 在2秒后发出取消信号
    cts.CancelAfter(TimeSpan.FromSeconds(2));
    await FooAsync(cts.Token);
}
catch (OperationCanceledException) 
{
    // 当 CancelAfter 触发时,await 的任务会抛出此异常
    Console.WriteLine("Task was canceled due to the timeout.");
}

/// <summary>
/// 模拟一个耗时的异步操作
/// </summary>
async Task FooAsync(CancellationToken token)
{
    // 这个方法与前面的例子完全相同
    try
    {
        Console.WriteLine("Starting FooAsync...");
        await Task.Delay(5000, token);
        Console.WriteLine("Completed FooAsync...");
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("Inner OperationCanceledException caught.");
		// 重新抛出,让调用者知道任务已被取消
        throw; 
    }
}

同步机制

之前多线程介绍的传统的同步方法(lock, Mutex, Semaphore, EventWaitHandle)都不适用于异步编程,因为它们都会导致阻塞,而异步编程最重要的就是不阻塞。

SemaphoreSlim

原生方法几乎只有一个适用的:SemaphoreSlim

c#
var inputs = Enumerable.Range(1, 10);

// 创建信号量,最多允许1个线程同时访问(相当于互斥锁)
var sem = new SemaphoreSlim(1, 1);

var tasks = new List<Task<int>>();

foreach (var input in inputs)
{
	tasks.Add(HeavyJobAsync(input));
}

// 等待所有任务完成
var results = await Task.WhenAll(tasks);

// 模拟耗时的异步操作
async Task<int> HeavyJobAsync(int input)
{
	// 异步等待获取信号量许可
	await sem.WaitAsync();
	try
	{
		await Task.Delay(1000); 
		return input * input;
	}
	finally
	{
		// 释放信号量许可
		sem.Release();
	}
}

TIP

maxCount 的核心作用是为信号量的计数值设置一个不可逾越的上限。

如果代码有bug,意外地调用 Release() 的次数比调用 WaitAsync() 的次数还多,会发生什么?

如果没有 maxCount 的限制:信号量的计数值会无限增长。通过 maxCount 的限制,当你调用 Release() 导致计数值试图超过 maxCount 时,SemaphoreSlim 会立即抛出一个SemaphoreFullException 异常。

此外,还可以是实现“懒加载资源池”等高级模式,此处先不展开。

第三方库还提供了更多的异步同步机制,比如 AsyncLockAsyncManualResetEvent 等。

AsyncLock

来自 Nito.AsyncEx 库的 AsyncLock 是一个异步版本的互斥锁。它允许多个异步任务安全地访问共享资源。例如:

c#
private readonly AsyncLock _lock = new();

public async Task DoJobAsync()
{
	using (await _lock.LockAsync())
	{
		
	}
}

AsyncLock 结束了 using 的语法糖会自动释放锁,并没有实际释放什么资源。此外,AsyncLock 还可以接受一个 CancellationToken,允许在等待锁的过程中取消操作。

AsyncLock 还保留一个同步的方法:

c#
using (_lock.Lock()) {}

AsyncAutoResetEvent

Nito.AsyncExMicrosoft.VisualStudio.Threading 都提供了 AsyncAutoResetEvent。如果我们不希望使用功能过于强大的 SemaphoreSlim ,我们只希望有一个线程可以访问共享资源,可以使用 AsyncAutoResetEvent。例如:

c#
// 创建一个异步自动重置事件,默认未设置信号
var signal = new AsyncAutoResetEvent(false);

var setter = Task.Run((() =>
{
	Thread.Sleep(1000);
	// 设置信号,允许等待的任务继续执行
	signal.Set();
}));

var waiter = Task.Run(async () =>
{
	// 等待信号
	await signal.WaitAsync();
	Console.WriteLine("Signal received!");
});

await Task.WhenAll(setter, waiter);

简单来说,AsyncAutoResetEvent 中的 "AutoReset"(自动重置) 指的是:门默认是关闭,当 event.Set() 被调用的时候门会打开,排在门前的第一个任务(await event.WaitAsync())会被允许通过。当第一个任务通过后,门会自动关闭,等待下一个任务。

原生的 TaskCompletionSource

AsyncAutoResetEvent 可以被重复利用,但有时我们需要一个一次性的信号,此时可以考虑使用 TaskCompletionSourceTaskCompletionSource 可以用来创建一个可以手动设置结果的任务,也有泛型版本。例如:

c#
var tcs = new TaskCompletionSource();

var setter = Task.Run((() =>
{
	Thread.Sleep(1000);
	// 设置结果,允许等待的任务继续执行
	tcs.SetResult();
}));

var waiter = Task.Run(async () =>
{
	// 等待任务完成
	await tcs.Task;
	Console.WriteLine("Signal received!");
});

await Task.WhenAll(setter, waiter);

如果有多个地方能够设置结果,继续使用 cs.SetResult() 会抛出异常。可以使用 TrySetResult() 来避免异常。例如:

c#
if (!tcs.TrySetResult())
{
	Console.WriteLine("Task was already completed.");
}

传统的 await

在异步编程中,传统的 await 关键字也可以用来实现同步机制。它会暂停当前方法的执行,直到被等待的任务完成。这种方式虽然简单,但并不推荐用于复杂的同步场景,因为它可能导致性能问题和死锁。

c#
var setter = Task.Run((() =>
{
	Thread.Sleep(1000);
}));

var waiter = Task.Run(async () =>
{
	// 等待任务完成
	await setter;
	Console.WriteLine("Signal received!");
});

在传统的多线程编程中,我们很难观察到 Thread 的状态,但在异步编程中,我们可以通过 Task 的状态来观察任务的执行情况。这也是异步编程的一个重要优势,这个同步技巧的核心。

TIP

由于我们可以直接通过 Task 的状态来观察任务的状态,.NET 原生只提供了 SemaphoreSlimTaskCompletionSource

如果想用工具包的话,建议在你希望锁能 AutoRest 的情况并且只有开关两种状态(AsyncAutoResetEvent),或者希望一把一个锁同时用在同步和异步方法里(AsyncLock)这两种需求下使用。否则平时原生的方法很多时候已经足够使用。

异步任务的通信 - Channel 集合

在多线程中的阻塞集合(BlockingCollection)不可以使用,因为它们是阻塞的,违背了异步编程不阻塞的原则。

在 Channel 出现之前,要实现生产者-消费者模式,开发者通常需要手动组合一些同步原语,比如:

  • ConcurrentQueue 作为缓冲区。
  • SemaphoreSlim 来限制队列大小(实现“背压”)和在队列为空时通知消费者等待。
  • 用大量的 lock 和复杂的逻辑来确保线程安全。 这样的代码不仅复杂、容易出错,而且性能调优也很困难。

Channel 的出现就是为了解决这个问题,它提供了:

  • 简洁的 API:将所有复杂的同步逻辑都封装好了。
  • 异步优先:专为 async/await 设计,不会阻塞线程,而是异步地等待,资源利用率极高。
  • 高性能:底层实现经过了高度优化。

可以把 Channel 当做一个自带信号量的 ConcurrentQueue

获取 Chanel 实例

通过 Channel 的工厂方法获得一个无边界的 Channel 实例:

c#
var chanel = Channel.CreateUnbounded<int>();

也可以以同样的方式获得一个有边界的 Channel 实例,需要传递一个容量参数:

c#
// 容量为10
var chanel = Channel.CreateBounded<int>(10);

自定义

通过工厂方法创建的 Channel 可以接受一个 Option 参数,可以进行更多设置,BoundedChannelOptions 相比 UnboundedChannelOptions 提供了更多的配置选项。例如:

c#
var option = new BoundedChannelOptions(10)
{
	FullMode  = BoundedChannelFullMode.Wait,
	SingleReader = true,
	SingleWriter = true
};

配置选项:

  1. Capacity:容量。
  2. FullMode:枚举,表示当 Channel 满时的行为。
    • Wait:等待直到有空间可用。
    • DropWrite:丢弃写入操作。
    • DropOldest:丢弃最旧的元素。
    • DropNewest:丢弃最新的元素。
  3. SingleReader:布尔值,表示是否只允许一个读取器。
  4. SingleWriter:布尔值,表示是否只允许一个写入器。
  5. AllowSynchronousContinuations:一般不常用,只有在边界情况,优化到无可优化的时候可以考虑,存在难以预料的风险。当 Channel 为空时,消费者可以直接尝试从生产者获取元素。

写入与读取

Chanel 内部维护了一个 ConcurrentQueue,可以通过 WriterReader 进行写入和读取。

以下介绍一个生产者-消费者模式(Producer-Consumer Pattern)的示例。在这个示例中,生产者通过 writer.WriteAsync() 方法将数据写入 Channel,而消费者通过 reader.ReadAsync() 方法从 Channel 中读取数据:

c#
var chanel = Channel.CreateUnbounded<int>();

// 启动生产者和消费者
var sender = SendAsync(1, chanel.Writer);
var receiver = ReceiveAsync(2, chanel.Reader);

// 等待发送任务完成
await sender;
// 标记 Channel 写入完成,不再接收新数据
chanel.Writer.Complete();
Console.WriteLine("Channel writer completed.");

// 等待接收任务完成
await receiver;




// 生产者:发送数据到 Channel
async Task SendAsync(int id, ChannelWriter<int> writer)
{
	try
	{
		for (int i = 1; i < 21; i++)
		{
			// 异步写入数据到 Channel
			await writer.WriteAsync(i);
			Console.WriteLine($"Thread {id} sent {i}");
			await Task.Delay(20);
		}
	}
	catch (OperationCanceledException)
	{
		Console.WriteLine($"Thread {id} was canceled.");
	}
}

// 消费者:从 Channel 读取数据
async Task ReceiveAsync(int id, ChannelReader<int> reader)
{
	try
	{
		// 持续读取直到 Channel 完成
		while (!reader.Completion.IsCompleted)
		{
			// 异步读取数据
			int result = await reader.ReadAsync();
			Console.WriteLine($"Thread {id} received {result}");
			await Task.Delay(20);
		}
		Console.WriteLine($"Thread {id} completed reading.");
	}
	catch (ChannelClosedException)
	{
		// Channel 被关闭时抛出此异常
		Console.WriteLine($"Thread {id} was canceled. Because the channel was closed.");
	}
}

writer 完成写入后,调用 chanel.Writer.Complete() 来标记 Channel 不再接收新数据。当 writer 关闭,Channel 为空的时候,Channel 就会自动关闭。如果此时再尝试读取数据,就会抛出 ChannelClosedException

INFO

即使在 while 循环里检查了 reader 的关闭,依旧需要在读取数据时使用 try-catch 来捕获 ChannelClosedException 异常。这是因为 while 里对 reader 的检查和 writer 的关闭可能会刚好错开,导致在 ReadAsync()Channel 已经被关闭。

C# 8.0 带来了一个新的方法:reader.reader.ReadAllAsync,可以代替以前的 reader.ReadAsync()

这个方法会返回一个 IAsyncEnumerable。想要遍历实现了这个接口的类,需要在 foreach 先加上 await 关键词,例如:

c#
async Task ReceiveAsync(int id, ChannelReader<int> reader)
{
	await foreach (var num in reader.ReadAllAsync())
	{
		Console.WriteLine($"Thread {id} received {num}");
	}
}

通过这种方法甚至不需要产生去捕获异常。 其源代码也很简洁优雅。

ReadAllAsync() 源码
c#
public virtual async IAsyncEnumerable<T> ReadAllAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
	while (await WaitToReadAsync(cancellationToken).ConfigureAwait(false))
	{
		while (TryRead(out T? item))
		{
			yield return item;
		}
	}
}

在同步方法中调用异步方法

总的原则是:“异步应该贯穿到底”(Async all the way down)。然而,在现实世界中,我们总会遇到一些无法遵循这个理想原则的情况。常见的情形有:

  1. 在构造函数里异步
  2. 接口规定的方法是同步的
  3. 兼容旧代码或第三方库

阻塞

阻塞的方法很简单,但可能导致死锁。

c#
FooAsync().Wait(); 
var message = GetMessageAsync().result;

使用 GetAwaiter().GetResult() 相比上面两种方法更安全一些,因为它不会抛出 AggregateException,而是直接抛出原始异常。

c#
FooAsync().GetAwaiter().GetResult();

TIP

如果一定要在同步方法中阻塞地调用异步方法,GetAwaiter().GetResult() 应该是最优的方案了。

一发即忘

通过一发即忘的方式来调用异步方法,会导致无法捕获异常。之前已经介绍过一发即忘(Fire-and-Forget),这里不再赘述。

既然阻塞的方式不行,那也只能通过一发即忘的方式来调用异步方法。接下来介绍的方案会基于一发即忘,并尝试解决其缺陷。

扩展方法

这个方案来自 Brian Lagunas,也是 Prism 框架的负责人。

假设这么一个场景,我们需要在一个类的构造函数里去调用异步方法,如果此时用到阻塞的方法就不太合适,这会导致每次类的实例化都要卡几秒钟。

我们需要一种机制,让我们可以在一发即忘的情况下,得知异步方法的执行结果与是否发生异常。为此,我们可以设计一个方法 SafeFireAndForget, 它接受一个异步任务和两个回调函数:一个用于任务完成时的处理,另一个用于异常处理。

c#
class Program
{
	public static void Main(string[] args)
	{
		// 创建 DataModel 实例,构造函数会触发异步数据加载
		var dataModel = new DataModel();
		Console.WriteLine("Data loading started...");
		
		// 模拟主线程继续执行其他任务
		Thread.Sleep(2000);
		
		// 检查数据是否已加载完成
		var data = dataModel.Data;
		Console.WriteLine($"Data loaded: {data?.Count ?? 0} items");
	}
}

class DataModel
{
	public List<int>? Data {get; private set;}
	public bool IsDataLoaded { get; private set; }
	
	public DataModel()
	{
		// 在构造函数中使用 SafeFireAndForget 启动异步数据加载
		// 传入完成回调和错误处理回调
		SafeFireAndForget(
			LoadDataAsync(), 
			() => IsDataLoaded = true, 
			e => Console.WriteLine("Error loading data: " + e.Message)
		);
	}
	
	/// <summary>
	/// 安全的一发即忘方法:执行异步任务但不等待结果,同时提供异常处理
	/// </summary>
	/// <param name="task">要执行的异步任务</param>
	/// <param name="onCompleted">任务完成时的回调</param>
	/// <param name="onError">发生异常时的回调</param>
	static async void SafeFireAndForget(Task task, Action? onCompleted = null, Action<Exception>? onError = null)
	{
		try
		{
			// 等待任务完成
			await task;
			// 任务成功完成,调用完成回调
			onCompleted?.Invoke();
		}
		catch (Exception e)
		{
			// 捕获异常并调用错误处理回调
			onError?.Invoke(e);
		}
	}

	/// <summary>
	/// 模拟异步加载数据的方法
	/// </summary>
	private async Task LoadDataAsync()
	{
		await Task.Delay(1000);
		Data = Enumerable.Range(1, 10).ToList();
	}
}

通过给 Task 添加扩展方法,我们能更优雅的实现和这个功能:

c#
class DataModel
{
	public List<int>? Data {get; private set;}
	public bool IsDataLoaded { get; private set; }
	
	public DataModel()
	{
		// 使用扩展方法启动异步数据加载,提供完成和错误回调
		LoadDataAsync().Await(() => IsDataLoaded = true, 
			e => Console.WriteLine("Error loading data: " + e.Message));
	}
	
	private async Task LoadDataAsync()
	{
		await Task.Delay(1000);
		Console.WriteLine(Environment.CurrentManagedThreadId);
		throw new Exception("Failed to load data");
		Data = Enumerable.Range(1, 10).ToList();
	}
}

/// <summary> 
/// Task 扩展方法类,提供一发即忘功能
/// </summary>
public static class TaskExtensions
{
	/// <summary>
	/// 异步等待任务完成的扩展方法,支持完成和错误回调
	/// 方法名也可叫做 Forget,都是业界约定俗成的名称
	/// </summary>
	/// <param name="task">要执行的异步任务</param>
	/// <param name="onCompleted">任务成功完成时的回调</param>
	/// <param name="onError">任务发生异常时的回调</param>
	public static async void Await(this Task task, Action? onCompleted = null, Action<Exception>? onError = null)
	{
		try
		{
			await task;
			onCompleted?.Invoke();
		}
		catch (Exception e)
		{
			onError?.Invoke(e);
		}
	}
}

ContinueWith()

接下来的方案来自 SingletonSean,这种方法使用了原生的 Task.ContinueWith 方法来处理异步任务的结果和异常:

c#
class DataModel
{
	public List<int>? Data {get; private set;}
	public bool IsDataLoaded { get; private set; }
	
	public DataModel()
	{
		LoadDataAsync().ContinueWith(OnDataLoaded);
	}

	private bool OnDataLoaded(Task task)
	{
		if (task.IsFaulted)
		{
			Console.WriteLine("Error loading data: " + task.Exception?.GetBaseException().Message);
			return false;
		}
		return IsDataLoaded = true;
	}
	
	private async Task LoadDataAsync()
	{
		await Task.Delay(1000);
		Data = Enumerable.Range(1, 10).ToList();
	}
}

WARNING

这种方法明显更加简洁,且使用原生方法避免了自己造轮子。但这并不意味着这种方法优于 Brian 的方案。

  1. ContinueWith 会将你的委托包装成一个 Task,这会导致一些额外的开销。
  2. ContinueWithTaskScheduler 的是 TaskScheduler.Current 而不是 TaskScheduler.Default。这意味着如果你在 UI 线程上调用 ContinueWith,它会在 UI 线程上执行回调,这可能导致死锁或其他问题。

异步工厂

最后的方案来自 Nick Chapsas

c#
class MyService
{
	private MyService()
	{
		
	}

	private async Task InitData()
	{
		await Task.Delay(1000);
	}
	
	public static async Task<MyService> CreateAsync()
	{
		var service = new MyService(); 
		await service.InitData();
		return service;
	}
}

这个方法也非常简单,但也存在一些弊端:

  • 难以实现单例。
  • 没有办法把这个类注册给一个 IOC 容器。

补充

推荐视频:C#如何在一个同步方法中调用异步方法

  1. 如果想实现单例 - AsyncLazy:来自 Nito.AsyncEx 工具包和 Microsoft.VisualStudio.Threading 工具包。
  2. JoinableTaskFactory:来自 Microsoft.VisualStudio.Threading 工具包。
  3. 注册 IOC 容器 - Microsoft.Extensions.DependencyInjection 工具包。
  4. Unit Test

INFO

就结论来看,最好还是能异步就异步,能贯穿到底就贯穿到底。