15 minlesson

System.Threading.Channels

System.Threading.Channels

Channel<T> is the modern, async-first implementation of the producer/consumer pattern. It provides excellent performance and full async/await support.

Why Channels?

BlockingCollectionChannel
Blocking APIAsync API
Add() blocks threadWriteAsync() releases thread
Take() blocks threadReadAsync() releases thread
.NET Framework era.NET Core/5+ era

Creating Channels

csharp
1using System.Threading.Channels;
2
3// Unbounded channel (no limit)
4Channel<int> unbounded = Channel.CreateUnbounded<int>();
5
6// Bounded channel (with limit)
7Channel<int> bounded = Channel.CreateBounded<int>(100);

Channel Structure

A channel has two ends:

csharp
1Channel<string> channel = Channel.CreateUnbounded<string>();
2
3// Writer end - for producers
4ChannelWriter<string> writer = channel.Writer;
5
6// Reader end - for consumers
7ChannelReader<string> reader = channel.Reader;

This separation allows passing only the relevant end to each component.

Basic Usage

csharp
1var channel = Channel.CreateUnbounded<int>();
2
3// Producer
4var producer = Task.Run(async () =>
5{
6 for (int i = 0; i < 10; i++)
7 {
8 await channel.Writer.WriteAsync(i);
9 Console.WriteLine($"Wrote: {i}");
10 }
11 channel.Writer.Complete();
12});
13
14// Consumer
15var consumer = Task.Run(async () =>
16{
17 await foreach (var item in channel.Reader.ReadAllAsync())
18 {
19 Console.WriteLine($"Read: {item}");
20 }
21});
22
23await Task.WhenAll(producer, consumer);

Writing to Channels

csharp
1ChannelWriter<int> writer = channel.Writer;
2
3// WriteAsync - async write, waits if bounded and full
4await writer.WriteAsync(42);
5
6// TryWrite - non-blocking, returns false if can't write
7if (writer.TryWrite(42))
8{
9 Console.WriteLine("Written");
10}
11
12// Complete - signal no more writes
13writer.Complete();
14
15// Complete with error
16writer.Complete(new Exception("Producer failed"));

Reading from Channels

csharp
1ChannelReader<int> reader = channel.Reader;
2
3// ReadAsync - async read, waits for item
4int item = await reader.ReadAsync();
5
6// TryRead - non-blocking
7if (reader.TryRead(out var item))
8{
9 Process(item);
10}
11
12// WaitToReadAsync - wait for availability
13while (await reader.WaitToReadAsync())
14{
15 while (reader.TryRead(out var item))
16 {
17 Process(item);
18 }
19}
20
21// ReadAllAsync - IAsyncEnumerable (preferred)
22await foreach (var item in reader.ReadAllAsync())
23{
24 Process(item);
25}

ReadAllAsync - The Preferred Pattern

csharp
1// Clean, modern syntax
2await foreach (var item in channel.Reader.ReadAllAsync())
3{
4 await ProcessAsync(item);
5}
6
7// With cancellation
8await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
9{
10 await ProcessAsync(item);
11}

Channel Options

Unbounded Options

csharp
1var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
2{
3 SingleWriter = true, // Optimize for single producer
4 SingleReader = true, // Optimize for single consumer
5 AllowSynchronousContinuations = false // Prevent stack overflow
6});

Bounded Options

csharp
1var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
2{
3 FullMode = BoundedChannelFullMode.Wait, // Default: wait when full
4 SingleWriter = false,
5 SingleReader = false,
6 AllowSynchronousContinuations = false
7});

Complete Example

csharp
1class MessageProcessor
2{
3 private readonly Channel<Message> _channel;
4 private readonly Task _processingTask;
5
6 public MessageProcessor()
7 {
8 _channel = Channel.CreateBounded<Message>(new BoundedChannelOptions(1000)
9 {
10 FullMode = BoundedChannelFullMode.Wait
11 });
12
13 _processingTask = Task.Run(ProcessMessagesAsync);
14 }
15
16 public async ValueTask EnqueueAsync(Message message, CancellationToken ct = default)
17 {
18 await _channel.Writer.WriteAsync(message, ct);
19 }
20
21 public void Complete() => _channel.Writer.Complete();
22
23 public Task Completion => _processingTask;
24
25 private async Task ProcessMessagesAsync()
26 {
27 await foreach (var message in _channel.Reader.ReadAllAsync())
28 {
29 try
30 {
31 await HandleMessageAsync(message);
32 }
33 catch (Exception ex)
34 {
35 Console.WriteLine($"Error processing message: {ex.Message}");
36 }
37 }
38 }
39
40 private async Task HandleMessageAsync(Message message)
41 {
42 await Task.Delay(10); // Simulate processing
43 Console.WriteLine($"Processed: {message.Content}");
44 }
45}
46
47record Message(string Content);
48
49// Usage
50var processor = new MessageProcessor();
51
52// Producer
53for (int i = 0; i < 100; i++)
54{
55 await processor.EnqueueAsync(new Message($"Message {i}"));
56}
57
58processor.Complete();
59await processor.Completion;

Multiple Consumers

csharp
1var channel = Channel.CreateUnbounded<int>();
2
3// Start multiple consumers
4var consumers = Enumerable.Range(0, 3)
5 .Select(id => ConsumeAsync(id, channel.Reader))
6 .ToArray();
7
8// Producer
9for (int i = 0; i < 100; i++)
10{
11 await channel.Writer.WriteAsync(i);
12}
13channel.Writer.Complete();
14
15await Task.WhenAll(consumers);
16
17async Task ConsumeAsync(int id, ChannelReader<int> reader)
18{
19 await foreach (var item in reader.ReadAllAsync())
20 {
21 Console.WriteLine($"Consumer {id}: {item}");
22 await Task.Delay(10);
23 }
24}

Error Propagation

csharp
1// Producer can signal error
2try
3{
4 await ProduceAsync(channel.Writer);
5 channel.Writer.Complete();
6}
7catch (Exception ex)
8{
9 channel.Writer.Complete(ex); // Error propagates to readers
10}
11
12// Consumer sees the error
13try
14{
15 await foreach (var item in channel.Reader.ReadAllAsync())
16 {
17 Process(item);
18 }
19}
20catch (ChannelClosedException ex)
21{
22 Console.WriteLine($"Channel closed with error: {ex.InnerException?.Message}");
23}

Key Takeaways

  • Channel<T> is the modern, async-first producer/consumer
  • Separates ChannelWriter<T> and ChannelReader<T>
  • Use ReadAllAsync() with await foreach for clean consumption
  • Complete() signals no more items
  • Configure with SingleWriter/SingleReader for better performance
  • Full async support - no thread blocking
  • Prefer Channels over BlockingCollection for async code
System.Threading.Channels - Anko Academy