Building Pipelines with Channels
This lesson shows how to build robust data pipelines using System.Threading.Channels.
Pipeline Stage Pattern
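Each stage follows the same pattern: read every item from the input channel, transform it, write the result to the output channel, and complete the output once the input is exhausted or the stage fails: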
```csharp
async Task StageAsync<TIn, TOut>(
    ChannelReader<TIn> input,
    ChannelWriter<TOut> output,
    Func<TIn, TOut> transform)
{
    try
    {
        await foreach (var item in input.ReadAllAsync())
        {
            var result = transform(item);
            await output.WriteAsync(result);
        }
    }
    finally
    {
        output.Complete();
    }
}
```
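To make the pattern concrete, here is a minimal sketch that wires one stage between two bounded channels. The channel names, the capacities, and the squaring transform are illustrative assumptions; `StageAsync` is repeated as a local function only so the snippet runs on its own as a top-level program.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Illustrative channels: the stage reads from `numbers` and writes to `squares`.
var numbers = Channel.CreateBounded<int>(4);
var squares = Channel.CreateBounded<int>(4);

// Start the stage before feeding it so it drains `numbers` as items arrive.
Task stage = StageAsync(numbers.Reader, squares.Writer, (int n) => n * n);

// Producer: write a few items, then complete the input to signal "no more data".
for (var i = 1; i <= 5; i++)
{
    await numbers.Writer.WriteAsync(i);
}
numbers.Writer.Complete();

// Consumer: ReadAllAsync ends once the stage completes `squares`.
var results = new List<int>();
await foreach (var square in squares.Reader.ReadAllAsync())
{
    results.Add(square);
}

await stage; // surfaces any exception thrown inside the stage

Console.WriteLine(string.Join(", ", results)); // prints "1, 4, 9, 16, 25"

// Same shape as the stage pattern in this section.
static async Task StageAsync<TIn, TOut>(
    ChannelReader<TIn> input,
    ChannelWriter<TOut> output,
    Func<TIn, TOut> transform)
{
    try
    {
        await foreach (var item in input.ReadAllAsync())
        {
            await output.WriteAsync(transform(item));
        }
    }
    finally
    {
        output.Complete();
    }
}
```

Completing `numbers` is what lets the whole chain shut down: the stage's loop ends, its `finally` block completes `squares`, and the consumer's loop ends in turn.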
Creating Connected Channels
```csharp
public class Pipeline
{
    public static (ChannelReader<TOut>, Task) CreateStage<TIn, TOut>(
        ChannelReader<TIn> input,
        Func<TIn, TOut> transform,
        int bufferSize = 10)
    {
        var channel = Channel.CreateBounded<TOut>(bufferSize);

        var task = Task.Run(async () =>
        {
            try
            {
                await foreach (var item in input.ReadAllAsync())
                {
                    await channel.Writer.WriteAsync(transform(item));
                }
            }
            finally
            {
                channel.Writer.Complete();
            }
        });

        return (channel.Reader, task);
    }
}
```
Complete Pipeline Example
```csharp
using System;
using System.Globalization;
using System.IO;
using System.Threading.Channels;
using System.Threading.Tasks;

// Pipeline stages
ParsedData Parse(RawData raw)
{
    var parts = raw.Line.Split(',');
    // Parse with the invariant culture so "1.5" means the same on every machine.
    return new ParsedData(
        int.Parse(parts[0], CultureInfo.InvariantCulture),
        parts[1],
        decimal.Parse(parts[2], CultureInfo.InvariantCulture));
}

TransformedData Transform(ParsedData data)
{
    return new TransformedData(
        data.Id,
        data.Name.ToUpperInvariant(),
        data.Value * 1.1m);
}

// Build the pipeline
var sourceChannel = Channel.CreateBounded<RawData>(100);

// Stage 1: Parse
var (parseOutput, parseTask) = Pipeline.CreateStage(
    sourceChannel.Reader,
    Parse);

// Stage 2: Transform
var (transformOutput, transformTask) = Pipeline.CreateStage(
    parseOutput,
    Transform);

// Stage 3: Write (final consumer)
var writeTask = Task.Run(async () =>
{
    await foreach (var item in transformOutput.ReadAllAsync())
    {
        Console.WriteLine($"{item.Id}: {item.UpperName} = {item.AdjustedValue}");
    }
});

// Feed the pipeline
foreach (var line in File.ReadLines("data.csv"))
{
    await sourceChannel.Writer.WriteAsync(new RawData(line));
}
sourceChannel.Writer.Complete();

// Wait for completion
await Task.WhenAll(parseTask, transformTask, writeTask);

// Data types (in a top-level program, type declarations must follow the statements)
record RawData(string Line);
record ParsedData(int Id, string Name, decimal Value);
record TransformedData(int Id, string UpperName, decimal AdjustedValue);
```
Async Stages
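For stages whose transform is itself asynchronous, such as a network or database call, accept a Func<TIn, Task<TOut>> and await it inside the loop: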
```csharp
// Add this method to the Pipeline class so it can be called as Pipeline.CreateAsyncStage.
public static (ChannelReader<TOut>, Task) CreateAsyncStage<TIn, TOut>(
    ChannelReader<TIn> input,
    Func<TIn, Task<TOut>> transformAsync,
    int bufferSize = 10)
{
    var channel = Channel.CreateBounded<TOut>(bufferSize);

    var task = Task.Run(async () =>
    {
        try
        {
            await foreach (var item in input.ReadAllAsync())
            {
                var result = await transformAsync(item);
                await channel.Writer.WriteAsync(result);
            }
        }
        finally
        {
            channel.Writer.Complete();
        }
    });

    return (channel.Reader, task);
}

// Usage with async transform
var (enrichedOutput, enrichTask) = Pipeline.CreateAsyncStage(
    parseOutput,
    async data => await EnrichFromApiAsync(data));
```
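The following sketch runs an async stage end to end. `LookupNameAsync` is a hypothetical stand-in for a real I/O call (it only delays and formats a string), and the channel names and capacity are assumptions; `CreateAsyncStage` is repeated as a local function so the snippet is self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var ids = Channel.CreateBounded<int>(8);

// The stage awaits the async transform for each item before writing the result.
var (names, lookupTask) = CreateAsyncStage(ids.Reader, (int value) => LookupNameAsync(value));

for (var id = 1; id <= 3; id++)
{
    await ids.Writer.WriteAsync(id);
}
ids.Writer.Complete();

var received = new List<string>();
await foreach (var name in names.ReadAllAsync())
{
    received.Add(name);
}
await lookupTask; // observe any exception from the stage

Console.WriteLine(string.Join(", ", received)); // prints "user-1, user-2, user-3"

// Hypothetical async transform: simulates latency instead of calling a real API.
static async Task<string> LookupNameAsync(int userId)
{
    await Task.Delay(10);
    return $"user-{userId}";
}

// Same shape as the async stage in this section.
static (ChannelReader<TOut>, Task) CreateAsyncStage<TIn, TOut>(
    ChannelReader<TIn> input,
    Func<TIn, Task<TOut>> transformAsync,
    int bufferSize = 10)
{
    var channel = Channel.CreateBounded<TOut>(bufferSize);

    var task = Task.Run(async () =>
    {
        try
        {
            await foreach (var item in input.ReadAllAsync())
            {
                await channel.Writer.WriteAsync(await transformAsync(item));
            }
        }
        finally
        {
            channel.Writer.Complete();
        }
    });

    return (channel.Reader, task);
}
```

Because the stage awaits one transform at a time, items leave in the order they arrived; the trade-off is that the stage's throughput is bounded by the latency of a single call.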
Fluent Pipeline Builder
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public class PipelineBuilder<T>
{
    private readonly ChannelReader<T> _reader;

    // One list shared by every builder in the chain, so ConsumeAsync awaits all stages.
    private readonly List<Task> _tasks;

    public PipelineBuilder(ChannelReader<T> reader)
        : this(reader, new List<Task>())
    {
    }

    private PipelineBuilder(ChannelReader<T> reader, List<Task> tasks)
    {
        _reader = reader;
        _tasks = tasks;
    }

    public PipelineBuilder<TOut> Pipe<TOut>(Func<T, TOut> transform, int buffer = 10)
    {
        var channel = Channel.CreateBounded<TOut>(buffer);

        var task = Task.Run(async () =>
        {
            try
            {
                await foreach (var item in _reader.ReadAllAsync())
                {
                    await channel.Writer.WriteAsync(transform(item));
                }
            }
            finally
            {
                channel.Writer.Complete();
            }
        });

        _tasks.Add(task);
        return new PipelineBuilder<TOut>(channel.Reader, _tasks);
    }

    public async Task ConsumeAsync(Func<T, Task> consumer)
    {
        var consumeTask = Task.Run(async () =>
        {
            await foreach (var item in _reader.ReadAllAsync())
            {
                await consumer(item);
            }
        });

        await Task.WhenAll(_tasks.Append(consumeTask));
    }
}

// Fluent usage (ParseLine, ValidateData, TransformData, and SaveAsync are application-specific)
var source = Channel.CreateUnbounded<string>();

var pipeline = new PipelineBuilder<string>(source.Reader)
    .Pipe(line => ParseLine(line))
    .Pipe(data => ValidateData(data))
    .Pipe(data => TransformData(data))
    .ConsumeAsync(async data => await SaveAsync(data));

// Write items to source.Writer here, then complete it; the pipeline only finishes once the source is completed.
source.Writer.Complete();

await pipeline;
```
Error Handling in Stages
```csharp
async Task SafeStageAsync<TIn, TOut>(
    ChannelReader<TIn> input,
    ChannelWriter<TOut> output,
    ChannelWriter<(TIn Item, Exception Error)> errors,
    Func<TIn, TOut> transform)
{
    try
    {
        await foreach (var item in input.ReadAllAsync())
        {
            TOut result;
            try
            {
                result = transform(item);
            }
            catch (Exception ex)
            {
                // Route the failed item to the error channel and keep processing.
                await errors.WriteAsync((item, ex));
                continue;
            }

            // Written outside the try so a failed write is not reported as an item failure.
            await output.WriteAsync(result);
        }
    }
    finally
    {
        // TryComplete never throws, so errors is completed even if output was already completed.
        output.TryComplete();
        errors.TryComplete();
    }
}
```
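A stage with an error channel has two outputs, and both need a reader. The sketch below feeds three strings through a parsing stage and drains results and failures side by side. The channel names and sample inputs are assumptions, and the stage is repeated as a local function (in a shape that reports only transform failures) so the snippet runs on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

var lines = Channel.CreateUnbounded<string>();
var parsedNumbers = Channel.CreateUnbounded<int>();
var failures = Channel.CreateUnbounded<(string Item, Exception Error)>();

Task stage = SafeStageAsync(
    lines.Reader, parsedNumbers.Writer, failures.Writer, (string text) => int.Parse(text));

foreach (var text in new[] { "1", "oops", "3" })
{
    await lines.Writer.WriteAsync(text);
}
lines.Writer.Complete();

var parsed = new List<int>();
var failed = new List<string>();

// Drain both outputs concurrently. With bounded channels this matters:
// an unread error channel would eventually stall the stage.
Task drainResults = Task.Run(async () =>
{
    await foreach (var number in parsedNumbers.Reader.ReadAllAsync())
    {
        parsed.Add(number);
    }
});
Task drainFailures = Task.Run(async () =>
{
    await foreach (var (failedItem, error) in failures.Reader.ReadAllAsync())
    {
        failed.Add($"{failedItem}: {error.GetType().Name}");
    }
});

await Task.WhenAll(stage, drainResults, drainFailures);

Console.WriteLine($"parsed: {string.Join(", ", parsed)}");  // prints "parsed: 1, 3"
Console.WriteLine($"failed: {string.Join(", ", failed)}");  // prints "failed: oops: FormatException"

static async Task SafeStageAsync<TIn, TOut>(
    ChannelReader<TIn> input,
    ChannelWriter<TOut> output,
    ChannelWriter<(TIn Item, Exception Error)> errors,
    Func<TIn, TOut> transform)
{
    try
    {
        await foreach (var item in input.ReadAllAsync())
        {
            TOut result;
            try
            {
                result = transform(item);
            }
            catch (Exception ex)
            {
                await errors.WriteAsync((item, ex));
                continue;
            }
            await output.WriteAsync(result);
        }
    }
    finally
    {
        output.TryComplete();
        errors.TryComplete();
    }
}
```

The bad input does not stop the stage: "3" is still parsed after "oops" fails, which is the point of routing failures to a separate channel.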
Pipeline with Metrics
```csharp
class MeteredStage<TIn, TOut>
{
    private int _processed;
    private int _errors;

    public int Processed => _processed;
    public int Errors => _errors;

    public async Task RunAsync(
        ChannelReader<TIn> input,
        ChannelWriter<TOut> output,
        Func<TIn, TOut> transform)
    {
        try
        {
            await foreach (var item in input.ReadAllAsync())
            {
                try
                {
                    var result = transform(item);
                    await output.WriteAsync(result);
                    Interlocked.Increment(ref _processed);
                }
                catch
                {
                    Interlocked.Increment(ref _errors);
                }
            }
        }
        finally
        {
            output.Complete();
        }
    }
}
```
Key Takeaways
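- Each stage reads from an input channel and writes to an output channel
- Always complete the output channel in a finally block so downstream stages can finish even when a stage fails
- Use bounded channels for backpressure: a full buffer makes the upstream WriteAsync wait until the downstream stage catches up
- Async stages accept a Func<TIn, Task<TOut>> to support I/O operations
- Fluent builders improve readability
- Add an error channel to capture failed items without stopping the stage
- Track metrics with Interlocked so counters stay accurate when read from other threads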
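The backpressure takeaway can be observed directly on a single bounded channel. This is a small sketch with an assumed capacity of 2; it uses `TryWrite` to show the buffer filling up and a pending `WriteAsync` to show the writer waiting until a reader frees a slot.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Capacity 2 is an arbitrary choice for the demo; Wait is the default full mode.
var channel = Channel.CreateBounded<int>(
    new BoundedChannelOptions(2) { FullMode = BoundedChannelFullMode.Wait });

bool first = channel.Writer.TryWrite(1);   // true: buffer has room
bool second = channel.Writer.TryWrite(2);  // true: buffer is now full
bool third = channel.Writer.TryWrite(3);   // false: no room, item is not written

// WriteAsync on a full channel stays pending instead of failing.
ValueTask pendingWrite = channel.Writer.WriteAsync(3);
bool writerIsWaiting = !pendingWrite.IsCompleted;  // true while the buffer is full

// Reading one item frees a slot, which lets the pending write finish.
int head = await channel.Reader.ReadAsync();
await pendingWrite;

channel.Writer.Complete();

var remaining = new List<int>();
await foreach (var item in channel.Reader.ReadAllAsync())
{
    remaining.Add(item);
}

Console.WriteLine($"TryWrite results: {first}, {second}, {third}");
Console.WriteLine($"Writer waited: {writerIsWaiting}, first item read: {head}");
Console.WriteLine($"Remaining: {string.Join(", ", remaining)}");
```

In a pipeline, the pending `WriteAsync` is what slows a fast upstream stage down to the pace of a slow downstream stage, so memory use stays bounded by the buffer sizes.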