15 min lesson

Batch Processing Strategies

Batch Processing Strategies

Batch processing groups multiple items together for efficient processing, reducing overhead and improving throughput.

Why Batch?

Without Batching

csharp
// 1000 individual database inserts
foreach (var entity in items)
{
    await db.InsertAsync(entity); // Network round-trip each time
}
// ~1000 network round-trips!

With Batching

csharp
// 10 bulk inserts of 100 items each
foreach (var chunk in items.Chunk(100))
{
    await db.BulkInsertAsync(chunk); // One round-trip per batch
}
// Only ~10 network round-trips!

Simple Batching with LINQ

csharp
using System.Linq;

var items = Enumerable.Range(1, 1000).ToList();

// Chunk into batches of 100; the last group may be smaller than 100.
foreach (var group in items.Chunk(100))
{
    Console.WriteLine($"Processing batch of {group.Length} items");
    await ProcessBatchAsync(group);
}

Time-Based Batching

Process items when batch is full OR timeout expires:

csharp
/// <summary>
/// Accumulates items and hands them to <c>processBatch</c> when either the
/// buffer reaches <c>maxBatchSize</c> or <c>maxWait</c> has elapsed since the
/// last flush. A background timer guarantees the time-based flush fires even
/// when no new items arrive — the original only evaluated the timeout inside
/// AddAsync, so a partially filled batch on an idle stream was never flushed
/// until the next Add or an explicit FlushAsync.
/// Note: _processBatch is awaited outside the lock, so two flushes may run
/// concurrently; the callback must tolerate that (same as the original).
/// </summary>
class TimedBatcher<T> : IDisposable
{
    private readonly List<T> _buffer = new();
    private readonly int _maxBatchSize;
    private readonly TimeSpan _maxWait;
    private readonly Func<T[], Task> _processBatch;
    private readonly Timer _timer; // drives the time-based flush on idle streams
    private DateTime _lastFlush = DateTime.UtcNow;
    private readonly object _lock = new();

    /// <param name="maxBatchSize">Flush as soon as this many items are buffered.</param>
    /// <param name="maxWait">Maximum time a buffered item waits before a flush.</param>
    /// <param name="processBatch">Callback invoked with each flushed batch.</param>
    public TimedBatcher(int maxBatchSize, TimeSpan maxWait, Func<T[], Task> processBatch)
    {
        _maxBatchSize = maxBatchSize;
        _maxWait = maxWait;
        _processBatch = processBatch;
        // Poll roughly once per maxWait; FlushIfStaleAsync re-checks under the lock.
        _timer = new Timer(state => { _ = FlushIfStaleAsync(); }, null, maxWait, maxWait);
    }

    /// <summary>Buffers one item, flushing if the batch is full or stale.</summary>
    public async Task AddAsync(T item)
    {
        T[]? batch = null;

        lock (_lock)
        {
            _buffer.Add(item);

            // Flush if batch is full or timeout expired.
            if (_buffer.Count >= _maxBatchSize ||
                DateTime.UtcNow - _lastFlush > _maxWait)
            {
                batch = TakeBatchLocked();
            }
        }

        if (batch != null)
        {
            await _processBatch(batch);
        }
    }

    /// <summary>Flushes any buffered items immediately. Call before shutdown.</summary>
    public async Task FlushAsync()
    {
        T[]? batch = null;

        lock (_lock)
        {
            if (_buffer.Count > 0)
            {
                batch = TakeBatchLocked();
            }
        }

        if (batch != null)
        {
            await _processBatch(batch);
        }
    }

    /// <summary>Stops the background flush timer (does not flush; call FlushAsync first).</summary>
    public void Dispose() => _timer.Dispose();

    // Snapshot and clear the buffer, stamping the flush time. Caller must hold _lock.
    private T[] TakeBatchLocked()
    {
        var batch = _buffer.ToArray();
        _buffer.Clear();
        _lastFlush = DateTime.UtcNow;
        return batch;
    }

