# Handling Backpressure in Pipelines
When pipeline stages run at different speeds, backpressure ensures the system remains stable. Understanding and managing backpressure is critical for production pipelines.
## What Is Pipeline Backpressure?
When a slow stage can't keep up:
```
Fast Stage ████████████████████
     └───▶ Buffer fills up
Slow Stage ████████
```

- Without backpressure: memory exhaustion
- With backpressure: the fast stage slows down
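The contrast is easy to see in code. A minimal sketch comparing unbounded and bounded channels (the behavior of `WriteAsync` is the key difference):

```csharp
using System.Threading.Channels;

// Unbounded: TryWrite always succeeds, so a fast producer can grow
// the internal buffer without limit -- the memory-exhaustion case.
var unbounded = Channel.CreateUnbounded<int>();
unbounded.Writer.TryWrite(42); // always returns true

// Bounded: once 10 items are buffered, WriteAsync does not complete
// until the consumer frees a slot -- the producer is held back.
var bounded = Channel.CreateBounded<int>(10);
await bounded.Writer.WriteAsync(42); // waits when full
```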
## How Bounded Channels Create Backpressure
```csharp
using System.Threading.Channels;

// Bounded channel creates backpressure
var channel = Channel.CreateBounded<int>(10); // Max 10 items

// Fast producer
var producer = Task.Run(async () =>
{
    for (int i = 0; i < 1000; i++)
    {
        Console.WriteLine($"Producing {i}...");
        await channel.Writer.WriteAsync(i); // Waits (asynchronously) when full
        Console.WriteLine($"Produced {i}");
    }
    channel.Writer.Complete(); // Signal the consumer that no more items are coming
});

// Slow consumer
var consumer = Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        await Task.Delay(100); // Slow processing
        Console.WriteLine($"Consumed {item}");
    }
});

await Task.WhenAll(producer, consumer);
```
## Identifying the Bottleneck
```csharp
class InstrumentedPipeline
{
    public static (ChannelReader<TOut> Output, Task Completion) CreateStage<TIn, TOut>(
        string name,
        ChannelReader<TIn> input,
        Func<TIn, TOut> transform,
        int capacity = 100)
    {
        var output = Channel.CreateBounded<TOut>(capacity);
        int processing = 0;

        var task = Task.Run(async () =>
        {
            // Monitor task: report queue depth and in-flight count once a second
            // (input.Count requires a channel that supports counting, e.g. bounded)
            _ = Task.Run(async () =>
            {
                while (!input.Completion.IsCompleted)
                {
                    Console.WriteLine($"[{name}] Waiting: {input.Count}, Processing: {processing}");
                    await Task.Delay(1000);
                }
            });

            await foreach (var item in input.ReadAllAsync())
            {
                Interlocked.Increment(ref processing);

                var result = transform(item);

                Interlocked.Decrement(ref processing);
                await output.Writer.WriteAsync(result);
            }
            output.Writer.Complete();
        });

        return (output.Reader, task);
    }
}

// Output shows where items accumulate:
// [Parse]     Waiting: 0, Processing: 1
// [Transform] Waiting: 8, Processing: 1 ← Bottleneck!
// [Write]     Waiting: 0, Processing: 1
```
## Strategies for Handling Slow Stages
### 1. Parallel Workers in the Slow Stage
```csharp
async Task ParallelStageAsync<TIn, TOut>(
    ChannelReader<TIn> input,
    ChannelWriter<TOut> output,
    Func<TIn, Task<TOut>> transformAsync,
    int workerCount = 4)
{
    // Multiple workers read from the same channel; note that
    // output order is no longer guaranteed to match input order
    var workers = Enumerable.Range(0, workerCount)
        .Select(_ => Task.Run(async () =>
        {
            await foreach (var item in input.ReadAllAsync())
            {
                var result = await transformAsync(item);
                await output.WriteAsync(result);
            }
        }))
        .ToArray();

    await Task.WhenAll(workers);
    output.Complete();
}

// Usage
await ParallelStageAsync(
    parseOutput,
    transformOutput.Writer,
    async item => await SlowTransformAsync(item),
    workerCount: 8);
```
### 2. Batching for Efficiency
```csharp
async Task BatchingStageAsync<T>(
    ChannelReader<T> input,
    ChannelWriter<T[]> output,
    int batchSize,
    TimeSpan maxWait)
{
    var batch = new List<T>(batchSize);
    var lastFlush = DateTime.UtcNow;

    // Note: the time-based flush only fires when a new item arrives,
    // so a partial batch can wait longer than maxWait during idle periods
    await foreach (var item in input.ReadAllAsync())
    {
        batch.Add(item);

        bool shouldFlush =
            batch.Count >= batchSize ||
            DateTime.UtcNow - lastFlush > maxWait;

        if (shouldFlush)
        {
            await output.WriteAsync(batch.ToArray());
            batch.Clear();
            lastFlush = DateTime.UtcNow;
        }
    }

    // Flush any remaining items
    if (batch.Count > 0)
    {
        await output.WriteAsync(batch.ToArray());
    }

    output.Complete();
}
```
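For illustration, the batching stage might be wired up like this (the `LogEntry` type and channel names are hypothetical):

```csharp
// Hypothetical wiring: batch up to 50 entries, or flush every 200 ms
var items = Channel.CreateBounded<LogEntry>(1000);
var batches = Channel.CreateBounded<LogEntry[]>(20);

var batchTask = BatchingStageAsync(
    items.Reader,
    batches.Writer,
    batchSize: 50,
    maxWait: TimeSpan.FromMilliseconds(200));
```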
### 3. Dropping Items (When Acceptable)
```csharp
var options = new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.DropOldest
};

var channel = Channel.CreateBounded<SensorReading>(options);

// Old readings are dropped when the buffer is full --
// acceptable for real-time monitoring where only recent data matters
```
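`DropOldest` is one of four full-buffer policies defined by `BoundedChannelFullMode` in `System.Threading.Channels`; for reference:

```csharp
var waitMode   = new BoundedChannelOptions(100) { FullMode = BoundedChannelFullMode.Wait };       // default: writer waits (backpressure)
var dropOldest = new BoundedChannelOptions(100) { FullMode = BoundedChannelFullMode.DropOldest }; // evict the oldest buffered item
var dropNewest = new BoundedChannelOptions(100) { FullMode = BoundedChannelFullMode.DropNewest }; // evict the newest buffered item
var dropWrite  = new BoundedChannelOptions(100) { FullMode = BoundedChannelFullMode.DropWrite };  // discard the incoming item itself
```

Only `Wait` produces backpressure; the three drop modes trade data loss for a producer that never stalls.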
### 4. Adaptive Throttling
```csharp
async Task AdaptiveProducerAsync(IEnumerable<Item> items, ChannelWriter<Item> output)
{
    int delay = 0;

    foreach (var item in items)
    {
        if (output.TryWrite(item))
        {
            // Successfully written - speed up
            delay = Math.Max(0, delay - 10);
        }
        else
        {
            // Buffer full - slow down
            delay = Math.Min(1000, delay + 100);
            await Task.Delay(delay);
            await output.WriteAsync(item);
        }
    }

    output.Complete();
}
```
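If tuning delay constants feels fragile, `ChannelWriter<T>.WaitToWriteAsync` offers a poll-free alternative: the producer simply suspends until the channel can accept a write. A sketch, reusing the `Item` source from above:

```csharp
async Task WaitingProducerAsync(IEnumerable<Item> items, ChannelWriter<Item> output)
{
    foreach (var item in items)
    {
        // TryWrite fails when the buffer is full; WaitToWriteAsync then
        // suspends until space opens up (false means the channel closed)
        while (!output.TryWrite(item))
        {
            if (!await output.WaitToWriteAsync())
                return; // channel completed - nothing more to write
        }
    }
    output.Complete();
}
```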
## Buffer Sizing Guidelines
| Scenario | Buffer Size | Rationale |
|---|---|---|
| Fast stages | 10-100 | Small buffer fine |
| Variable timing | 100-1000 | Absorb bursts |
| Batch writes | batch size × 2 | Room for next batch |
| Memory constrained | Minimum viable | Trade speed for memory |
```csharp
// Calculate based on expected throughput
int bufferSize = (int)(
    expectedItemsPerSecond *
    maxAcceptableLatencySeconds *
    safetyMargin);
```
## Monitoring Backpressure
```csharp
class BackpressureMonitor
{
    private readonly Channel<int> _channel;
    private readonly int _capacity;

    public BackpressureMonitor(Channel<int> channel, int capacity)
    {
        _channel = channel;
        _capacity = capacity;
    }

    public int CurrentCount => _channel.Reader.Count;
    public bool IsNearCapacity => CurrentCount > _capacity * 0.8;

    public async Task MonitorAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var fillPercent = (double)CurrentCount / _capacity * 100;
            Console.WriteLine($"Buffer: {fillPercent:F1}% full");

            if (fillPercent > 90)
            {
                Console.WriteLine("WARNING: High backpressure!");
            }

            await Task.Delay(1000, ct);
        }
    }
}
```
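One way to hook the monitor into a pipeline (a sketch; `RunPipelineAsync` stands in for whatever drives your stages):

```csharp
var channel = Channel.CreateBounded<int>(100);
var monitor = new BackpressureMonitor(channel, capacity: 100);

using var cts = new CancellationTokenSource();
var monitoring = monitor.MonitorAsync(cts.Token);

await RunPipelineAsync(channel); // assumed pipeline driver

cts.Cancel();
// Task.Delay(1000, ct) throws on cancellation, so swallow it on shutdown
try { await monitoring; } catch (OperationCanceledException) { }
```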
## Key Takeaways
- Backpressure prevents memory exhaustion
- Bounded channels automatically create backpressure
- Identify bottlenecks by monitoring queue depths
- Parallelize slow stages with multiple workers
- Batch items for efficiency
- Drop items only when data loss is acceptable
- Size buffers based on throughput and latency requirements
- Monitor pipeline health in production