15 min lesson

Building Pipelines with Channels

Building Pipelines with Channels

This lesson shows how to build robust data pipelines using System.Threading.Channels.

Pipeline Stage Pattern

Each stage follows a common pattern:

csharp
// Runs one pipeline stage: reads every item from `input`, applies `transform`,
// and writes each result to `output`.
//
// `output` is always completed when the stage stops. If the stage stops because of an
// exception (from `transform`, from reading, or from writing), that exception is
// attached to the completion so downstream readers observe the failure rather than
// what looks like a normal end of stream. The exception is also rethrown, so the
// returned task faults exactly as before.
async Task StageAsync<TIn, TOut>(
    ChannelReader<TIn> input,
    ChannelWriter<TOut> output,
    Func<TIn, TOut> transform)
{
    Exception? failure = null;
    try
    {
        await foreach (var item in input.ReadAllAsync())
        {
            var result = transform(item);
            await output.WriteAsync(result);
        }
    }
    catch (Exception ex)
    {
        // Remember why the stage stopped; the finally block forwards it downstream.
        failure = ex;
        throw;
    }
    finally
    {
        // TryComplete instead of Complete: if the writer was already closed,
        // Complete would throw here and replace the original exception.
        output.TryComplete(failure);
    }
}

Creating Connected Channels

csharp
/// <summary>
/// Helpers for wiring pipeline stages together with bounded channels.
/// </summary>
public class Pipeline
{
    /// <summary>
    /// Starts a background stage that reads every item from <paramref name="input"/>,
    /// applies <paramref name="transform"/>, and writes the result to a new bounded channel.
    /// </summary>
    /// <param name="input">Reader the stage consumes until it completes.</param>
    /// <param name="transform">Synchronous conversion applied to each item.</param>
    /// <param name="bufferSize">Capacity of the stage's output channel.</param>
    /// <returns>
    /// The reader of the stage's output channel and the task running the stage. The task
    /// faults if the stage fails; in that case the output channel is completed with the
    /// same exception so downstream readers see the failure.
    /// </returns>
    public static (ChannelReader<TOut>, Task) CreateStage<TIn, TOut>(
        ChannelReader<TIn> input,
        Func<TIn, TOut> transform,
        int bufferSize = 10)
    {
        var channel = Channel.CreateBounded<TOut>(bufferSize);

        var task = Task.Run(async () =>
        {
            Exception? failure = null;
            try
            {
                await foreach (var item in input.ReadAllAsync())
                {
                    await channel.Writer.WriteAsync(transform(item));
                }
            }
            catch (Exception ex)
            {
                // Keep the reason so the finally block can forward it downstream.
                failure = ex;
                throw;
            }
            finally
            {
                // Always close the output; pass the failure (if any) so a broken
                // stage is not mistaken for a normal end of stream.
                channel.Writer.TryComplete(failure);
            }
        });

        return (channel.Reader, task);
    }
}

Complete Pipeline Example

csharp
// Data types carried between the pipeline stages

// One unparsed line of text, exactly as read from the input.
record RawData(string Line);

// The typed fields extracted from a single line.
record ParsedData(
    int Id,
    string Name,
    decimal Value);

// The final shape handed to the consumer stage.
record TransformedData(
    int Id,
    string UpperName,
    decimal AdjustedValue);
5
// Pipeline stages

// Parses one comma-separated line of the form "id,name,value" into a ParsedData.
//
// Numbers are read with the invariant culture so the same file parses the same way
// on every machine; with the current culture, a locale that uses '.' as a group
// separator (e.g. de-DE) would read "1.5" as 15.
//
// Throws FormatException or OverflowException when a number cannot be parsed, and
// IndexOutOfRangeException when the line has fewer than three fields.
ParsedData Parse(RawData raw)
{
    var invariant = System.Globalization.CultureInfo.InvariantCulture;
    var parts = raw.Line.Split(',');
    return new ParsedData(
        int.Parse(parts[0], invariant),
        parts[1],
        decimal.Parse(parts[2], invariant));
}
15
// Builds the output record for one parsed row: the Id is carried over unchanged,
// the name is upper-cased, and the value is multiplied by 1.1.
TransformedData Transform(ParsedData data)
{
    var upperName = data.Name.ToUpper();
    var adjustedValue = data.Value * 1.1m;
    return new TransformedData(data.Id, upperName, adjustedValue);
}
23
// Build the pipeline
var sourceChannel = Channel.CreateBounded<RawData>(100);

// Stage 1: Parse
var (parseOutput, parseTask) = Pipeline.CreateStage(
    sourceChannel.Reader,
    Parse);

// Stage 2: Transform
var (transformOutput, transformTask) = Pipeline.CreateStage(
    parseOutput,
    Transform);

