Pipeline Pattern Concept
The pipeline pattern processes data through a series of stages, where each stage transforms the data and passes it to the next. It's ideal for ETL, data processing, and stream processing.
What Is a Pipeline?
```
┌─────────┐    ┌─────────┐    ┌─────────┐    ┌─────────┐
│  Read   │──▶│  Parse  │──▶│Transform│──▶│  Write  │
│  Stage  │    │  Stage  │    │  Stage  │    │  Stage  │
└─────────┘    └─────────┘    └─────────┘    └─────────┘
     │              │              │              │
   Files        Objects       Modified      Database
```
Each stage:
- Receives input from the previous stage
- Processes/transforms the data
- Outputs to the next stage
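These three responsibilities map naturally onto C#'s async streams, where a stage is simply a method that consumes one `IAsyncEnumerable` and yields another. A minimal sketch (the stage name and the parse logic are illustrative assumptions, not part of the pattern itself):

```csharp
using System.Collections.Generic;

public static class ParseStage
{
    // Receives strings from the previous stage, transforms each one,
    // and outputs parsed integers to the next stage.
    public static async IAsyncEnumerable<int> Run(IAsyncEnumerable<string> input)
    {
        await foreach (var line in input)
        {
            yield return int.Parse(line); // the "transform" step
        }
    }
}
```

Because each stage has the same shape, a whole pipeline is just nested calls: `Write(Transform(Parse(Read(path))))`.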
Why Use Pipelines?
1. Parallel Processing
Each stage runs concurrently:
```
Time ──────────────────────────────────────────▶

Stage 1: [Item1] [Item2] [Item3] [Item4]
Stage 2:         [Item1] [Item2] [Item3] [Item4]
Stage 3:                 [Item1] [Item2] [Item3] [Item4]

Items flow through continuously
```
2. Memory Efficiency
Process items one at a time instead of loading all at once:
```csharp
// BAD: Load everything
var allData = File.ReadAllLines("huge.csv");          // 10GB in memory!
var parsed = allData.Select(Parse).ToList();          // 10GB more!
var transformed = parsed.Select(Transform).ToList();  // 10GB more!

// GOOD: Pipeline - only a few items in memory at a time
await foreach (var item in ReadLinesAsync("huge.csv"))
{
    var parsed = Parse(item);
    var transformed = Transform(parsed);
    await WriteAsync(transformed);
}
```
3. Modularity
Each stage is independent and testable:
```csharp
// Easy to test each stage in isolation
[Test]
public void ParseStage_ValidInput_ReturnsObject() { }

[Test]
public void TransformStage_AppliesRules_Correctly() { }
```
4. Flexibility
Easy to add, remove, or reorder stages:
```
Original:  Read → Parse → Write
Add stage: Read → Parse → Validate → Write
Reorder:   Read → Validate → Parse → Write
```
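When stages share a common shape, adding or reordering one is just a change to how they are composed. A sketch, assuming stages written over `IAsyncEnumerable` (the `Validate` rule of dropping blank lines is a hypothetical example):

```csharp
using System.Collections.Generic;

public static class Stages
{
    // Validate: drop blank lines before they reach later stages.
    public static async IAsyncEnumerable<string> Validate(IAsyncEnumerable<string> input)
    {
        await foreach (var line in input)
            if (!string.IsNullOrWhiteSpace(line))
                yield return line;
    }

    // Parse: split each CSV line into fields.
    public static async IAsyncEnumerable<string[]> Parse(IAsyncEnumerable<string> input)
    {
        await foreach (var line in input)
            yield return line.Split(',');
    }
}

// Original:  var pipeline = Stages.Parse(readLines);
// Add stage: var pipeline = Stages.Parse(Stages.Validate(readLines));
```

Inserting `Validate` required no change to `Parse` or to the reader, only to the single line that wires the stages together.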
Real-World Examples
ETL (Extract, Transform, Load)
```
┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│ Extract  │──▶│ Validate │──▶│Transform │──▶│   Load   │
│ from CSV │    │  Schema  │    │  Format  │    │  to DB   │
└──────────┘    └──────────┘    └──────────┘    └──────────┘
```
Image Processing
```
┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│   Read   │──▶│  Resize  │──▶│ Compress │──▶│   Save   │
│  Image   │    │ 800x600  │    │   JPEG   │    │ to Disk  │
└──────────┘    └──────────┘    └──────────┘    └──────────┘
```
Log Processing
```
┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│   Read   │──▶│  Parse   │──▶│  Filter  │──▶│Aggregate │
│   Logs   │    │   JSON   │    │  Errors  │    │  Stats   │
└──────────┘    └──────────┘    └──────────┘    └──────────┘
```
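The log diagram above can be sketched end to end with async streams. Assume each log line is a JSON object with a `level` field (the field name and the error-counting aggregate are illustrative assumptions):

```csharp
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;

public static class LogPipeline
{
    // Parse stage: raw JSON lines -> documents.
    public static async IAsyncEnumerable<JsonDocument> Parse(IAsyncEnumerable<string> lines)
    {
        await foreach (var line in lines)
            yield return JsonDocument.Parse(line);
    }

    // Filter stage: keep only error-level entries.
    public static async IAsyncEnumerable<JsonDocument> Errors(IAsyncEnumerable<JsonDocument> docs)
    {
        await foreach (var doc in docs)
            if (doc.RootElement.GetProperty("level").GetString() == "error")
                yield return doc;
    }

    // Aggregate stage: count what survives the filter.
    public static async Task<int> Count(IAsyncEnumerable<JsonDocument> docs)
    {
        var n = 0;
        await foreach (var _ in docs) n++;
        return n;
    }
}
```

Each stage only sees one entry at a time, so an arbitrarily large log file never has to fit in memory.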
Basic Pipeline with Channels
```csharp
using System.Threading.Channels;

// Stage 1: Read
Channel<string> readChannel = Channel.CreateBounded<string>(10);
var reader = Task.Run(async () =>
{
    foreach (var line in File.ReadLines("data.csv"))
    {
        await readChannel.Writer.WriteAsync(line);
    }
    readChannel.Writer.Complete();
});

// Stage 2: Parse
Channel<Record> parseChannel = Channel.CreateBounded<Record>(10);
var parser = Task.Run(async () =>
{
    await foreach (var line in readChannel.Reader.ReadAllAsync())
    {
        var record = ParseLine(line);
        await parseChannel.Writer.WriteAsync(record);
    }
    parseChannel.Writer.Complete();
});

// Stage 3: Transform
Channel<ProcessedRecord> transformChannel = Channel.CreateBounded<ProcessedRecord>(10);
var transformer = Task.Run(async () =>
{
    await foreach (var record in parseChannel.Reader.ReadAllAsync())
    {
        var processed = Transform(record);
        await transformChannel.Writer.WriteAsync(processed);
    }
    transformChannel.Writer.Complete();
});

// Stage 4: Write
var writer = Task.Run(async () =>
{
    await foreach (var record in transformChannel.Reader.ReadAllAsync())
    {
        await SaveToDatabase(record);
    }
});

// Wait for all stages
await Task.WhenAll(reader, parser, transformer, writer);
```
Pipeline Characteristics
| Characteristic | Description |
|---|---|
| Ordered | Items maintain order through stages |
| Buffered | Channels act as buffers between stages |
| Concurrent | All stages run simultaneously |
| Backpressure | A slow stage throttles upstream stages once buffers fill |
| Bounded memory | Buffer sizes limit memory usage |
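The backpressure and bounded-memory rows can be observed directly: on a full bounded channel (with the default full-mode, which waits), `WriteAsync` does not complete until a reader frees a slot. A minimal sketch:

```csharp
using System;
using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(2);

// Fill the buffer to capacity.
await channel.Writer.WriteAsync(1);
await channel.Writer.WriteAsync(2);

// A third write cannot complete yet: the producer is throttled.
var blocked = channel.Writer.WriteAsync(3);
Console.WriteLine(blocked.IsCompleted); // False - waiting for space

// The (slow) consumer reads one item, freeing a slot...
await channel.Reader.ReadAsync();
await blocked; // ...and the pending write now completes
```

This is how a slow downstream stage propagates its pace all the way back to the reader without any explicit coordination code.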
Key Takeaways
- Pipeline processes data through sequential stages
- Each stage runs concurrently with others
- Channels connect stages with buffering
- Memory efficient for large datasets
- Modular and easy to modify
- Perfect for ETL and stream processing