Advanced Channel Patterns
This lesson covers bounded channels, backpressure strategies, and advanced patterns for production use.
Bounded Channel Full Modes
When a bounded channel is full, you can choose different behaviors:
```csharp
var options = new BoundedChannelOptions(capacity: 10)
{
    FullMode = BoundedChannelFullMode.Wait // Default
};
```
Wait (Default)
```csharp
FullMode = BoundedChannelFullMode.Wait
```
WriteAsync waits asynchronously until space is available, without blocking the calling thread. This is backpressure.
```csharp
// Producer slows down to match consumer speed
await writer.WriteAsync(item); // Waits if full
```
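To make the waiting behavior concrete, here is a minimal sketch written as top-level statements. The capacity of 1 and the variable names are chosen only for illustration. It fills the channel and shows that a second write stays pending until a read frees a slot.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Capacity 1 so the channel is full after a single write (illustrative value)
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
    FullMode = BoundedChannelFullMode.Wait
});

await channel.Writer.WriteAsync(1); // Fits, so it completes immediately

// The channel is now full, so this write cannot complete yet
ValueTask pendingWrite = channel.Writer.WriteAsync(2);
bool completedWhileFull = pendingWrite.IsCompleted;

int first = await channel.Reader.ReadAsync(); // Frees a slot for the pending write
await pendingWrite;                           // The pending write can now finish
int second = await channel.Reader.ReadAsync();

Console.WriteLine($"Completed while full: {completedWhileFull}"); // False
Console.WriteLine($"Read {first}, then {second}");                // Read 1, then 2
```

No thread is blocked while the write is pending; the producer's continuation simply runs later, once the consumer has made room.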
DropNewest
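```csharp
FullMode = BoundedChannelFullMode.DropNewest
```
When full, the newest item already in the channel is removed to make room for the item being written.
```csharp
// The most recently queued item is silently replaced
writer.TryWrite(item); // Returns true even when an item was dropped
```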
DropOldest
```csharp
FullMode = BoundedChannelFullMode.DropOldest
```
When full, the oldest item in the queue is dropped to make room.
```csharp
// Oldest items sacrificed for newest
// Good for real-time data where latest matters most
```
DropWrite
```csharp
FullMode = BoundedChannelFullMode.DropWrite
```
When full, the item being written is dropped and the channel's contents are left unchanged. Unlike DropNewest, nothing already queued is removed.
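The three drop modes are easy to confuse, so a small comparison can help. The sketch below assumes a capacity of 3 and writes the values 1 through 5; the `Survivors` helper is a hypothetical name introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Writes 1..5 into a capacity-3 channel and returns what is left in the buffer
static List<int> Survivors(BoundedChannelFullMode mode)
{
    var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(3) { FullMode = mode });

    for (int i = 1; i <= 5; i++)
    {
        // In the drop modes TryWrite reports success even when an item is discarded
        channel.Writer.TryWrite(i);
    }

    var remaining = new List<int>();
    while (channel.Reader.TryRead(out var item))
    {
        remaining.Add(item);
    }
    return remaining;
}

string dropOldest = string.Join(", ", Survivors(BoundedChannelFullMode.DropOldest));
string dropNewest = string.Join(", ", Survivors(BoundedChannelFullMode.DropNewest));
string dropWrite = string.Join(", ", Survivors(BoundedChannelFullMode.DropWrite));

Console.WriteLine($"DropOldest: {dropOldest}"); // 3, 4, 5
Console.WriteLine($"DropNewest: {dropNewest}"); // 1, 2, 5
Console.WriteLine($"DropWrite:  {dropWrite}");  // 1, 2, 3
```

DropOldest keeps the latest window, DropNewest keeps the early backlog plus the latest arrival, and DropWrite keeps only what was queued first.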
Choosing the Right Mode
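| Mode | Use Case |
|---|---|
| Wait | Critical data, can't lose items |
| DropNewest | Non-critical, keep the older backlog plus the latest item |
| DropOldest | Real-time feeds, want freshest data |
| DropWrite | Non-critical, keep what is already queued and discard new arrivals |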
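Because the drop modes discard items without signalling the writer, it can be useful to observe the drops. The sketch below assumes .NET 6 or later, where `Channel.CreateBounded` has an overload that takes an item-dropped callback; the capacity and the `dropped` list are illustrative choices.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

var dropped = new List<int>();

// The second argument is invoked with each item the channel discards
var channel = Channel.CreateBounded<int>(
    new BoundedChannelOptions(2) { FullMode = BoundedChannelFullMode.DropOldest },
    item => dropped.Add(item));

for (int i = 1; i <= 5; i++)
{
    channel.Writer.TryWrite(i);
}

var kept = new List<int>();
while (channel.Reader.TryRead(out var item))
{
    kept.Add(item);
}

Console.WriteLine($"Dropped: {string.Join(", ", dropped)}"); // 1, 2, 3
Console.WriteLine($"Kept:    {string.Join(", ", kept)}");    // 4, 5
```

In a real service the callback would more likely increment a metric or log a warning than collect the items in a list.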
Backpressure in Action
```csharp
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(5)
{
    FullMode = BoundedChannelFullMode.Wait
});

// Fast producer
var producer = Task.Run(async () =>
{
    for (int i = 0; i < 20; i++)
    {
        Console.WriteLine($"Writing {i}...");
        await channel.Writer.WriteAsync(i); // Waits when full
        Console.WriteLine($"Wrote {i}");
    }
    channel.Writer.Complete();
});

// Slow consumer
var consumer = Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Processing {item}");
        await Task.Delay(500); // Slow processing
    }
});

// Keep the program alive until both sides finish
await Task.WhenAll(producer, consumer);

// Output shows producer waiting for consumer
```
TryWrite for Non-Blocking
Check if write succeeds without blocking:
```csharp
// Field of the logger class; the default FullMode is Wait,
// so TryWrite returns false when the queue is full
private readonly Channel<LogEntry> _channel = Channel.CreateBounded<LogEntry>(1000);

public void Log(string message)
{
    var entry = new LogEntry(message);

    if (!_channel.Writer.TryWrite(entry))
    {
        // Queue full - handle overflow
        Console.WriteLine("Log queue full, dropping message");
        // Or: write to overflow file
        // Or: increment dropped counter
    }
}
```
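One of the overflow options mentioned in the comments, a dropped counter, might be sketched as follows. Plain strings stand in for the log entry type, the capacity of 2 is deliberately tiny, and `droppedCount` is a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;

// Default FullMode is Wait, so TryWrite returns false when the queue is full
var channel = Channel.CreateBounded<string>(2);
long droppedCount = 0;

void Log(string message)
{
    if (!channel.Writer.TryWrite(message))
    {
        // Thread-safe counter, since Log may be called from many threads
        Interlocked.Increment(ref droppedCount);
    }
}

for (int i = 1; i <= 5; i++)
{
    Log($"message {i}");
}

int queued = channel.Reader.Count;
long droppedTotal = Interlocked.Read(ref droppedCount);

Console.WriteLine($"Queued: {queued}, dropped: {droppedTotal}"); // Queued: 2, dropped: 3
```

Exposing the counter as a metric makes overflow visible without paying for a console write on every dropped message.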
WaitToWriteAsync Pattern
Check for availability before writing:
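```csharp
var item = await GetNextItemAsync();

// WaitToWriteAsync returns false once the channel is completed
while (await channel.Writer.WaitToWriteAsync(cancellationToken))
{
    if (channel.Writer.TryWrite(item))
    {
        // Written: only now fetch the next item
        item = await GetNextItemAsync();
    }
    // Rare: another writer took the slot, so wait again with the same item
}
```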
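The loop condition relies on WaitToWriteAsync returning false after the channel has been completed, which is what lets the producer exit cleanly. A minimal sketch of that behavior, with illustrative variable names:

```csharp
using System;
using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(1);

// Space is available, so the wait completes with true
bool canWriteBeforeComplete = await channel.Writer.WaitToWriteAsync();
channel.Writer.TryWrite(1);

channel.Writer.Complete();

// After completion no further writes are possible, so the wait completes with false
bool canWriteAfterComplete = await channel.Writer.WaitToWriteAsync();
bool wroteAfterComplete = channel.Writer.TryWrite(2);

Console.WriteLine($"Before: {canWriteBeforeComplete}, after: {canWriteAfterComplete}, wrote: {wroteAfterComplete}");
// Before: True, after: False, wrote: False
```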
Multiple Writers with Coordination
```csharp
var channel = Channel.CreateBounded<WorkItem>(100);
var writersComplete = 0;
var totalWriters = 3;

// Multiple writers
for (int i = 0; i < totalWriters; i++)
{
    int writerId = i;
    _ = Task.Run(async () =>
    {
        for (int j = 0; j < 100; j++)
        {
            await channel.Writer.WriteAsync(new WorkItem(writerId, j));
        }

        // Only complete when ALL writers done
        if (Interlocked.Increment(ref writersComplete) == totalWriters)
        {
            channel.Writer.Complete();
        }
    });
}
```
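An alternative to the shared counter is to keep the writer tasks and complete the channel once all of them have finished. The sketch below is one possible arrangement, not the only one; tuples stand in for the work item type, and `CompleteWhenDoneAsync` is a hypothetical helper. It also forwards a writer failure to the reader instead of leaving the channel open forever.

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<(int WriterId, int Sequence)>(100);

// Three writers, 100 items each
Task[] writers = Enumerable.Range(0, 3)
    .Select(writerId => Task.Run(async () =>
    {
        for (int j = 0; j < 100; j++)
        {
            await channel.Writer.WriteAsync((writerId, j));
        }
    }))
    .ToArray();

// Completes the channel exactly once, after every writer has finished
async Task CompleteWhenDoneAsync()
{
    try
    {
        await Task.WhenAll(writers);
        channel.Writer.TryComplete();
    }
    catch (Exception ex)
    {
        // Surface the failure to the reader rather than hanging it
        channel.Writer.TryComplete(ex);
    }
}

Task completion = CompleteWhenDoneAsync();

int received = 0;
await foreach (var item in channel.Reader.ReadAllAsync())
{
    received++;
}
await completion;

Console.WriteLine($"Received {received} items"); // Received 300 items
```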
Timeout Patterns
```csharp
// Write with timeout
using var writeCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
try
{
    await channel.Writer.WriteAsync(item, writeCts.Token);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Write timed out - queue full too long");
}

// Read with timeout (separate token source: each timeout needs its own)
using var readCts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
try
{
    var next = await channel.Reader.ReadAsync(readCts.Token);
}
catch (OperationCanceledException)
{
    Console.WriteLine("Read timed out - no items available");
}
```
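If the timeout pattern is needed in several places, it can be wrapped in a helper that reports the outcome as a bool. `TryWriteWithTimeoutAsync` below is a hypothetical helper, not part of System.Threading.Channels, and the 200 ms timeout is chosen only to keep the demonstration short.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Returns true if the item was written, false if the timeout elapsed first.
// Cancellation requested by the caller still surfaces as OperationCanceledException.
static async Task<bool> TryWriteWithTimeoutAsync<T>(
    ChannelWriter<T> writer, T item, TimeSpan timeout, CancellationToken cancellationToken = default)
{
    using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
    timeoutCts.CancelAfter(timeout);
    try
    {
        await writer.WriteAsync(item, timeoutCts.Token);
        return true;
    }
    catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
    {
        return false;
    }
}

var channel = Channel.CreateBounded<int>(1);
var timeout = TimeSpan.FromMilliseconds(200);

bool firstWrite = await TryWriteWithTimeoutAsync(channel.Writer, 1, timeout);  // Space available
bool secondWrite = await TryWriteWithTimeoutAsync(channel.Writer, 2, timeout); // Full, nobody reading

Console.WriteLine($"First: {firstWrite}, second: {secondWrite}"); // First: True, second: False
```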
Graceful Shutdown
```csharp
class WorkerService : IAsyncDisposable
{
    private readonly Channel<WorkItem> _channel;
    private readonly Task _processingTask;
    private readonly CancellationTokenSource _shutdownCts;

    public WorkerService()
    {
        _channel = Channel.CreateBounded<WorkItem>(1000);
        _shutdownCts = new CancellationTokenSource();
        _processingTask = Task.Run(ProcessAsync);
    }

    public async ValueTask EnqueueAsync(WorkItem item)
    {
        await _channel.Writer.WriteAsync(item);
    }

    private async Task ProcessAsync()
    {
        try
        {
            await foreach (var item in _channel.Reader.ReadAllAsync(_shutdownCts.Token))
            {
                await ProcessItemAsync(item);
            }
        }
        catch (OperationCanceledException)
        {
            // Shutdown requested - drain remaining items
            while (_channel.Reader.TryRead(out var item))
            {
                await ProcessItemAsync(item);
            }
        }
    }

    public async ValueTask DisposeAsync()
    {
        _channel.Writer.Complete();

        // Wait for processing to complete (with timeout)
        var completionTask = _processingTask;
        var timeoutTask = Task.Delay(TimeSpan.FromSeconds(30));

        if (await Task.WhenAny(completionTask, timeoutTask) == timeoutTask)
        {
            // Timeout - cancel and drain
            _shutdownCts.Cancel();
            await completionTask;
        }

        _shutdownCts.Dispose();
    }
}
```
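The shutdown sequence depends on what Complete() does to items that are already queued: they stay readable, and only new writes are rejected. A minimal sketch of that behavior, using an int channel and illustrative names:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(10);
for (int i = 1; i <= 3; i++)
{
    channel.Writer.TryWrite(i);
}

channel.Writer.Complete();

// New writes are rejected, but the reader side is not finished while items remain
bool acceptedAfterComplete = channel.Writer.TryWrite(4);
bool completedBeforeDrain = channel.Reader.Completion.IsCompleted;

var drained = new List<int>();
await foreach (var item in channel.Reader.ReadAllAsync())
{
    drained.Add(item);
}

// Completion finishes once the last buffered item has been read
await channel.Reader.Completion;

Console.WriteLine($"Accepted after Complete: {acceptedAfterComplete}"); // False
Console.WriteLine($"Completed before drain: {completedBeforeDrain}");   // False
Console.WriteLine($"Drained: {string.Join(", ", drained)}");            // 1, 2, 3
```

This is why calling Complete() first and then waiting for the processing task drains the queue without losing accepted work.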
Channel as Async Stream Source
```csharp
public class EventSource
{
    private readonly Channel<Event> _channel = Channel.CreateUnbounded<Event>();

    public void RaiseEvent(Event evt) => _channel.Writer.TryWrite(evt);

    public IAsyncEnumerable<Event> GetEventsAsync(CancellationToken ct = default)
    {
        return _channel.Reader.ReadAllAsync(ct);
    }
}

// Consumer
await foreach (var evt in eventSource.GetEventsAsync(cancellationToken))
{
    await HandleEventAsync(evt);
}
```
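Because ReadAllAsync returns an ordinary IAsyncEnumerable, the stream can be passed through async iterator methods before it reaches the consumer. The sketch below uses strings in place of an event type, and `OnlyErrorsAsync` is a hypothetical filter written for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Hypothetical filter: passes through only entries that start with "ERROR"
static async IAsyncEnumerable<string> OnlyErrorsAsync(IAsyncEnumerable<string> source)
{
    await foreach (var entry in source)
    {
        if (entry.StartsWith("ERROR", StringComparison.Ordinal))
        {
            yield return entry;
        }
    }
}

var channel = Channel.CreateUnbounded<string>();
channel.Writer.TryWrite("INFO started");
channel.Writer.TryWrite("ERROR disk full");
channel.Writer.TryWrite("ERROR timeout");
channel.Writer.Complete(); // Ends the stream so the loop below terminates

var errors = new List<string>();
await foreach (var error in OnlyErrorsAsync(channel.Reader.ReadAllAsync()))
{
    errors.Add(error);
}

Console.WriteLine($"{errors.Count} errors: {string.Join("; ", errors)}");
// 2 errors: ERROR disk full; ERROR timeout
```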
Performance Tips
1. Use SingleWriter/SingleReader When Applicable
```csharp
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
{
    SingleWriter = true, // Only one producer
    SingleReader = true  // Only one consumer
});
```
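These flags are promises made by the caller rather than constraints the channel checks, so they are only safe when the code really has one producer and one consumer. A minimal sketch of such a pipeline follows; the item count of 1000 is arbitrary.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
{
    SingleWriter = true, // Exactly one task writes (the producer below)
    SingleReader = true  // Exactly one loop reads (the await foreach below)
});

var producer = Task.Run(async () =>
{
    for (int i = 1; i <= 1000; i++)
    {
        await channel.Writer.WriteAsync(i);
    }
    channel.Writer.Complete();
});

long sum = 0;
await foreach (var value in channel.Reader.ReadAllAsync())
{
    sum += value;
}
await producer;

Console.WriteLine($"Sum: {sum}"); // Sum: 500500
```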
2. Avoid AllowSynchronousContinuations
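```csharp
// Keep false so a reader's continuation never runs inline on the writer's thread
// (inline continuations can stall producers and build deep call stacks)
AllowSynchronousContinuations = false // Default
```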
3. Size Bounded Channels Appropriately
```csharp
// Too small: excessive blocking
// Too large: memory waste
// Balance based on production/consumption rates
var channel = Channel.CreateBounded<int>(
    Math.Max(100, expectedBurstSize * 2)
);
```
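One rough way to arrive at a burst size is to estimate the backlog that builds up while the producer outpaces the consumer. The helper below is a rule-of-thumb sketch, not an established formula from the library; `EstimateCapacity`, its parameters, and the rates used are all assumptions for illustration.

```csharp
using System;
using System.Threading.Channels;

// Backlog accumulated during a burst = (produce rate - consume rate) * burst duration,
// with a floor so the channel never becomes unreasonably small
static int EstimateCapacity(
    double producerRatePerSecond, double consumerRatePerSecond, double burstSeconds, int minimum = 100)
{
    double backlog = Math.Max(0, producerRatePerSecond - consumerRatePerSecond) * burstSeconds;
    return Math.Max(minimum, (int)Math.Ceiling(backlog));
}

// Burst of 5000 items/s for 2 s against a consumer handling 2000 items/s
int burstCapacity = EstimateCapacity(5000, 2000, 2);

// Consumer is faster than the producer, so the floor applies
int floorCapacity = EstimateCapacity(100, 500, 10);

var channel = Channel.CreateBounded<int>(burstCapacity);

Console.WriteLine($"Burst capacity: {burstCapacity}"); // 6000
Console.WriteLine($"Floor capacity: {floorCapacity}"); // 100
```

Measured rates from production are a better input than guesses, and the result should still be checked against the memory cost per item.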
Key Takeaways
- Bounded channels provide backpressure (FullMode.Wait)
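- DropNewest, DropOldest, and DropWrite discard items silently, so reserve them for non-critical data
- Use TryWrite for non-blocking overflow handling (in Wait mode it returns false when the channel is full)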
- Coordinate multiple writers for proper completion
- Implement graceful shutdown with drain logic
- Configure SingleWriter/SingleReader for performance
- Channels integrate well with IAsyncEnumerable