// Stage 3: Write (final consumer)
var writeTask = Task.Run(async () =>
{
    await foreach (var item in transformOutput.ReadAllAsync())
    {
        Console.WriteLine($"{item.Id}: {item.UpperName} = {item.AdjustedValue}");
    }
});

// Feed the pipeline
try
{
    foreach (var line in File.ReadLines("data.csv"))
    {
        await sourceChannel.Writer.WriteAsync(new RawData(line));
    }
}
finally
{
    // The feeder is a producer too, so it follows the same rule as every stage:
    // close the channel in a finally block. Without this, a failure while reading
    // the file would leave the stages waiting for input that never arrives.
    sourceChannel.Writer.Complete();
}

// Wait for completion
await Task.WhenAll(parseTask, transformTask, writeTask);

Async Stages

For stages with async operations:

csharp
/// <summary>
/// Starts a background stage whose per-item work is asynchronous: every item read from
/// <paramref name="input"/> is passed to <paramref name="transformAsync"/>, the result is
/// awaited, and then written to a new bounded channel.
/// </summary>
/// <param name="input">Reader the stage consumes until it completes.</param>
/// <param name="transformAsync">Asynchronous conversion applied to each item, one at a time.</param>
/// <param name="bufferSize">Capacity of the stage's output channel.</param>
/// <returns>
/// The reader of the stage's output channel and the task running the stage. The task
/// faults if the stage fails; in that case the output channel is completed with the
/// same exception so downstream readers see the failure.
/// </returns>
public static (ChannelReader<TOut>, Task) CreateAsyncStage<TIn, TOut>(
    ChannelReader<TIn> input,
    Func<TIn, Task<TOut>> transformAsync,
    int bufferSize = 10)
{
    var channel = Channel.CreateBounded<TOut>(bufferSize);

    var task = Task.Run(async () =>
    {
        Exception? failure = null;
        try
        {
            await foreach (var item in input.ReadAllAsync())
            {
                var result = await transformAsync(item);
                await channel.Writer.WriteAsync(result);
            }
        }
        catch (Exception ex)
        {
            // Keep the reason so the finally block can forward it downstream.
            failure = ex;
            throw;
        }
        finally
        {
            // Always close the output; pass the failure (if any) so a broken
            // stage is not mistaken for a normal end of stream.
            channel.Writer.TryComplete(failure);
        }
    });

    return (channel.Reader, task);
}
26
// Usage with async transform: each parsed item is awaited through EnrichFromApiAsync
var (enrichedOutput, enrichTask) = Pipeline.CreateAsyncStage(
    input: parseOutput,
    transformAsync: async data => await EnrichFromApiAsync(data));

Fluent Pipeline Builder

csharp
/// <summary>
/// Fluent builder that chains channel-backed stages: each <see cref="Pipe{TOut}"/> call
/// starts a background stage and returns a builder over that stage's output, and
/// <see cref="ConsumeAsync"/> attaches the final consumer and waits for the whole chain.
/// </summary>
/// <typeparam name="T">Type of the items available at this point of the pipeline.</typeparam>
public class PipelineBuilder<T>
{
    private readonly ChannelReader<T> _reader;

    // Tasks of every stage upstream of this builder, in the order they were added.
    private readonly List<Task> _tasks;

    /// <summary>Starts a pipeline that reads from <paramref name="reader"/>.</summary>
    public PipelineBuilder(ChannelReader<T> reader)
        : this(reader, new List<Task>())
    {
    }

    // Used by Pipe to hand the accumulated stage tasks to the next builder.
    private PipelineBuilder(ChannelReader<T> reader, List<Task> tasks)
    {
        _reader = reader;
        _tasks = tasks;
    }

    /// <summary>
    /// Appends a stage that applies <paramref name="transform"/> to every item and
    /// writes the results to a new bounded channel of capacity <paramref name="buffer"/>.
    /// </summary>
    /// <returns>A builder positioned on the new stage's output.</returns>
    public PipelineBuilder<TOut> Pipe<TOut>(Func<T, TOut> transform, int buffer = 10)
    {
        var channel = Channel.CreateBounded<TOut>(buffer);

        var task = Task.Run(async () =>
        {
            Exception? failure = null;
            try
            {
                await foreach (var item in _reader.ReadAllAsync())
                {
                    await channel.Writer.WriteAsync(transform(item));
                }
            }
            catch (Exception ex)
            {
                // Keep the reason so the finally block can forward it downstream.
                failure = ex;
                throw;
            }
            finally
            {
                // Always close the output; pass the failure (if any) so a broken
                // stage is not mistaken for a normal end of stream.
                channel.Writer.TryComplete(failure);
            }
        });

        // The next builder must know about ALL upstream stages, not only this one;
        // otherwise ConsumeAsync would never await (or report failures from) the
        // earlier stages of a chain with more than one Pipe call.
        var tasks = new List<Task>(_tasks) { task };
        return new PipelineBuilder<TOut>(channel.Reader, tasks);
    }

