15 minlesson

Advanced Channel Patterns

Advanced Channel Patterns

This lesson covers bounded channels, backpressure strategies, and advanced patterns for production use.

Bounded Channel Full Modes

When a bounded channel is full, you can choose different behaviors:

csharp
1var options = new BoundedChannelOptions(capacity: 10)
2{
3 FullMode = BoundedChannelFullMode.Wait // Default
4};

Wait (Default)

csharp
1FullMode = BoundedChannelFullMode.Wait

WriteAsync blocks until space is available. This is backpressure.

csharp
1// Producer slows down to match consumer speed
2await writer.WriteAsync(item); // Waits if full

DropNewest

csharp
1FullMode = BoundedChannelFullMode.DropNewest

When full, the newest item (being written) is dropped.

csharp
1// Item may be silently dropped
2writer.TryWrite(item); // Returns false if dropped

DropOldest

csharp
1FullMode = BoundedChannelFullMode.DropOldest

When full, the oldest item in the queue is dropped to make room.

csharp
1// Oldest items sacrificed for newest
2// Good for real-time data where latest matters most

DropWrite

csharp
1FullMode = BoundedChannelFullMode.DropWrite

Same as DropNewest - incoming write is dropped if full.

Choosing the Right Mode

ModeUse Case
WaitCritical data, can't lose items
DropNewestNon-critical, latest data matters
DropOldestReal-time feeds, want freshest data

Backpressure in Action

csharp
1var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(5)
2{
3 FullMode = BoundedChannelFullMode.Wait
4});
5
6// Fast producer
7var producer = Task.Run(async () =>
8{
9 for (int i = 0; i < 20; i++)
10 {
11 Console.WriteLine($"Writing {i}...");
12 await channel.Writer.WriteAsync(i); // Waits when full
13 Console.WriteLine($"Wrote {i}");
14 }
15 channel.Writer.Complete();
16});
17
18// Slow consumer
19var consumer = Task.Run(async () =>
20{
21 await foreach (var item in channel.Reader.ReadAllAsync())
22 {
23 Console.WriteLine($"Processing {item}");
24 await Task.Delay(500); // Slow processing
25 }
26});
27
28// Output shows producer waiting for consumer

TryWrite for Non-Blocking

Check if write succeeds without blocking:

csharp
1var channel = Channel.CreateBounded<LogEntry>(1000);
2
3public void Log(string message)
4{
5 var entry = new LogEntry(message);
6
7 if (!channel.Writer.TryWrite(entry))
8 {
9 // Queue full - handle overflow
10 Console.WriteLine("Log queue full, dropping message");
11 // Or: write to overflow file
12 // Or: increment dropped counter
13 }
14}

WaitToWriteAsync Pattern

Check for availability before writing:

csharp
1while (await channel.Writer.WaitToWriteAsync(cancellationToken))
2{
3 var item = await GetNextItemAsync();
4
5 if (!channel.Writer.TryWrite(item))
6 {
7 // Rare: space became unavailable
8 continue;
9 }
10}

Multiple Writers with Coordination

csharp
1var channel = Channel.CreateBounded<WorkItem>(100);
2var writersComplete = 0;
3var totalWriters = 3;
4
5// Multiple writers
6for (int i = 0; i < totalWriters; i++)
7{
8 int writerId = i;
9 _ = Task.Run(async () =>
10 {
11 for (int j = 0; j < 100; j++)
12 {
13 await channel.Writer.WriteAsync(new WorkItem(writerId, j));
14 }
15
16 // Only complete when ALL writers done
17 if (Interlocked.Increment(ref writersComplete) == totalWriters)
18 {
19 channel.Writer.Complete();
20 }
21 });
22}

Timeout Patterns

csharp
1// Write with timeout
2using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
3try
4{
5 await channel.Writer.WriteAsync(item, cts.Token);
6}
7catch (OperationCanceledException)
8{
9 Console.WriteLine("Write timed out - queue full too long");
10}
11
12// Read with timeout
13using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
14try
15{
16 var item = await channel.Reader.ReadAsync(cts.Token);
17}
18catch (OperationCanceledException)
19{
20 Console.WriteLine("Read timed out - no items available");
21}

Graceful Shutdown

csharp
1class WorkerService : IAsyncDisposable
2{
3 private readonly Channel<WorkItem> _channel;
4 private readonly Task _processingTask;
5 private readonly CancellationTokenSource _shutdownCts;
6
7 public WorkerService()
8 {
9 _channel = Channel.CreateBounded<WorkItem>(1000);
10 _shutdownCts = new CancellationTokenSource();
11 _processingTask = Task.Run(ProcessAsync);
12 }
13
14 public async ValueTask EnqueueAsync(WorkItem item)
15 {
16 await _channel.Writer.WriteAsync(item);
17 }
18
19 private async Task ProcessAsync()
20 {
21 try
22 {
23 await foreach (var item in _channel.Reader.ReadAllAsync(_shutdownCts.Token))
24 {
25 await ProcessItemAsync(item);
26 }
27 }
28 catch (OperationCanceledException)
29 {
30 // Shutdown requested - drain remaining items
31 while (_channel.Reader.TryRead(out var item))
32 {
33 await ProcessItemAsync(item);
34 }
35 }
36 }
37
38 public async ValueTask DisposeAsync()
39 {
40 _channel.Writer.Complete();
41
42 // Wait for processing to complete (with timeout)
43 var completionTask = _processingTask;
44 var timeoutTask = Task.Delay(TimeSpan.FromSeconds(30));
45
46 if (await Task.WhenAny(completionTask, timeoutTask) == timeoutTask)
47 {
48 // Timeout - cancel and drain
49 _shutdownCts.Cancel();
50 await completionTask;
51 }
52
53 _shutdownCts.Dispose();
54 }
55}

Channel as Async Stream Source

csharp
1public class EventSource
2{
3 private readonly Channel<Event> _channel = Channel.CreateUnbounded<Event>();
4
5 public void RaiseEvent(Event evt) => _channel.Writer.TryWrite(evt);
6
7 public IAsyncEnumerable<Event> GetEventsAsync(CancellationToken ct = default)
8 {
9 return _channel.Reader.ReadAllAsync(ct);
10 }
11}
12
13// Consumer
14await foreach (var evt in eventSource.GetEventsAsync(cancellationToken))
15{
16 await HandleEventAsync(evt);
17}

Performance Tips

1. Use SingleWriter/SingleReader When Applicable

csharp
1var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(100)
2{
3 SingleWriter = true, // Only one producer
4 SingleReader = true // Only one consumer
5});

2. Avoid AllowSynchronousContinuations

csharp
1// Keep false to prevent stack overflow in deep chains
2AllowSynchronousContinuations = false // Default

3. Size Bounded Channels Appropriately

csharp
1// Too small: excessive blocking
2// Too large: memory waste
3// Balance based on production/consumption rates
4var channel = Channel.CreateBounded<int>(
5 Math.Max(100, expectedBurstSize * 2)
6);

Key Takeaways

  • Bounded channels provide backpressure (FullMode.Wait)
  • DropNewest/DropOldest for non-critical data
  • Use TryWrite for non-blocking overflow handling
  • Coordinate multiple writers for proper completion
  • Implement graceful shutdown with drain logic
  • Configure SingleWriter/SingleReader for performance
  • Channels integrate well with IAsyncEnumerable