Featured image of post Parallel Programing Modern

Parallel Programing Modern

虽然 `BlockingCollection<T>` 功能强大,但在 .NET 6+ 时代,微软推荐使用 **`System.Threading.Channels`** 中的 `Channel<T>` 来构建高性能、异步优先的生产者-消费者管道。

🚀 用 Channel 替代 BlockingCollection —— .NET 6+ 现代并发数据流

更轻量、更高效、更函数式、更现代化的生产者-消费者模型


📌 为什么需要 Channel

虽然 BlockingCollection<T> 功能强大,但在 .NET 6+ 时代,微软推荐使用 System.Threading.Channels 中的 Channel<T> 来构建高性能、异步优先的生产者-消费者管道。

BlockingCollection<T> 的局限:

  • 基于 Task + 阻塞 API(如 Take()),在高并发下可能阻塞线程
  • 不原生支持 async/await 流式消费
  • 无法与 IAsyncEnumerable<T> 无缝集成
  • 无背压(backpressure)控制(除非手动配合信号量)

Channel<T> 的优势:

  • ✔️ 原生异步支持(ReadAsync / WaitToReadAsync
  • ✔️ 支持 IAsyncEnumerable<T>channel.Reader.ReadAllAsync()
  • ✔️ 更细粒度的读写分离(ChannelReader<T> / ChannelWriter<T>
  • ✔️ 内置背压支持(Bounded Channel 自动阻塞写入)
  • ✔️ 零分配优化、高性能、低延迟
  • ✔️ 与现代 .NET 生态(如 ASP.NET Core、Minimal API、BackgroundService)完美集成

🧩 一、Channel 基础用法

1. 创建 Channel

1
2
3
4
5
6
7
8
// 无界通道(类似 BlockingCollection 无容量限制)
var channel = Channel.CreateUnbounded<int>();

// 有界通道(容量=10,写满自动阻塞/丢弃/覆盖 — 可配置)
var boundedChannel = Channel.CreateBounded<int>(new BoundedChannelOptions(10)
{
    FullMode = BoundedChannelFullMode.Wait // 默认行为:写满时等待
});

2. 生产者写入

1
2
3
4
5
6
var writer = channel.Writer;

await writer.WriteAsync(1);
await writer.WriteAsync(2);

writer.Complete(); // 标记完成写入

3. 消费者读取

1
2
3
4
5
6
7
8
9
var reader = channel.Reader;

while (await reader.WaitToReadAsync())
{
    while (reader.TryRead(out var item))
    {
        Console.WriteLine($"消费: {item}");
    }
}

或使用 IAsyncEnumerable<T>

1
2
3
4
await foreach (var item in channel.Reader.ReadAllAsync())
{
    Console.WriteLine($"消费: {item}");
}

完整示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(10);

// 生产者
_ = Task.Run(async () =>
{
    for (int i = 1; i <= 20; i++)
    {
        await channel.Writer.WriteAsync(i);
        Console.WriteLine($"生产: {i}");
        await Task.Delay(100);
    }
    channel.Writer.Complete();
});

// 消费者
await foreach (var item in channel.Reader.ReadAllAsync())
{
    Console.WriteLine($"消费: {item}");
    await Task.Delay(200);
}

🔄 二、Channel vs BlockingCollection 对比

特性 BlockingCollection<T> Channel<T>
异步支持 ❌(需包装 Task.Run) ✅ 原生异步
流式消费 ❌(foreach 阻塞) IAsyncEnumerable<T>
背压控制 ⚠️ 需手动配合信号量 ✅ 内置(BoundedChannel)
性能 ⚠️ 中等(有锁开销) ✅ 更高(优化无锁结构)
读写分离 ChannelReader/Writer
取消支持 ⚠️ 有限 ✅ 原生支持 CancellationToken
.NET 现代生态集成 ⚠️ 旧式 ✅ 推荐(.NET 6+)

🎯 三、实战:用 Channel 改写任务调度器

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class ChannelTaskScheduler<T>
{
    private readonly Channel<Func<Task<T>>> _channel;
    private readonly SemaphoreSlim _semaphore;
    private readonly ILogger _logger;

    public ChannelTaskScheduler(int maxConcurrency, int? boundedCapacity = null, ILogger logger = null)
    {
        _channel = boundedCapacity.HasValue
            ? Channel.CreateBounded<Func<Task<T>>>(boundedCapacity.Value)
            : Channel.CreateUnbounded<Func<Task<T>>>();

        _semaphore = new SemaphoreSlim(maxConcurrency, maxConcurrency);
        _logger = logger;
    }

    public async ValueTask<bool> QueueTaskAsync(Func<Task<T>> task, CancellationToken ct = default)
    {
        return await _channel.Writer.WriteAsync(task, ct);
    }

    public async Task StartAsync(CancellationToken ct = default)
    {
        await foreach (var workItem in _channel.Reader.ReadAllAsync(ct))
        {
            _ = ExecuteWithLimit(workItem, ct); // 并发执行,不等待
        }
    }

    private async Task ExecuteWithLimit(Func<Task<T>> workItem, CancellationToken ct)
    {
        await _semaphore.WaitAsync(ct);
        try
        {
            await workItem();
        }
        catch (Exception ex)
        {
            _logger?.LogError(ex, "任务执行异常");
        }
        finally
        {
            _semaphore.Release();
        }
    }

    public async Task CompleteAsync(CancellationToken ct = default)
    {
        _channel.Writer.Complete();
        // 可选:等待所有任务完成
        // await foreach (var _ in _channel.Reader.ReadAllAsync(ct)) { }
    }
}

使用示例

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
var scheduler = new ChannelTaskScheduler<string>(maxConcurrency: 3, boundedCapacity: 10, logger);

for (int i = 1; i <= 20; i++)
{
    var taskId = i;
    await scheduler.QueueTaskAsync(() => Task.Run(async () =>
    {
        await Task.Delay(500);
        return $"任务 {taskId} 完成";
    }));
}

await scheduler.StartAsync();

⚙️ 四、高级特性

1. 背压策略(BoundedChannelFullMode)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
var options = new BoundedChannelOptions(10)
{
    FullMode = BoundedChannelFullMode.Wait,        // 默认:写满等待
    // FullMode = BoundedChannelFullMode.DropWrite,   // 丢弃最新写入
    // FullMode = BoundedChannelFullMode.DropOldest,  // 丢弃最旧数据
    // FullMode = BoundedChannelFullMode.DropNewest,  // 丢弃最新数据
    AllowSynchronousContinuations = false,         // 避免同步延续(推荐 false)
    SingleReader = false,                          // 是否单消费者
    SingleWriter = false                           // 是否单生产者
};

2. 单生产者/单消费者优化

如果你能保证只有一个线程写入或读取,启用 SingleWriter = trueSingleReader = true,性能更佳!

3. 取消支持

1
2
3
4
5
6
CancellationTokenSource cts = new(TimeSpan.FromSeconds(5));

await foreach (var item in channel.Reader.ReadAllAsync(cts.Token))
{
    // ...
}

📝 五、最佳实践

场景 推荐方案
新项目 / .NET 6+ ✅ 优先使用 Channel<T>
需要异步流 / IAsyncEnumerable Channel<T>
需要背压控制 BoundedChannel<T>
遗留系统兼容 ⚠️ 可继续使用 BlockingCollection<T>
高性能低延迟场景 Channel<T> + SingleWriter/Reader

✅ 结语

Channel<T> 是 .NET 现代并发编程的“标准答案”,它:

  • 更符合异步编程模型
  • 性能更优
  • 功能更丰富
  • 生态更现代

是时候用 Channel<T> 替代 BlockingCollection<T> 了!

Built with Hugo
Theme Stack designed by Jimmy