15 min lesson

Channel-Based Batching

Channels provide an elegant way to implement batching in async producer/consumer scenarios.

Basic Channel Batching

Read multiple items at once from a channel:

csharp
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();

// Producer - writes items one at a time
var producer = Task.Run(async () =>
{
    for (var i = 0; i < 100; i++)
    {
        await channel.Writer.WriteAsync(i);
        await Task.Delay(10); // Simulate incoming data
    }
    channel.Writer.Complete();
});

// Consumer - accumulates items and processes them ten at a time
var consumer = Task.Run(async () =>
{
    const int batchSize = 10;
    var batch = new List<int>();

    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        batch.Add(item);

        if (batch.Count >= batchSize)
        {
            Console.WriteLine($"Processing batch: [{string.Join(", ", batch)}]");
            batch.Clear();
        }
    }

    // The total may not divide evenly - flush the leftover partial batch
    if (batch.Count > 0)
    {
        Console.WriteLine($"Processing final batch: [{string.Join(", ", batch)}]");
    }
});

await Task.WhenAll(producer, consumer);

Time-Bounded Channel Batching

Batch by size OR timeout, whichever comes first:

csharp
// Collects up to maxBatchSize items from the reader, stopping early if the
// timeout elapses or the channel completes; always returns what it gathered.
async Task<List<T>> ReadBatchAsync<T>(
    ChannelReader<T> reader,
    int maxBatchSize,
    TimeSpan timeout,
    CancellationToken ct = default)
{
    var items = new List<T>(maxBatchSize);

    // Linked token: fires on caller cancellation OR when the timeout expires
    using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    linkedCts.CancelAfter(timeout);

    try
    {
        for (var collected = 0; collected < maxBatchSize; collected++)
        {
            items.Add(await reader.ReadAsync(linkedCts.Token));
        }
    }
    catch (OperationCanceledException) when (!ct.IsCancellationRequested)
    {
        // Timeout expired - return the partial batch
    }
    catch (ChannelClosedException)
    {
        // Channel completed - return the partial batch
    }

    return items;
}
31
// Usage
var channel = Channel.CreateUnbounded<Order>();

// WaitToReadAsync returns false once the channel completes and is empty
while (await channel.Reader.WaitToReadAsync())
{
    var orders = await ReadBatchAsync(
        channel.Reader,
        maxBatchSize: 100,
        timeout: TimeSpan.FromSeconds(5));

    if (orders.Count > 0)
    {
        await ProcessOrderBatchAsync(orders);
    }
}

Efficient Batch Reader Extension

A reusable extension method for batch reading:

csharp
static class ChannelExtensions
{
    /// <summary>
    /// Streams batches from <paramref name="reader"/>: each batch holds up to
    /// <paramref name="batchSize"/> items, and a partial batch is yielded once
    /// <paramref name="maxWait"/> elapses so items never sit buffered forever.
    /// A final partial batch is yielded when the channel completes.
    /// </summary>
    public static async IAsyncEnumerable<T[]> ReadBatchesAsync<T>(
        this ChannelReader<T> reader,
        int batchSize,
        TimeSpan maxWait,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        var batch = new List<T>(batchSize);

        while (!ct.IsCancellationRequested)
        {
            // Wait for at least one item or channel completion
            if (!await reader.WaitToReadAsync(ct))
            {
                break; // Channel completed
            }

            using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            timeoutCts.CancelAfter(maxWait);

            // FIX: C# forbids `yield return`/`yield break` inside a catch
            // clause, and inside a try block that has a catch (CS1631/CS1626).
            // The original yielded from the ChannelClosedException handler and
            // would not compile. Record the outcome in a flag and yield after
            // the try/catch instead.
            var channelClosed = false;
            try
            {
                // Drain items already buffered, until batch full
                while (batch.Count < batchSize &&
                       reader.TryRead(out var item))
                {
                    batch.Add(item);
                }

                // If batch not full, wait for more with timeout
                while (batch.Count < batchSize)
                {
                    batch.Add(await reader.ReadAsync(timeoutCts.Token));
                }
            }
            catch (OperationCanceledException) when (!ct.IsCancellationRequested)
            {
                // Timeout - fall through and yield what we have
            }
            catch (ChannelClosedException)
            {
                // Channel done - yield what we have below, then exit
                channelClosed = true;
            }

            if (batch.Count > 0)
            {
                yield return batch.ToArray();
                batch.Clear();
            }

            if (channelClosed)
            {
                yield break;
            }
        }

        // Defensive final flush: each iteration clears the batch after
        // yielding, but never silently drop collected items.
        if (batch.Count > 0)
        {
            yield return batch.ToArray();
        }
    }
}
66
// Clean usage
var channel = Channel.CreateUnbounded<LogEntry>();

await foreach (var entries in channel.Reader.ReadBatchesAsync(
    batchSize: 100,
    maxWait: TimeSpan.FromSeconds(2)))
{
    await WriteLogBatchToStorageAsync(entries);
}

Batching in a Pipeline

Combine batching with pipeline stages:

csharp
// Stage 1: Read individual items
var itemChannel = Channel.CreateBounded<RawData>(1000);

// Stage 2: Batch items
var batchChannel = Channel.CreateBounded<RawData[]>(10);

// Stage 3: Process batches
var resultChannel = Channel.CreateBounded<ProcessedBatch>(10);

