System.Threading.Channels
Channel<T> is the modern, async-first implementation of the producer/consumer pattern in .NET. It is designed for high throughput and supports async/await throughout, so producers and consumers that have to wait do not tie up threads.
Why Channels?
| BlockingCollection | Channel |
|---|---|
| Blocking API | Async API |
| Add() blocks thread | WriteAsync() releases thread |
| Take() blocks thread | ReadAsync() releases thread |
| .NET Framework era | .NET Core/5+ era |
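To make "releases thread" concrete, here is a minimal sketch using a bounded channel with a capacity of 1 (the capacity and variable names are chosen purely for illustration). When the channel is full, WriteAsync hands back a pending ValueTask rather than blocking the calling thread.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Capacity of 1, so the second write has nowhere to go.
var channel = Channel.CreateBounded<int>(1);

await channel.Writer.WriteAsync(1); // fits, completes immediately

// The channel is full: WriteAsync returns a pending ValueTask instead of blocking.
ValueTask pendingWrite = channel.Writer.WriteAsync(2);
bool pendingBeforeRead = !pendingWrite.IsCompleted;
Console.WriteLine($"Second write pending: {pendingBeforeRead}"); // Second write pending: True

// Reading frees a slot, which lets the pending write finish.
int first = await channel.Reader.ReadAsync();
await pendingWrite;
int second = await channel.Reader.ReadAsync();
Console.WriteLine($"Read {first}, then {second}"); // Read 1, then 2
```

With BlockingCollection, the equivalent Add() call on a full collection would hold the thread until space became available.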
Creating Channels
```csharp
using System.Threading.Channels;

// Unbounded channel (no limit)
Channel<int> unbounded = Channel.CreateUnbounded<int>();

// Bounded channel (with limit)
Channel<int> bounded = Channel.CreateBounded<int>(100);
```
Channel Structure
A channel has two ends:
```csharp
Channel<string> channel = Channel.CreateUnbounded<string>();

// Writer end - for producers
ChannelWriter<string> writer = channel.Writer;

// Reader end - for consumers
ChannelReader<string> reader = channel.Reader;
```
This separation allows passing only the relevant end to each component.
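As a rough sketch of that idea, the two local functions below (ProduceAsync and ConsumeAsync are hypothetical names, not part of the library) each accept only the end they need, so the producer cannot read and the consumer cannot write or complete the channel.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();

// Each side receives only the end it needs.
Task producing = ProduceAsync(channel.Writer);
List<string> received = await ConsumeAsync(channel.Reader);
await producing;

Console.WriteLine(string.Join(", ", received)); // alpha, beta, gamma

// Hypothetical producer: it can write and complete, but has no way to read.
static async Task ProduceAsync(ChannelWriter<string> writer)
{
    foreach (var word in new[] { "alpha", "beta", "gamma" })
    {
        await writer.WriteAsync(word);
    }
    writer.Complete();
}

// Hypothetical consumer: it can read, but has no way to write or complete.
static async Task<List<string>> ConsumeAsync(ChannelReader<string> reader)
{
    var items = new List<string>();
    await foreach (var item in reader.ReadAllAsync())
    {
        items.Add(item);
    }
    return items;
}
```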
Basic Usage
```csharp
var channel = Channel.CreateUnbounded<int>();

// Producer
var producer = Task.Run(async () =>
{
    for (int i = 0; i < 10; i++)
    {
        await channel.Writer.WriteAsync(i);
        Console.WriteLine($"Wrote: {i}");
    }
    channel.Writer.Complete();
});

// Consumer
var consumer = Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Read: {item}");
    }
});

await Task.WhenAll(producer, consumer);
```
Writing to Channels
```csharp
ChannelWriter<int> writer = channel.Writer;

// WriteAsync - async write, waits if bounded and full
await writer.WriteAsync(42);

// TryWrite - non-blocking, returns false if it can't write
if (writer.TryWrite(42))
{
    Console.WriteLine("Written");
}

// Complete - signal no more writes
writer.Complete();

// Complete with error - use this instead of the call above, not after it:
// calling Complete on an already-completed channel throws ChannelClosedException.
// writer.Complete(new Exception("Producer failed"));
```
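ChannelWriter<T> also offers WaitToWriteAsync and TryComplete, which are not shown above. The sketch below pairs WaitToWriteAsync with TryWrite, mirroring the WaitToReadAsync/TryRead pattern on the reader side, and uses TryComplete to close the channel without risking an exception on a repeated call. The capacity of 2 and the values 1 to 5 are arbitrary.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(2);
ChannelWriter<int> writer = channel.Writer;

// Drain in the background so the bounded channel keeps freeing space.
int sum = 0;
Task consumer = Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        sum += item;
    }
});

// WaitToWriteAsync returns false once the channel is completed,
// so the loop ends cleanly instead of throwing.
int next = 1;
while (next <= 5 && await writer.WaitToWriteAsync())
{
    // Space was available a moment ago; TryWrite can still return false
    // if the channel filled up again, in which case we wait once more.
    while (next <= 5 && writer.TryWrite(next))
    {
        next++;
    }
}

bool firstComplete = writer.TryComplete();  // true: this call completed the channel
bool secondComplete = writer.TryComplete(); // false: already completed, no exception

await consumer;
Console.WriteLine($"sum={sum}, first={firstComplete}, second={secondComplete}");
// sum=15, first=True, second=False
```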
Reading from Channels
```csharp
ChannelReader<int> reader = channel.Reader;

// ReadAsync - async read, waits for an item
// (throws ChannelClosedException if the channel completes first)
int first = await reader.ReadAsync();

// TryRead - non-blocking
if (reader.TryRead(out var next))
{
    Process(next);
}

// WaitToReadAsync - wait for availability
while (await reader.WaitToReadAsync())
{
    while (reader.TryRead(out var item))
    {
        Process(item);
    }
}

// ReadAllAsync - IAsyncEnumerable (preferred)
await foreach (var item in reader.ReadAllAsync())
{
    Process(item);
}
```
ReadAllAsync - The Preferred Pattern
```csharp
// Clean, modern syntax
await foreach (var item in channel.Reader.ReadAllAsync())
{
    await ProcessAsync(item);
}

// With cancellation
await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken))
{
    await ProcessAsync(item);
}
```
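When the token passed to ReadAllAsync is cancelled, the loop ends by throwing OperationCanceledException rather than finishing quietly. Here is a small sketch of handling that; cancelling from inside the loop body is only a device to keep the example deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();
channel.Writer.TryWrite(1); // one item, and the writer is never completed

using var cts = new CancellationTokenSource();
var seen = new List<int>();
bool cancelled = false;

try
{
    await foreach (var item in channel.Reader.ReadAllAsync(cts.Token))
    {
        seen.Add(item);
        cts.Cancel(); // nothing else will arrive, so stop waiting
    }
}
catch (OperationCanceledException)
{
    // The next wait for data observed the cancelled token.
    cancelled = true;
}

Console.WriteLine($"Items: {seen.Count}, cancelled: {cancelled}"); // Items: 1, cancelled: True
```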
Channel Options
Unbounded Options
```csharp
var channel = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
{
    SingleWriter = true,  // Optimize for single producer
    SingleReader = true,  // Optimize for single consumer

    // Default. Continuations are queued rather than run inline on the
    // thread that completes the read or write.
    AllowSynchronousContinuations = false
});
```
Bounded Options
```csharp
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait,  // Default: wait when full
    SingleWriter = false,
    SingleReader = false,
    AllowSynchronousContinuations = false
});
```
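Besides Wait, BoundedChannelFullMode also defines DropOldest, DropNewest, and DropWrite, which discard an item instead of making the writer wait. The following sketch illustrates DropOldest with an arbitrary capacity of 3: five writes go in, and only the last three survive.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3)
{
    FullMode = BoundedChannelFullMode.DropOldest
});

// In DropOldest mode a write to a full channel evicts the oldest item,
// so TryWrite keeps succeeding.
for (int i = 1; i <= 5; i++)
{
    channel.Writer.TryWrite(i);
}
channel.Writer.Complete();

var kept = new List<int>();
await foreach (var item in channel.Reader.ReadAllAsync())
{
    kept.Add(item);
}

Console.WriteLine(string.Join(", ", kept)); // 3, 4, 5
```

A drop mode suits data where only recent values matter, such as telemetry samples. Wait is the safer choice when every item must be processed.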
Complete Example
```csharp
using System.Threading.Channels;

// Usage (in a single file, top-level statements must precede type declarations)
var processor = new MessageProcessor();

// Producer
for (int i = 0; i < 100; i++)
{
    await processor.EnqueueAsync(new Message($"Message {i}"));
}

processor.Complete();
await processor.Completion;

class MessageProcessor
{
    private readonly Channel<Message> _channel;
    private readonly Task _processingTask;

    public MessageProcessor()
    {
        _channel = Channel.CreateBounded<Message>(new BoundedChannelOptions(1000)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

        _processingTask = Task.Run(ProcessMessagesAsync);
    }

    public async ValueTask EnqueueAsync(Message message, CancellationToken ct = default)
    {
        await _channel.Writer.WriteAsync(message, ct);
    }

    public void Complete() => _channel.Writer.Complete();

    public Task Completion => _processingTask;

    private async Task ProcessMessagesAsync()
    {
        await foreach (var message in _channel.Reader.ReadAllAsync())
        {
            try
            {
                await HandleMessageAsync(message);
            }
            catch (Exception ex)
            {
                // Log and continue so one bad message does not stop the loop
                Console.WriteLine($"Error processing message: {ex.Message}");
            }
        }
    }

    private async Task HandleMessageAsync(Message message)
    {
        await Task.Delay(10); // Simulate processing
        Console.WriteLine($"Processed: {message.Content}");
    }
}

record Message(string Content);
```
Multiple Consumers
```csharp
var channel = Channel.CreateUnbounded<int>();

// Start multiple consumers
var consumers = Enumerable.Range(0, 3)
    .Select(id => ConsumeAsync(id, channel.Reader))
    .ToArray();

// Producer
for (int i = 0; i < 100; i++)
{
    await channel.Writer.WriteAsync(i);
}
channel.Writer.Complete();

await Task.WhenAll(consumers);

async Task ConsumeAsync(int id, ChannelReader<int> reader)
{
    await foreach (var item in reader.ReadAllAsync())
    {
        Console.WriteLine($"Consumer {id}: {item}");
        await Task.Delay(10);
    }
}
```
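The mirror-image case, several producers feeding one reader, needs some care around completion: if each producer called Complete() itself, the first to finish would close the channel for the others. One possible approach, sketched below with a hypothetical helper named CompleteWhenDoneAsync, is to complete the writer once, after all producers have finished.

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();

// Three producers share one writer; none of them calls Complete itself.
Task[] producers = Enumerable.Range(0, 3)
    .Select(id => Task.Run(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await channel.Writer.WriteAsync(id * 100 + i);
        }
    }))
    .ToArray();

Task closing = CompleteWhenDoneAsync();

int count = 0;
await foreach (var item in channel.Reader.ReadAllAsync())
{
    count++;
}
await closing;

Console.WriteLine($"Received {count} items"); // Received 30 items

// Hypothetical helper: completes the channel exactly once, passing along
// a failure if any producer threw.
async Task CompleteWhenDoneAsync()
{
    try
    {
        await Task.WhenAll(producers);
        channel.Writer.Complete();
    }
    catch (Exception ex)
    {
        channel.Writer.Complete(ex);
    }
}
```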
Error Propagation
```csharp
// Producer can signal error
try
{
    await ProduceAsync(channel.Writer);
    channel.Writer.Complete();
}
catch (Exception ex)
{
    channel.Writer.Complete(ex); // Error propagates to readers
}

// Consumer sees the error once the remaining items are drained.
// ReadAllAsync rethrows the exception passed to Complete as-is;
// ReadAsync would wrap it in a ChannelClosedException instead.
try
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        Process(item);
    }
}
catch (Exception ex)
{
    Console.WriteLine($"Channel closed with error: {ex.Message}");
}
```
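A small self-contained sketch can make the ordering visible: items written before the failure are still delivered, and only then does the error surface. It relies on ReadAllAsync rethrowing the exception that was passed to Complete, and it also checks Reader.Completion, which faults with the same error. The use of InvalidOperationException and the two sample items are arbitrary choices.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();
channel.Writer.TryWrite(1);
channel.Writer.TryWrite(2);
channel.Writer.Complete(new InvalidOperationException("Producer failed"));

var delivered = new List<int>();
string observed = "";

try
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        delivered.Add(item); // buffered items still arrive first
    }
}
catch (InvalidOperationException ex)
{
    observed = ex.Message;
}

bool completionFaulted = channel.Reader.Completion.IsFaulted;

Console.WriteLine($"Delivered: {string.Join(", ", delivered)}"); // Delivered: 1, 2
Console.WriteLine($"Observed: {observed}");                      // Observed: Producer failed
Console.WriteLine($"Completion faulted: {completionFaulted}");   // Completion faulted: True
```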
Key Takeaways
- Channel<T> is the modern, async-first producer/consumer
- Separates ChannelWriter<T> and ChannelReader<T>
- Use ReadAllAsync() with await foreach for clean consumption
- Complete() signals no more items
- Configure with SingleWriter/SingleReader for better performance
- Full async support - no thread blocking
- Prefer Channels over BlockingCollection for async code