🚀 用 Channel 替代 BlockingCollection —— .NET 6+ 现代并发数据流
更轻量、更高效、更函数式、更现代化的生产者-消费者模型
📌 为什么需要 Channel?
虽然 BlockingCollection<T> 功能强大,但在 .NET 6+ 时代,微软推荐使用 System.Threading.Channels 中的 Channel<T> 来构建高性能、异步优先的生产者-消费者管道。
❗ BlockingCollection<T> 的局限:
- 基于
Task + 阻塞 API(如 Take()),在高并发下可能阻塞线程
- 不原生支持
async/await 流式消费
- 无法与
IAsyncEnumerable<T> 无缝集成
- 无背压(backpressure)控制(除非手动配合信号量)
✅ Channel<T> 的优势:
- ✔️ 原生异步支持(
ReadAsync / WaitToReadAsync)
- ✔️ 支持
IAsyncEnumerable<T>(channel.Reader.ReadAllAsync())
- ✔️ 更细粒度的读写分离(
ChannelReader<T> / ChannelWriter<T>)
- ✔️ 内置背压支持(Bounded Channel 自动阻塞写入)
- ✔️ 零分配优化、高性能、低延迟
- ✔️ 与现代 .NET 生态(如 ASP.NET Core、Minimal API、BackgroundService)完美集成
🧩 一、Channel 基础用法
1. 创建 Channel
1
2
3
4
5
6
7
8
|
// 无界通道(类似 BlockingCollection 无容量限制)
var channel = Channel.CreateUnbounded<int>();
// 有界通道(容量=10,写满自动阻塞/丢弃/覆盖 — 可配置)
var boundedChannel = Channel.CreateBounded<int>(new BoundedChannelOptions(10)
{
FullMode = BoundedChannelFullMode.Wait // 默认行为:写满时等待
});
|
2. 生产者写入
1
2
3
4
5
6
|
var writer = channel.Writer;
await writer.WriteAsync(1);
await writer.WriteAsync(2);
writer.Complete(); // 标记完成写入
|
3. 消费者读取
1
2
3
4
5
6
7
8
9
|
var reader = channel.Reader;
while (await reader.WaitToReadAsync())
{
while (reader.TryRead(out var item))
{
Console.WriteLine($"消费: {item}");
}
}
|
或使用 IAsyncEnumerable<T>:
1
2
3
4
|
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"消费: {item}");
}
|
✅ 完整示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
using System.Threading.Channels;
var channel = Channel.CreateBounded<int>(10);
// 生产者
_ = Task.Run(async () =>
{
for (int i = 1; i <= 20; i++)
{
await channel.Writer.WriteAsync(i);
Console.WriteLine($"生产: {i}");
await Task.Delay(100);
}
channel.Writer.Complete();
});
// 消费者
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"消费: {item}");
await Task.Delay(200);
}
|
🔄 二、Channel vs BlockingCollection 对比
| 特性 |
BlockingCollection<T> |
Channel<T> |
| 异步支持 |
❌(需包装 Task.Run) |
✅ 原生异步 |
| 流式消费 |
❌(foreach 阻塞) |
✅ IAsyncEnumerable<T> |
| 背压控制 |
⚠️ 需手动配合信号量 |
✅ 内置(BoundedChannel) |
| 性能 |
⚠️ 中等(有锁开销) |
✅ 更高(优化无锁结构) |
| 读写分离 |
❌ |
✅ ChannelReader/Writer |
| 取消支持 |
⚠️ 有限 |
✅ 原生支持 CancellationToken |
| .NET 现代生态集成 |
⚠️ 旧式 |
✅ 推荐(.NET 6+) |
🎯 三、实战:用 Channel 改写任务调度器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
public class ChannelTaskScheduler<T>
{
private readonly Channel<Func<Task<T>>> _channel;
private readonly SemaphoreSlim _semaphore;
private readonly ILogger _logger;
public ChannelTaskScheduler(int maxConcurrency, int? boundedCapacity = null, ILogger logger = null)
{
_channel = boundedCapacity.HasValue
? Channel.CreateBounded<Func<Task<T>>>(boundedCapacity.Value)
: Channel.CreateUnbounded<Func<Task<T>>>();
_semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
_logger = logger;
}
public async ValueTask<bool> QueueTaskAsync(Func<Task<T>> task, CancellationToken ct = default)
{
return await _channel.Writer.WriteAsync(task, ct);
}
public async Task StartAsync(CancellationToken ct = default)
{
await foreach (var workItem in _channel.Reader.ReadAllAsync(ct))
{
_ = ExecuteWithLimit(workItem, ct); // 并发执行,不等待
}
}
private async Task ExecuteWithLimit(Func<Task<T>> workItem, CancellationToken ct)
{
await _semaphore.WaitAsync(ct);
try
{
await workItem();
}
catch (Exception ex)
{
_logger?.LogError(ex, "任务执行异常");
}
finally
{
_semaphore.Release();
}
}
public async Task CompleteAsync(CancellationToken ct = default)
{
_channel.Writer.Complete();
// 可选:等待所有任务完成
// await foreach (var _ in _channel.Reader.ReadAllAsync(ct)) { }
}
}
|
使用示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
var scheduler = new ChannelTaskScheduler<string>(maxConcurrency: 3, boundedCapacity: 10, logger);
for (int i = 1; i <= 20; i++)
{
var taskId = i;
await scheduler.QueueTaskAsync(() => Task.Run(async () =>
{
await Task.Delay(500);
return $"任务 {taskId} 完成";
}));
}
await scheduler.StartAsync();
|
⚙️ 四、高级特性
1. 背压策略(BoundedChannelFullMode)
1
2
3
4
5
6
7
8
9
10
|
var options = new BoundedChannelOptions(10)
{
FullMode = BoundedChannelFullMode.Wait, // 默认:写满等待
// FullMode = BoundedChannelFullMode.DropWrite, // 丢弃最新写入
// FullMode = BoundedChannelFullMode.DropOldest, // 丢弃最旧数据
// FullMode = BoundedChannelFullMode.DropNewest, // 丢弃最新数据
AllowSynchronousContinuations = false, // 避免同步延续(推荐 false)
SingleReader = false, // 是否单消费者
SingleWriter = false // 是否单生产者
};
|
2. 单生产者/单消费者优化
如果你能保证只有一个线程写入或读取,启用 SingleWriter = true 或 SingleReader = true,性能更佳!
3. 取消支持
1
2
3
4
5
6
|
CancellationTokenSource cts = new(TimeSpan.FromSeconds(5));
await foreach (var item in channel.Reader.ReadAllAsync(cts.Token))
{
// ...
}
|
📝 五、最佳实践
| 场景 |
推荐方案 |
| 新项目 / .NET 6+ |
✅ 优先使用 Channel<T> |
| 需要异步流 / IAsyncEnumerable |
✅ Channel<T> |
| 需要背压控制 |
✅ BoundedChannel<T> |
| 遗留系统兼容 |
⚠️ 可继续使用 BlockingCollection<T> |
| 高性能低延迟场景 |
✅ Channel<T> + SingleWriter/Reader |
✅ 结语
Channel<T> 是 .NET 现代并发编程的“标准答案”,它:
- 更符合异步编程模型
- 性能更优
- 功能更丰富
- 生态更现代
是时候用 Channel<T> 替代 BlockingCollection<T> 了!