// Batcher task: groups raw items into size- or time-bounded chunks
var batcher = Task.Run(async () =>
{
    await foreach (var chunk in itemChannel.Reader.ReadBatchesAsync(
        batchSize: 50,
        maxWait: TimeSpan.FromSeconds(1)))
    {
        await batchChannel.Writer.WriteAsync(chunk);
    }
    // Propagate completion downstream so the processor can finish
    batchChannel.Writer.Complete();
});

// Processor task: consumes chunks and forwards results downstream
var processor = Task.Run(async () =>
{
    await foreach (var chunk in batchChannel.Reader.ReadAllAsync())
    {
        var processed = await ProcessBatchAsync(chunk);
        await resultChannel.Writer.WriteAsync(processed);
    }
    resultChannel.Writer.Complete();
});

Parallel Batch Consumers

Multiple consumers processing batches:

csharp
var batchChannel = Channel.CreateBounded<Order[]>(20);
const int consumerCount = 4;

// Each consumer competes for batches from the same reader; a given batch
// is delivered to exactly one of them.
async Task ConsumeAsync(int id)
{
    await foreach (var batch in batchChannel.Reader.ReadAllAsync())
    {
        Console.WriteLine($"Consumer {id}: Processing {batch.Length} orders");
        await SaveOrderBatchAsync(batch);
    }
}

var consumers = new Task[consumerCount];
for (var i = 0; i < consumerCount; i++)
{
    var id = i; // capture a stable copy for the lambda
    consumers[i] = Task.Run(() => ConsumeAsync(id));
}

await Task.WhenAll(consumers);

Bounded Channel for Backpressure

Control memory usage with bounded channels:

csharp
// Only allow 5 batches to queue up
var batchChannel = Channel.CreateBounded<Data[]>(new BoundedChannelOptions(5)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = false,
    SingleWriter = true
});

// Producer will block when 5 batches are queued
var producer = Task.Run(async () =>
{
    var pending = new List<Data>();

    await foreach (var item in sourceChannel.Reader.ReadAllAsync())
    {
        pending.Add(item);

        if (pending.Count < 100)
        {
            continue; // keep accumulating until a full batch
        }

        // Will wait if channel is full (backpressure)
        await batchChannel.Writer.WriteAsync(pending.ToArray());
        pending.Clear();
        Console.WriteLine("Batch written, continuing...");
    }

    // Ship whatever is left as a final, smaller batch
    if (pending.Count > 0)
    {
        await batchChannel.Writer.WriteAsync(pending.ToArray());
    }
    batchChannel.Writer.Complete();
});

Complete Example: Log Aggregator

csharp
/// <summary>
/// Buffers individual log entries and flushes them to storage in batches -
/// when <c>batchSize</c> entries accumulate or when <c>flushIntervalMs</c>
/// elapses, whichever comes first.
/// </summary>
class LogAggregator : IAsyncDisposable
{
    private readonly Channel<LogEntry> _channel;
    private readonly Task _processorTask;
    private readonly CancellationTokenSource _cts = new();

    public LogAggregator(ILogStorage storage, int batchSize = 100, int flushIntervalMs = 5000)
    {
        // Bounded channel: LogAsync applies backpressure (waits) once
        // 10,000 entries are queued instead of growing memory unboundedly.
        _channel = Channel.CreateBounded<LogEntry>(10000);

        _processorTask = Task.Run(async () =>
        {
            try
            {
                await foreach (var batch in _channel.Reader.ReadBatchesAsync(
                    batchSize,
                    TimeSpan.FromMilliseconds(flushIntervalMs),
                    _cts.Token))
                {
                    try
                    {
                        await storage.WriteBatchAsync(batch);
                        Console.WriteLine($"Flushed {batch.Length} log entries");
                    }
                    catch (Exception ex)
                    {
                        // A failed flush must not kill the processor loop
                        Console.WriteLine($"Failed to flush logs: {ex.Message}");
                        // Could implement retry or dead-letter queue here
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Shutting down
            }
        });
    }

    /// <summary>
    /// Queues one entry for batched writing. Waits when the buffer is full;
    /// throws ChannelClosedException if called after disposal.
    /// </summary>
    public ValueTask LogAsync(LogEntry entry)
    {
        return _channel.Writer.WriteAsync(entry);
    }

    /// <summary>
    /// Stops accepting new entries and flushes everything still buffered.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // FIX: the original cancelled _cts *before* awaiting the processor,
        // aborting the in-progress drain and dropping buffered entries.
        // Complete the writer first and let the processor run to completion;
        // ReadBatchesAsync exits on its own once the channel is drained.
        // (TryComplete, unlike Complete, is safe if disposal runs twice.)
        _channel.Writer.TryComplete();
        await _processorTask;
        _cts.Dispose();
    }
}

record LogEntry(DateTime Timestamp, string Level, string Message);

interface ILogStorage
{
    Task WriteBatchAsync(LogEntry[] entries);
}

// Usage
await using var aggregator = new LogAggregator(storage);

// Log entries are automatically batched
await aggregator.LogAsync(new LogEntry(DateTime.UtcNow, "INFO", "Starting..."));
await aggregator.LogAsync(new LogEntry(DateTime.UtcNow, "DEBUG", "Processing..."));
// ... logs accumulate and flush automatically

Key Takeaways

  • Channels naturally support batching patterns
  • Use TryRead for non-blocking batch collection
  • Combine size limits with timeouts for responsive batching
  • Extension methods make batch reading reusable
  • Bounded channels provide backpressure for batch queues
  • Multiple consumers can process batches in parallel
  • Always handle channel completion and final partial batches