# Channel-Based Batching
Channels (`System.Threading.Channels`) provide an elegant way to implement batching in asynchronous producer/consumer scenarios.
## Basic Channel Batching
Read multiple items at once from a channel:
```csharp
using System.Threading.Channels;

var channel = Channel.CreateUnbounded<int>();

// Producer - writes items one at a time
var producer = Task.Run(async () =>
{
    for (int i = 0; i < 100; i++)
    {
        await channel.Writer.WriteAsync(i);
        await Task.Delay(10); // Simulate incoming data
    }
    channel.Writer.Complete();
});

// Consumer - reads in batches
var consumer = Task.Run(async () =>
{
    var batch = new List<int>();
    var batchSize = 10;

    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        batch.Add(item);

        if (batch.Count >= batchSize)
        {
            Console.WriteLine($"Processing batch: [{string.Join(", ", batch)}]");
            batch.Clear();
        }
    }

    // Process remaining items
    if (batch.Count > 0)
    {
        Console.WriteLine($"Processing final batch: [{string.Join(", ", batch)}]");
    }
});

await Task.WhenAll(producer, consumer);
```
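The consumer above pays one `await` per item. A lower-overhead variant (a sketch, not from the original) drains everything already buffered with `TryRead`, so a whole burst of items costs a single await:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<int>();

// Producer: a quick burst of 25 items, then complete
for (int i = 0; i < 25; i++)
{
    channel.Writer.TryWrite(i); // always succeeds on an unbounded channel
}
channel.Writer.Complete();

// Consumer: one await per burst, then synchronous TryRead draining
var batch = new List<int>(10);
while (await channel.Reader.WaitToReadAsync())
{
    while (channel.Reader.TryRead(out var item))
    {
        batch.Add(item);
        if (batch.Count == 10)
        {
            Console.WriteLine($"Batch of {batch.Count}");
            batch.Clear();
        }
    }
}
if (batch.Count > 0)
{
    Console.WriteLine($"Final batch of {batch.Count}"); // 25 items -> 10, 10, 5
}
```

The trade-off: `TryRead` only sees what is already buffered, so this pattern still needs a size or time bound if you want batches to form while the producer is slow.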
## Time-Bounded Channel Batching
Batch by size OR timeout, whichever comes first:
```csharp
async Task<List<T>> ReadBatchAsync<T>(
    ChannelReader<T> reader,
    int maxBatchSize,
    TimeSpan timeout,
    CancellationToken ct = default)
{
    var batch = new List<T>(maxBatchSize);
    using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
    cts.CancelAfter(timeout);

    try
    {
        while (batch.Count < maxBatchSize)
        {
            // Try to read with timeout
            var item = await reader.ReadAsync(cts.Token);
            batch.Add(item);
        }
    }
    catch (OperationCanceledException) when (!ct.IsCancellationRequested)
    {
        // Timeout expired - return what we have
    }
    catch (ChannelClosedException)
    {
        // Channel completed - return what we have
    }

    return batch;
}

// Usage
var channel = Channel.CreateUnbounded<Order>();

while (await channel.Reader.WaitToReadAsync())
{
    var batch = await ReadBatchAsync(
        channel.Reader,
        maxBatchSize: 100,
        timeout: TimeSpan.FromSeconds(5));

    if (batch.Count > 0)
    {
        await ProcessOrderBatchAsync(batch);
    }
}
```
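Cancelling a linked token works, but every timed-out batch ends in a thrown exception. A sketch of an exception-free alternative (the helper name `ReadBatchNoThrowAsync` is hypothetical) races `WaitToReadAsync` against a single shared `Task.Delay` deadline:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

static async Task<List<T>> ReadBatchNoThrowAsync<T>(
    ChannelReader<T> reader, int maxBatchSize, TimeSpan timeout)
{
    var batch = new List<T>(maxBatchSize);
    var deadline = Task.Delay(timeout); // one deadline for the whole batch

    while (batch.Count < maxBatchSize)
    {
        // Drain synchronously first - no await needed for buffered items
        while (batch.Count < maxBatchSize && reader.TryRead(out var item))
        {
            batch.Add(item);
        }
        if (batch.Count >= maxBatchSize) break;

        // Race "more data available" against the deadline
        var wait = reader.WaitToReadAsync().AsTask();
        if (await Task.WhenAny(wait, deadline) == deadline) break; // timeout
        if (!await wait) break; // channel completed
    }

    return batch;
}

// Demo: 7 items buffered, batch size 5 -> first call returns 5, second returns 2
var channel = Channel.CreateUnbounded<int>();
for (int i = 0; i < 7; i++) channel.Writer.TryWrite(i);
channel.Writer.Complete();

var first = await ReadBatchNoThrowAsync(channel.Reader, 5, TimeSpan.FromMilliseconds(100));
var second = await ReadBatchNoThrowAsync(channel.Reader, 5, TimeSpan.FromMilliseconds(100));
Console.WriteLine($"{first.Count}, {second.Count}"); // 5, 2
```

This avoids the allocation and throw/catch cost of a per-batch `CancellationTokenSource`, at the price of slightly fuzzier timing (the deadline is checked between reads, not during one).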
## Efficient Batch Reader Extension
A reusable extension method for batch reading:
```csharp
using System.Runtime.CompilerServices;
using System.Threading.Channels;

static class ChannelExtensions
{
    public static async IAsyncEnumerable<T[]> ReadBatchesAsync<T>(
        this ChannelReader<T> reader,
        int batchSize,
        TimeSpan maxWait,
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        var batch = new List<T>(batchSize);

        while (!ct.IsCancellationRequested)
        {
            // Wait for at least one item or channel completion
            if (!await reader.WaitToReadAsync(ct))
            {
                break; // Channel completed
            }

            using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            timeoutCts.CancelAfter(maxWait);

            // yield return is not allowed inside a catch block,
            // so track completion with a flag instead
            var channelClosed = false;
            try
            {
                // Read until batch full or buffer empty
                while (batch.Count < batchSize && reader.TryRead(out var item))
                {
                    batch.Add(item);
                }

                // If batch not full, wait for more with timeout
                while (batch.Count < batchSize)
                {
                    batch.Add(await reader.ReadAsync(timeoutCts.Token));
                }
            }
            catch (OperationCanceledException) when (!ct.IsCancellationRequested)
            {
                // Timeout - yield what we have
            }
            catch (ChannelClosedException)
            {
                // Channel done - yield what we have and exit
                channelClosed = true;
            }

            if (batch.Count > 0)
            {
                yield return batch.ToArray();
                batch.Clear();
            }

            if (channelClosed)
            {
                yield break;
            }
        }

        // Final batch
        if (batch.Count > 0)
        {
            yield return batch.ToArray();
        }
    }
}

// Clean usage
var channel = Channel.CreateUnbounded<LogEntry>();

await foreach (var batch in channel.Reader.ReadBatchesAsync(
    batchSize: 100,
    maxWait: TimeSpan.FromSeconds(2)))
{
    await WriteLogBatchToStorageAsync(batch);
}
```
## Batching in a Pipeline
Combine batching with pipeline stages:
```csharp
// Stage 1: Read individual items
var itemChannel = Channel.CreateBounded<RawData>(1000);

// Stage 2: Batch items
var batchChannel = Channel.CreateBounded<RawData[]>(10);

// Stage 3: Process batches
var resultChannel = Channel.CreateBounded<ProcessedBatch>(10);

// Batcher task
var batcher = Task.Run(async () =>
{
    await foreach (var batch in itemChannel.Reader.ReadBatchesAsync(
        batchSize: 50,
        maxWait: TimeSpan.FromSeconds(1)))
    {
        await batchChannel.Writer.WriteAsync(batch);
    }
    batchChannel.Writer.Complete();
});

// Processor task
var processor = Task.Run(async () =>
{
    await foreach (var batch in batchChannel.Reader.ReadAllAsync())
    {
        var result = await ProcessBatchAsync(batch);
        await resultChannel.Writer.WriteAsync(result);
    }
    resultChannel.Writer.Complete();
});
```
## Parallel Batch Consumers
Multiple consumers processing batches:
```csharp
var batchChannel = Channel.CreateBounded<Order[]>(20);
var consumerCount = 4;

var consumers = Enumerable.Range(0, consumerCount)
    .Select(id => Task.Run(async () =>
    {
        await foreach (var batch in batchChannel.Reader.ReadAllAsync())
        {
            Console.WriteLine($"Consumer {id}: Processing {batch.Length} orders");
            await SaveOrderBatchAsync(batch);
        }
    }))
    .ToArray();

await Task.WhenAll(consumers);
```
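This works because a channel delivers each item to exactly one reader: competing consumers never see the same batch twice. A minimal self-contained sketch (hypothetical `int[]` payload) demonstrating that guarantee:

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int[]>(20);

// Four competing consumers collect every item they receive
var seen = new ConcurrentBag<int>();
var consumers = Enumerable.Range(0, 4)
    .Select(id => Task.Run(async () =>
    {
        await foreach (var batch in channel.Reader.ReadAllAsync())
        {
            foreach (var n in batch) seen.Add(n);
        }
    }))
    .ToArray();

// Ten batches of two distinct items each
for (int i = 0; i < 10; i++)
{
    await channel.Writer.WriteAsync(new[] { i * 2, i * 2 + 1 });
}
channel.Writer.Complete();
await Task.WhenAll(consumers);

Console.WriteLine(seen.Count); // 20 - every item delivered exactly once
```

Note that batch *ordering* across consumers is not guaranteed; if downstream order matters, either use a single consumer or tag batches with a sequence number.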
## Bounded Channel for Backpressure
Control memory usage with bounded channels:
```csharp
// Only allow 5 batches to queue up
var batchChannel = Channel.CreateBounded<Data[]>(new BoundedChannelOptions(5)
{
    FullMode = BoundedChannelFullMode.Wait,
    SingleReader = false,
    SingleWriter = true
});

// Producer will block when 5 batches are queued
var producer = Task.Run(async () =>
{
    var batch = new List<Data>();

    await foreach (var item in sourceChannel.Reader.ReadAllAsync())
    {
        batch.Add(item);

        if (batch.Count >= 100)
        {
            // Will wait if channel is full (backpressure)
            await batchChannel.Writer.WriteAsync(batch.ToArray());
            batch.Clear();
            Console.WriteLine("Batch written, continuing...");
        }
    }

    if (batch.Count > 0)
    {
        await batchChannel.Writer.WriteAsync(batch.ToArray());
    }
    batchChannel.Writer.Complete();
});
```
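`Wait` is not the only policy. When losing data is acceptable (metrics, telemetry), `BoundedChannelFullMode` also offers lossy modes that keep the producer unblocked. A small sketch (hypothetical channel, `int` payload for brevity):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Drop data instead of blocking the producer when the channel is full
var lossy = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.DropOldest // evict the oldest queued item
    // Other options: DropNewest (evict newest queued), DropWrite (discard the incoming item)
});

for (int i = 1; i <= 5; i++)
{
    lossy.Writer.TryWrite(i); // never blocks; capacity 2 keeps only the newest two
}
lossy.Writer.Complete();

var kept = new List<int>();
while (lossy.Reader.TryRead(out var item)) kept.Add(item);
Console.WriteLine(string.Join(", ", kept)); // 4, 5
```

`DropOldest` is a natural fit for "latest data wins" scenarios; for batch queues feeding a database, `Wait` (as above) is usually the right choice.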
## Complete Example: Log Aggregator
```csharp
class LogAggregator : IAsyncDisposable
{
    private readonly Channel<LogEntry> _channel;
    private readonly Task _processorTask;
    private readonly CancellationTokenSource _cts = new();

    public LogAggregator(ILogStorage storage, int batchSize = 100, int flushIntervalMs = 5000)
    {
        _channel = Channel.CreateBounded<LogEntry>(10000);

        _processorTask = Task.Run(async () =>
        {
            try
            {
                await foreach (var batch in _channel.Reader.ReadBatchesAsync(
                    batchSize,
                    TimeSpan.FromMilliseconds(flushIntervalMs),
                    _cts.Token))
                {
                    try
                    {
                        await storage.WriteBatchAsync(batch);
                        Console.WriteLine($"Flushed {batch.Length} log entries");
                    }
                    catch (Exception ex)
                    {
                        Console.WriteLine($"Failed to flush logs: {ex.Message}");
                        // Could implement retry or a dead-letter queue here
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Shutting down
            }
        });
    }

    public ValueTask LogAsync(LogEntry entry)
    {
        return _channel.Writer.WriteAsync(entry);
    }

    public async ValueTask DisposeAsync()
    {
        _channel.Writer.Complete();
        // Wait for the processor to drain and flush remaining entries;
        // it exits naturally once the channel completes. Cancelling first
        // would drop any in-flight batch.
        await _processorTask;
        _cts.Cancel();
        _cts.Dispose();
    }
}

record LogEntry(DateTime Timestamp, string Level, string Message);

interface ILogStorage
{
    Task WriteBatchAsync(LogEntry[] entries);
}

// Usage
await using var aggregator = new LogAggregator(storage);

// Log entries are automatically batched
await aggregator.LogAsync(new LogEntry(DateTime.UtcNow, "INFO", "Starting..."));
await aggregator.LogAsync(new LogEntry(DateTime.UtcNow, "DEBUG", "Processing..."));
// ... logs accumulate and flush automatically
```
## Key Takeaways
- Channels naturally support batching patterns
- Use `TryRead` for non-blocking batch collection
- Combine size limits with timeouts for responsive batching
- Extension methods make batch reading reusable
- Bounded channels provide backpressure for batch queues
- Multiple consumers can process batches in parallel
- Always handle channel completion and final partial batches