    /// <summary>
    /// Attaches the final consumer and completes when every stage and the consumer
    /// have finished. Throws if any stage or the consumer failed.
    /// </summary>
    /// <param name="consumer">Asynchronous action invoked for each item, one at a time.</param>
    public async Task ConsumeAsync(Func<T, Task> consumer)
    {
        var consumeTask = Task.Run(async () =>
        {
            await foreach (var item in _reader.ReadAllAsync())
            {
                await consumer(item);
            }
        });

        await Task.WhenAll(_tasks.Append(consumeTask));
    }
}
49
// Fluent usage
var source = Channel.CreateUnbounded<string>();
var builder = new PipelineBuilder<string>(source.Reader);

// Each Pipe call starts its stage immediately; the task returned by ConsumeAsync
// cannot finish until whoever writes to `source` completes source.Writer.
Task pipelineRun = builder
    .Pipe(line => ParseLine(line))
    .Pipe(data => ValidateData(data))
    .Pipe(data => TransformData(data))
    .ConsumeAsync(async data => await SaveAsync(data));

await pipelineRun;

Error Handling in Stages

csharp
// Runs a stage that does not stop when `transform` fails for an item: the failing
// item and its exception are written to `errors`, and processing continues with the
// next item. Successful results are written to `output`.
//
// Only failures of `transform` are treated as item errors. A failure to read the
// input or to write to `output`/`errors` stops the stage and faults the returned task.
// Both `output` and `errors` are always completed when the stage stops; if it stopped
// because of an exception, that exception is attached to both completions.
async Task SafeStageAsync<TIn, TOut>(
    ChannelReader<TIn> input,
    ChannelWriter<TOut> output,
    ChannelWriter<(TIn Item, Exception Error)> errors,
    Func<TIn, TOut> transform)
{
    Exception? failure = null;
    try
    {
        await foreach (var item in input.ReadAllAsync())
        {
            TOut result;
            try
            {
                result = transform(item);
            }
            catch (Exception ex)
            {
                // Per-item failure: report it and move on to the next item.
                await errors.WriteAsync((item, ex));
                continue;
            }

            // Deliberately outside the inner try: a closed or broken output is a
            // pipeline failure, not a problem with this particular item.
            await output.WriteAsync(result);
        }
    }
    catch (Exception ex)
    {
        failure = ex;
        throw;
    }
    finally
    {
        // TryComplete never throws on an already-closed writer, so a problem with
        // one channel cannot prevent the other from being completed.
        output.TryComplete(failure);
        errors.TryComplete(failure);
    }
}

Pipeline with Metrics

csharp
/// <summary>
/// A pipeline stage that counts how many items it processed successfully and how
/// many it dropped because the transform threw.
/// </summary>
class MeteredStage<TIn, TOut>
{
    private int _processed;
    private int _errors;

    /// <summary>Number of items transformed and written to the output so far.</summary>
    public int Processed => _processed;

    /// <summary>Number of items dropped so far because the transform threw.</summary>
    public int Errors => _errors;

    /// <summary>
    /// Reads every item from <paramref name="input"/>, applies <paramref name="transform"/>,
    /// and writes the result to <paramref name="output"/>, updating the counters as it goes.
    /// </summary>
    /// <remarks>
    /// An item whose transform throws is counted in <see cref="Errors"/> and skipped.
    /// A failure to read the input or write to the output is not an item error: it stops
    /// the stage and faults the returned task. The output is always completed when the
    /// stage stops, with the exception attached if it stopped because of one.
    /// </remarks>
    public async Task RunAsync(
        ChannelReader<TIn> input,
        ChannelWriter<TOut> output,
        Func<TIn, TOut> transform)
    {
        Exception? failure = null;
        try
        {
            await foreach (var item in input.ReadAllAsync())
            {
                TOut result;
                try
                {
                    result = transform(item);
                }
                catch (Exception)
                {
                    // Per-item failure: count it and continue with the next item.
                    Interlocked.Increment(ref _errors);
                    continue;
                }

                // Outside the inner try on purpose: a closed or broken output must
                // stop the stage instead of being counted as an error for every
                // remaining item.
                await output.WriteAsync(result);
                Interlocked.Increment(ref _processed);
            }
        }
        catch (Exception ex)
        {
            failure = ex;
            throw;
        }
        finally
        {
            // TryComplete: must not throw if the writer has already been closed.
            output.TryComplete(failure);
        }
    }
}

Key Takeaways

  • Each stage reads from an input channel and writes to an output channel
  • Always complete the output channel in a finally block
  • Use bounded channels for backpressure
  • Async stages support I/O operations
  • Fluent builders improve readability
  • Add error channels for failed items
  • Track metrics with Interlocked