15 min lesson

Handling Backpressure in Pipelines


When pipeline stages run at different speeds, backpressure ensures the system remains stable. Understanding and managing backpressure is critical for production pipelines.

What Is Pipeline Backpressure?

When a slow stage can't keep up:

Fast Stage ████████████████████
      └───▶ Buffer fills up
Slow Stage ████████

Without backpressure: the buffer grows without bound until memory is exhausted. With backpressure: the fast stage automatically slows down to match the slow one.
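The failure mode is easy to reproduce with an unbounded channel, where every write succeeds immediately and nothing ever slows the producer. A minimal sketch (the class name and item count are illustrative):

```csharp
using System;
using System.Threading.Channels;

class UnboundedGrowthDemo
{
    // Enqueue n items with no consumer running and report the backlog size
    public static int FillWithoutConsumer(int n)
    {
        // Unbounded channel: TryWrite always succeeds, so the producer never waits
        var channel = Channel.CreateUnbounded<int>();

        for (int i = 0; i < n; i++)
            channel.Writer.TryWrite(i);

        // Every unconsumed item is held in memory
        return channel.Reader.Count;
    }

    static void Main()
    {
        // With a slow (here: absent) consumer, the backlog equals everything produced
        Console.WriteLine(FillWithoutConsumer(10_000));
    }
}
```

With a bounded channel the same producer would have been forced to wait after the first 10 writes, which is exactly the behavior the next section demonstrates.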

How Bounded Channels Create Backpressure

csharp
// Bounded channel creates backpressure
var channel = Channel.CreateBounded<int>(10); // Max 10 buffered items

// Fast producer
var producer = Task.Run(async () =>
{
    for (int i = 0; i < 1000; i++)
    {
        Console.WriteLine($"Producing {i}...");
        await channel.Writer.WriteAsync(i); // Waits (asynchronously) when the buffer is full
        Console.WriteLine($"Produced {i}");
    }
    channel.Writer.Complete(); // Signal the consumer that no more items are coming
});

// Slow consumer
var consumer = Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        await Task.Delay(100); // Slow processing
        Console.WriteLine($"Consumed {item}");
    }
});

await Task.WhenAll(producer, consumer);

Identifying the Bottleneck

csharp
class InstrumentedPipeline
{
    public static (ChannelReader<TOut> Output, Task Completion) CreateStage<TIn, TOut>(
        string name,
        ChannelReader<TIn> input,
        Func<TIn, TOut> transform,
        int capacity = 100)
    {
        // Bounded output so queue depth is observable (and backpressure applies)
        var output = Channel.CreateBounded<TOut>(capacity);
        int processing = 0;

        var task = Task.Run(async () =>
        {
            // Monitor task: report queue depth once per second
            _ = Task.Run(async () =>
            {
                while (!input.Completion.IsCompleted)
                {
                    Console.WriteLine($"[{name}] Waiting: {input.Count}, Processing: {processing}");
                    await Task.Delay(1000);
                }
            });

            await foreach (var item in input.ReadAllAsync())
            {
                Interlocked.Increment(ref processing);

                var result = transform(item);

                Interlocked.Decrement(ref processing);
                await output.Writer.WriteAsync(result);
            }
            output.Writer.Complete();
        });

        return (output.Reader, task);
    }
}

// Output shows where items accumulate
// [Parse]     Waiting: 0, Processing: 1
// [Transform] Waiting: 8, Processing: 1 ← Bottleneck!
// [Write]     Waiting: 0, Processing: 1

Strategies for Handling Slow Stages

1. Parallel Workers in Slow Stage

csharp
async Task ParallelStageAsync<TIn, TOut>(
    ChannelReader<TIn> input,
    ChannelWriter<TOut> output,
    Func<TIn, Task<TOut>> transformAsync,
    int workerCount = 4)
{
    // Multiple workers read from the same channel; note that with
    // more than one worker, output ordering is no longer guaranteed.
    var workers = Enumerable.Range(0, workerCount)
        .Select(_ => Task.Run(async () =>
        {
            await foreach (var item in input.ReadAllAsync())
            {
                var result = await transformAsync(item);
                await output.WriteAsync(result);
            }
        }))
        .ToArray();

    await Task.WhenAll(workers);
    output.Complete();
}

// Usage
await ParallelStageAsync(
    parseOutput,
    transformOutput.Writer,
    async item => await SlowTransformAsync(item),
    workerCount: 8);

2. Batching for Efficiency

csharp
async Task BatchingStageAsync<T>(
    ChannelReader<T> input,
    ChannelWriter<T[]> output,
    int batchSize,
    TimeSpan maxWait)
{
    var batch = new List<T>(batchSize);
    var lastFlush = DateTime.UtcNow;

    await foreach (var item in input.ReadAllAsync())
    {
        batch.Add(item);

        // Note: the time check only runs when a new item arrives; if the
        // input goes quiet, a partial batch waits for the next item or
        // for completion. A production version would add a timer.
        bool shouldFlush =
            batch.Count >= batchSize ||
            DateTime.UtcNow - lastFlush > maxWait;

        if (shouldFlush)
        {
            await output.WriteAsync(batch.ToArray());
            batch.Clear();
            lastFlush = DateTime.UtcNow;
        }
    }

    // Flush remaining items after the input completes
    if (batch.Count > 0)
    {
        await output.WriteAsync(batch.ToArray());
    }

    output.Complete();
}

3. Dropping Items (When Acceptable)

csharp
var options = new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.DropOldest
};

var channel = Channel.CreateBounded<SensorReading>(options);

// Old readings are dropped when the buffer is full -
// acceptable for real-time monitoring
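When dropping is enabled, it's worth counting what gets lost. The `Channel.CreateBounded<T>(BoundedChannelOptions, Action<T>)` overload (.NET 5+) takes a callback that fires for each evicted item; a small sketch (capacity and item count are illustrative):

```csharp
using System;
using System.Threading;
using System.Threading.Channels;

class DropCounterDemo
{
    static void Main()
    {
        int dropped = 0;

        // The second argument is invoked for every item evicted by the FullMode policy
        var channel = Channel.CreateBounded<int>(
            new BoundedChannelOptions(3) { FullMode = BoundedChannelFullMode.DropOldest },
            _ => Interlocked.Increment(ref dropped));

        // 10 writes into a 3-slot buffer: each write past capacity evicts the oldest item
        for (int i = 0; i < 10; i++)
            channel.Writer.TryWrite(i);

        Console.WriteLine($"Buffered: {channel.Reader.Count}, Dropped: {dropped}");
        // Buffered: 3, Dropped: 7
    }
}
```

Feeding the drop count into a metrics system makes silent data loss visible before it becomes a debugging mystery.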

4. Adaptive Throttling

csharp
async Task AdaptiveProducerAsync(
    ChannelWriter<Item> output,
    IEnumerable<Item> items)
{
    int delay = 0;

    foreach (var item in items)
    {
        if (output.TryWrite(item))
        {
            // Successfully written - speed up
            delay = Math.Max(0, delay - 10);
        }
        else
        {
            // Buffer full - back off, then wait for space
            delay = Math.Min(1000, delay + 100);
            await Task.Delay(delay);
            await output.WriteAsync(item);
        }
    }

    output.Complete();
}

Buffer Sizing Guidelines

Scenario             Buffer size      Rationale
Fast stages          10-100           Small buffer is fine
Variable timing      100-1000         Absorb bursts
Batch writes         batch size × 2   Room for the next batch
Memory constrained   Minimum viable   Trade speed for memory
csharp
// Calculate based on expected throughput, e.g.
// 500 items/sec × 0.2 s acceptable latency × 1.5 safety margin = 150
int bufferSize = (int)(
    expectedItemsPerSecond *
    maxAcceptableLatencySeconds *
    safetyMargin);

Monitoring Backpressure

csharp
class BackpressureMonitor<T>
{
    private readonly Channel<T> _channel;
    private readonly int _capacity;

    public BackpressureMonitor(Channel<T> channel, int capacity)
    {
        _channel = channel;
        _capacity = capacity;
    }

    public int CurrentCount => _channel.Reader.Count;
    public bool IsNearCapacity => CurrentCount > _capacity * 0.8;

    public async Task MonitorAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var fillPercent = (double)CurrentCount / _capacity * 100;
            Console.WriteLine($"Buffer: {fillPercent:F1}% full");

            if (fillPercent > 90)
            {
                Console.WriteLine("WARNING: High backpressure!");
            }

            await Task.Delay(1000, ct);
        }
    }
}

Key Takeaways

  • Backpressure prevents memory exhaustion
  • Bounded channels automatically create backpressure
  • Identify bottlenecks by monitoring queue depths
  • Parallelize slow stages with multiple workers
  • Batch items for efficiency
  • Drop items only when data loss is acceptable
  • Size buffers based on throughput and latency requirements
  • Monitor pipeline health in production