    // Timer callback: flush only when something is buffered AND the wait expired.
    private async Task FlushIfStaleAsync()
    {
        T[]? batch = null;

        lock (_lock)
        {
            if (_buffer.Count > 0 && DateTime.UtcNow - _lastFlush >= _maxWait)
            {
                batch = TakeBatchLocked();
            }
        }

        if (batch != null)
        {
            try
            {
                await _processBatch(batch);
            }
            catch
            {
                // A timer-driven flush has no caller to observe the failure;
                // swallow so the timer keeps running. Callers that need error
                // visibility should surface it inside processBatch itself.
            }
        }
    }
}
61
// Usage
var orderBatcher = new TimedBatcher<Order>(
    maxBatchSize: 100,
    maxWait: TimeSpan.FromSeconds(5),
    processBatch: async orders =>
    {
        Console.WriteLine($"Processing {orders.Length} orders");
        await database.BulkInsertAsync(orders);
    });

// Add items - will auto-flush when full or after 5 seconds
foreach (var order in incomingOrders)
{
    await orderBatcher.AddAsync(order);
}

// Don't forget final flush!
await orderBatcher.FlushAsync();

Parallel Batch Processing

Process multiple batches concurrently:

csharp
// Chunk 1000 items into batches of 100 in a single pipeline.
var batches = Enumerable.Range(1, 1000).Chunk(100).ToList();

// Process up to 4 batches in parallel
await Parallel.ForEachAsync(
    batches,
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    async (chunk, token) =>
    {
        Console.WriteLine($"Processing batch on thread {Environment.CurrentManagedThreadId}");
        await ProcessBatchAsync(chunk, token);
    });

Batch with Error Handling

Handle failures gracefully:

csharp
/// <summary>
/// Runs <paramref name="processor"/> over each batch, collecting failed
/// batches and retrying them once in smaller chunks so one bad item does not
/// sink an entire batch.
/// </summary>
/// <param name="batches">Pre-chunked batches of work items.</param>
/// <param name="processor">Processes one batch; throwing marks the batch failed.</param>
async Task ProcessWithRetryAsync<T>(IEnumerable<T[]> batches, Func<T[], Task> processor)
{
    var failedBatches = new List<T[]>();

    foreach (var batch in batches)
    {
        try
        {
            await processor(batch);
        }
        catch (Exception ex)
        {
            Console.WriteLine($"Batch failed: {ex.Message}");
            failedBatches.Add(batch);
        }
    }

    if (failedBatches.Count == 0)
    {
        return;
    }

    // Retry failed batches in smaller chunks to isolate the offending items.
    Console.WriteLine($"Retrying {failedBatches.Count} failed batches...");

    foreach (var batch in failedBatches)
    {
        foreach (var smallerBatch in batch.Chunk(10))
        {
            try
            {
                await processor(smallerBatch);
            }
            catch (Exception ex)
            {
                // Fixed message: the failing unit is a chunk of up to 10 items,
                // not a single item as the original log claimed.
                Console.WriteLine($"Chunk of {smallerBatch.Length} items permanently failed: {ex.Message}");
                // TODO: route these items to a dead-letter queue instead of only logging.
            }
        }
    }
}

Batch Size Tuning

Finding the optimal batch size:

csharp
/// <summary>
/// Benchmarks a fixed set of candidate batch sizes against the same workload
/// and returns the size with the lowest total elapsed time. Results are
/// wall-clock based, so run against a representative processor.
/// </summary>
async Task<int> FindOptimalBatchSizeAsync<T>(
    T[] testItems,
    Func<T[], Task> processor)
{
    var batchSizes = new[] { 10, 50, 100, 200, 500 };
    var results = new Dictionary<int, TimeSpan>();

    foreach (var size in batchSizes)
    {
        var stopwatch = Stopwatch.StartNew();
        foreach (var chunk in testItems.Chunk(size))
        {
            await processor(chunk);
        }
        stopwatch.Stop();

        results[size] = stopwatch.Elapsed;
        Console.WriteLine($"Batch size {size}: {stopwatch.Elapsed.TotalMilliseconds:F0}ms");
    }

    // Lowest total wall-clock time wins.
    return results.MinBy(entry => entry.Value).Key;
}

Memory-Efficient Streaming Batches

For very large datasets, stream batches without loading everything:

