# Batch Processing Strategies
Batch processing groups multiple items together so they can be handled in a single operation, reducing per-item overhead and improving throughput.
## Why Batch?
### Without Batching
```csharp
// 1000 individual database inserts
foreach (var item in items)
{
    await db.InsertAsync(item); // Network round-trip each time
}
// ~1000 network round-trips!
```
### With Batching
```csharp
// 10 bulk inserts of 100 items each
foreach (var batch in items.Chunk(100))
{
    await db.BulkInsertAsync(batch); // One round-trip per batch
}
// Only ~10 network round-trips!
```
## Simple Batching with LINQ
```csharp
using System.Linq;

var items = Enumerable.Range(1, 1000).ToList();

// Chunk (available since .NET 6) splits a sequence into arrays of up to 100 items
foreach (var batch in items.Chunk(100))
{
    Console.WriteLine($"Processing batch of {batch.Length} items");
    await ProcessBatchAsync(batch);
}
```
## Time-Based Batching
Process items when the batch is full *or* a timeout expires:
```csharp
class TimedBatcher<T>
{
    private readonly List<T> _buffer = new();
    private readonly int _maxBatchSize;
    private readonly TimeSpan _maxWait;
    private readonly Func<T[], Task> _processBatch;
    private DateTime _lastFlush = DateTime.UtcNow;
    private readonly object _lock = new();

    public TimedBatcher(int maxBatchSize, TimeSpan maxWait, Func<T[], Task> processBatch)
    {
        _maxBatchSize = maxBatchSize;
        _maxWait = maxWait;
        _processBatch = processBatch;
    }

    public async Task AddAsync(T item)
    {
        T[]? batch = null;

        lock (_lock)
        {
            _buffer.Add(item);

            // Flush if the batch is full or the timeout has expired.
            // Note: the timeout is only checked when a new item arrives,
            // so a quiet buffer waits until the next Add or an explicit Flush.
            if (_buffer.Count >= _maxBatchSize ||
                DateTime.UtcNow - _lastFlush > _maxWait)
            {
                batch = _buffer.ToArray();
                _buffer.Clear();
                _lastFlush = DateTime.UtcNow;
            }
        }

        // Process outside the lock to avoid blocking other producers
        if (batch != null)
        {
            await _processBatch(batch);
        }
    }

    public async Task FlushAsync()
    {
        T[]? batch = null;

        lock (_lock)
        {
            if (_buffer.Count > 0)
            {
                batch = _buffer.ToArray();
                _buffer.Clear();
                _lastFlush = DateTime.UtcNow;
            }
        }

        if (batch != null)
        {
            await _processBatch(batch);
        }
    }
}

// Usage
var batcher = new TimedBatcher<Order>(
    maxBatchSize: 100,
    maxWait: TimeSpan.FromSeconds(5),
    processBatch: async batch =>
    {
        Console.WriteLine($"Processing {batch.Length} orders");
        await database.BulkInsertAsync(batch);
    });

// Add items - auto-flushes when full or after 5 seconds
foreach (var order in incomingOrders)
{
    await batcher.AddAsync(order);
}

// Don't forget the final flush!
await batcher.FlushAsync();
```
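One caveat of this batcher: the timeout is only evaluated inside `AddAsync`, so if items stop arriving entirely, buffered items sit until the final flush. If that matters, a background loop can flush on a schedule. A minimal sketch using `PeriodicTimer` (.NET 6+), written against a plain flush delegate so it works with any batcher:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Background flush loop (sketch): flushes on a fixed interval even when
// no new items arrive, complementing the add-driven timeout check.
async Task RunFlushLoopAsync(Func<Task> flushAsync, TimeSpan interval, CancellationToken ct)
{
    using var timer = new PeriodicTimer(interval);
    try
    {
        while (await timer.WaitForNextTickAsync(ct))
        {
            await flushAsync(); // No-op when the buffer is empty
        }
    }
    catch (OperationCanceledException)
    {
        // Shutdown requested; callers should do one final flush themselves
    }
}

// Usage: run alongside the producer loop, e.g.
// var flushLoop = RunFlushLoopAsync(() => batcher.FlushAsync(),
//                                   TimeSpan.FromSeconds(5), cts.Token);
```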
## Parallel Batch Processing
Process multiple batches concurrently:
```csharp
var items = Enumerable.Range(1, 1000).ToList();
var batches = items.Chunk(100).ToList();

// Process up to 4 batches in parallel
await Parallel.ForEachAsync(
    batches,
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    async (batch, ct) =>
    {
        Console.WriteLine($"Processing batch on thread {Environment.CurrentManagedThreadId}");
        await ProcessBatchAsync(batch, ct);
    });
```
## Batch with Error Handling
Handle failures gracefully:
```csharp
async Task ProcessWithRetryAsync<T>(IEnumerable<T[]> batches, Func<T[], Task> processor)
{
    var failedBatches = new List<T[]>();

    foreach (var batch in batches)
    {
        try
        {
            await processor(batch);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Batch failed: {ex.Message}");
            failedBatches.Add(batch);
        }
    }

    // Retry failed batches with smaller chunks
    if (failedBatches.Count > 0)
    {
        Console.WriteLine($"Retrying {failedBatches.Count} failed batches...");

        foreach (var batch in failedBatches)
        {
            // Split into smaller batches and retry
            foreach (var smallerBatch in batch.Chunk(10))
            {
                try
                {
                    await processor(smallerBatch);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"Sub-batch permanently failed: {ex.Message}");
                    // Send these items to a dead-letter queue
                }
            }
        }
    }
}
```
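The "dead letter queue" mentioned in the catch block is whatever durable store holds permanently failed items for later inspection and replay. As an illustration only, here is a minimal in-memory stand-in (a hypothetical `DeadLetterQueue<T>` type, not part of any library); a production system would persist entries to a queue service or database table instead:

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// In-memory stand-in for a dead-letter queue (sketch).
class DeadLetterQueue<T>
{
    private readonly ConcurrentQueue<(T Item, string Reason)> _entries = new();

    public void Add(T item, string reason) => _entries.Enqueue((item, reason));

    // Drain all entries, e.g. for logging or manual replay
    public IReadOnlyList<(T Item, string Reason)> Drain()
    {
        var drained = new List<(T, string)>();
        while (_entries.TryDequeue(out var entry))
        {
            drained.Add(entry);
        }
        return drained;
    }
}
```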
## Batch Size Tuning
Finding the optimal batch size:
```csharp
using System.Diagnostics;
using System.Linq;

async Task<int> FindOptimalBatchSizeAsync<T>(
    T[] testItems,
    Func<T[], Task> processor)
{
    var batchSizes = new[] { 10, 50, 100, 200, 500 };
    var results = new Dictionary<int, TimeSpan>();

    foreach (var size in batchSizes)
    {
        var sw = Stopwatch.StartNew();

        foreach (var batch in testItems.Chunk(size))
        {
            await processor(batch);
        }

        sw.Stop();
        results[size] = sw.Elapsed;
        Console.WriteLine($"Batch size {size}: {sw.Elapsed.TotalMilliseconds:F0}ms");
    }

    // Return the size with the lowest total elapsed time
    return results.MinBy(kvp => kvp.Value).Key;
}
```
## Memory-Efficient Streaming Batches
For very large datasets, stream batches without loading everything into memory:
```csharp
using System.Runtime.CompilerServices;

async IAsyncEnumerable<T[]> BatchAsync<T>(
    IAsyncEnumerable<T> source,
    int batchSize,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    var batch = new List<T>(batchSize);

    await foreach (var item in source.WithCancellation(ct))
    {
        batch.Add(item);

        if (batch.Count >= batchSize)
        {
            yield return batch.ToArray();
            batch.Clear();
        }
    }

    // Yield remaining items
    if (batch.Count > 0)
    {
        yield return batch.ToArray();
    }
}

// Usage with file streaming
async IAsyncEnumerable<string> ReadLinesAsync(string path)
{
    using var reader = new StreamReader(path);
    while (await reader.ReadLineAsync() is { } line)
    {
        yield return line;
    }
}

// Process the file in batches without loading the entire file
await foreach (var batch in BatchAsync(ReadLinesAsync("large-file.txt"), 1000))
{
    await ProcessLinesBatchAsync(batch);
}
```
## Complete Example: Batch Upload Service
```csharp
using System.Net.Http;
using System.Text;
using System.Text.Json;

class BatchUploadService
{
    private readonly HttpClient _client;
    private readonly int _batchSize;
    private readonly int _maxConcurrency;

    public BatchUploadService(int batchSize = 100, int maxConcurrency = 4)
    {
        // Note: BaseAddress must be configured for the relative "/api/bulk" URI to resolve
        _client = new HttpClient();
        _batchSize = batchSize;
        _maxConcurrency = maxConcurrency;
    }

    public async Task<BatchResult> UploadAsync<T>(
        IEnumerable<T> items,
        CancellationToken ct = default)
    {
        var batches = items.Chunk(_batchSize).ToList();
        var succeeded = 0;
        var failed = 0;
        var semaphore = new SemaphoreSlim(_maxConcurrency);

        var tasks = batches.Select(async batch =>
        {
            // Limit concurrent uploads to _maxConcurrency
            await semaphore.WaitAsync(ct);
            try
            {
                await UploadBatchAsync(batch, ct);
                Interlocked.Add(ref succeeded, batch.Length);
            }
            catch
            {
                Interlocked.Add(ref failed, batch.Length);
            }
            finally
            {
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);

        return new BatchResult(succeeded, failed, batches.Count);
    }

    private async Task UploadBatchAsync<T>(T[] batch, CancellationToken ct)
    {
        var json = JsonSerializer.Serialize(batch);
        var content = new StringContent(json, Encoding.UTF8, "application/json");
        var response = await _client.PostAsync("/api/bulk", content, ct);
        response.EnsureSuccessStatusCode();
    }
}

record BatchResult(int Succeeded, int Failed, int TotalBatches);
```
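A usage sketch for the service above, assuming `orders` is an in-memory collection of items to upload:

```csharp
// Upload in batches of 100, at most 4 concurrent requests
var service = new BatchUploadService(batchSize: 100, maxConcurrency: 4);
var result = await service.UploadAsync(orders);

Console.WriteLine(
    $"Succeeded: {result.Succeeded}, failed: {result.Failed}, " +
    $"batches: {result.TotalBatches}");
```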
## Key Takeaways
- Batching reduces network/IO overhead significantly
- Use `Chunk()` for simple size-based batching
- Time-based batching handles low-volume scenarios
- Always flush remaining items at the end
- Handle batch failures gracefully with retry/smaller chunks
- Stream batches for memory efficiency with large datasets
- Tune batch size based on your specific workload