csharp
/// <summary>
/// Lazily groups an async sequence into arrays of at most
/// <paramref name="batchSize"/> items, yielding a final partial batch when the
/// source does not divide evenly. Only one batch is buffered at a time.
/// </summary>
async IAsyncEnumerable<T[]> BatchAsync<T>(
    IAsyncEnumerable<T> source,
    int batchSize,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    var pending = new List<T>(batchSize);

    await foreach (var element in source.WithCancellation(ct))
    {
        pending.Add(element);
        if (pending.Count < batchSize)
        {
            continue;
        }

        yield return pending.ToArray();
        pending.Clear();
    }

    // Flush whatever is left over as a short final batch.
    if (pending.Count > 0)
    {
        yield return pending.ToArray();
    }
}
25
26// Usage with file streaming
// Streams a text file one line at a time without loading it all into memory.
// The reader is disposed when enumeration completes or is abandoned.
async IAsyncEnumerable<string> ReadLinesAsync(string path)
{
    using var reader = new StreamReader(path);
    string? line;
    while ((line = await reader.ReadLineAsync()) != null)
    {
        yield return line;
    }
}
35
// Process file in batches without loading entire file
await foreach (var lineBatch in BatchAsync(ReadLinesAsync("large-file.txt"), 1000))
{
    await ProcessLinesBatchAsync(lineBatch);
}

Complete Example: Batch Upload Service

csharp
/// <summary>
/// Uploads items to a bulk JSON endpoint in fixed-size batches, limiting how
/// many batch uploads run concurrently.
/// </summary>
class BatchUploadService
{
    private readonly HttpClient _client;
    private readonly int _batchSize;
    private readonly int _maxConcurrency;

    /// <param name="batchSize">Items per upload request.</param>
    /// <param name="maxConcurrency">Maximum simultaneous uploads.</param>
    /// <param name="client">
    /// Optional HttpClient to reuse (recommended: a shared or
    /// IHttpClientFactory-managed instance, with BaseAddress set). The original
    /// always created a private, never-disposed client; this keeps that as the
    /// fallback for backward compatibility.
    /// </param>
    public BatchUploadService(int batchSize = 100, int maxConcurrency = 4, HttpClient? client = null)
    {
        _client = client ?? new HttpClient();
        _batchSize = batchSize;
        _maxConcurrency = maxConcurrency;
    }

    /// <summary>
    /// Chunks <paramref name="items"/> and uploads the chunks with bounded
    /// parallelism. Per-batch failures are counted in the result rather than
    /// thrown; cancellation propagates as OperationCanceledException.
    /// </summary>
    public async Task<BatchResult> UploadAsync<T>(
        IEnumerable<T> items,
        CancellationToken ct = default)
    {
        var batches = items.Chunk(_batchSize).ToList();
        var succeeded = 0;
        var failed = 0;
        using var semaphore = new SemaphoreSlim(_maxConcurrency);

        var tasks = batches.Select(async batch =>
        {
            await semaphore.WaitAsync(ct);
            try
            {
                await UploadBatchAsync(batch, ct);
                Interlocked.Add(ref succeeded, batch.Length);
            }
            // Don't count cancellation as an upload failure: WaitAsync(ct)
            // above already surfaces cancellation to the caller, so the
            // original's bare catch made the behavior depend on where the
            // token fired. Rethrow to keep it consistent.
            catch (OperationCanceledException) when (ct.IsCancellationRequested)
            {
                throw;
            }
            catch
            {
                Interlocked.Add(ref failed, batch.Length);
            }
            finally
            {
                semaphore.Release();
            }
        }).ToList(); // materialize so every upload starts exactly once

        await Task.WhenAll(tasks);

        return new BatchResult(succeeded, failed, batches.Count);
    }

    // Serializes one batch and POSTs it; throws on non-success status codes.
    private async Task UploadBatchAsync<T>(T[] batch, CancellationToken ct)
    {
        var json = JsonSerializer.Serialize(batch);
        var content = new StringContent(json, Encoding.UTF8, "application/json");
        var response = await _client.PostAsync("/api/bulk", content, ct);
        response.EnsureSuccessStatusCode();
    }
}

// Succeeded/Failed count items (not batches); TotalBatches counts chunks sent.
record BatchResult(int Succeeded, int Failed, int TotalBatches);

Key Takeaways

  • Batching reduces network/IO overhead significantly
  • Use Chunk() for simple size-based batching
  • Time-based batching handles low-volume scenarios
  • Always flush remaining items at the end
  • Handle batch failures gracefully with retry/smaller chunks
  • Stream batches for memory efficiency with large datasets
  • Tune batch size based on your specific workload
Batch Processing Strategies - Anko Academy