15 min lesson

Pipeline Pattern Concept

The pipeline pattern processes data through a series of stages, where each stage transforms the data and passes it to the next. It's ideal for ETL, data processing, and stream processing.

What Is a Pipeline?

```
┌─────────┐   ┌─────────┐   ┌─────────┐   ┌─────────┐
│  Read   │──▶│  Parse  │──▶│Transform│──▶│  Write  │
│  Stage  │   │  Stage  │   │  Stage  │   │  Stage  │
└─────────┘   └─────────┘   └─────────┘   └─────────┘
     │             │             │             │
   Files        Objects      Modified      Database
```

Each stage:

  • Receives input from the previous stage
  • Processes/transforms the data
  • Outputs to the next stage
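Those three responsibilities can be sketched as one generic function. This is a minimal sketch, assuming stages exchange data as `IAsyncEnumerable<T>` streams; the `Stage` name and delegate shape are illustrative, not a standard API:

```csharp
// A minimal sketch of a pipeline stage: receive items from the previous
// stage, transform each one, and yield it to the next stage.
static async IAsyncEnumerable<TOut> Stage<TIn, TOut>(
    IAsyncEnumerable<TIn> input,  // input from the previous stage
    Func<TIn, TOut> process)      // the work this stage performs
{
    await foreach (var item in input)
    {
        yield return process(item); // output to the next stage
    }
}
```

Stages written this way compose by nesting, e.g. `Stage(Stage(source, Parse), Transform)`.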

Why Use Pipelines?

1. Parallel Processing

Each stage runs concurrently:

```
Time ──────────────────────────────────────────▶

Stage 1: [Item1] [Item2] [Item3] [Item4]
Stage 2:         [Item1] [Item2] [Item3] [Item4]
Stage 3:                 [Item1] [Item2] [Item3] [Item4]

Items flow through continuously
```

2. Memory Efficiency

Process items one at a time instead of loading all at once:

```csharp
// BAD: Load everything
var allData = File.ReadAllLines("huge.csv");          // 10 GB in memory!
var parsed = allData.Select(Parse).ToList();          // 10 GB more!
var transformed = parsed.Select(Transform).ToList();  // 10 GB more!

// GOOD: Pipeline - only a few items in memory at a time
await foreach (var item in ReadLinesAsync("huge.csv"))
{
    var parsed = Parse(item);
    var transformed = Transform(parsed);
    await WriteAsync(transformed);
}
```
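The `ReadLinesAsync` helper in the GOOD version is not shown; one possible implementation is an async iterator that streams lines lazily (on .NET 7+, the built-in `File.ReadLinesAsync` does the same job):

```csharp
// One possible ReadLinesAsync: yields lines one at a time, so only a
// single line (plus the StreamReader's buffer) is in memory at once.
static async IAsyncEnumerable<string> ReadLinesAsync(string path)
{
    using var reader = new StreamReader(path);
    while (await reader.ReadLineAsync() is { } line)
    {
        yield return line;
    }
}
```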

3. Modularity

Each stage is independent and testable:

```csharp
// Easy to test each stage in isolation
[Test]
public void ParseStage_ValidInput_ReturnsObject() { /* arrange, act, assert */ }

[Test]
public void TransformStage_AppliesRules_Correctly() { /* arrange, act, assert */ }
```

4. Flexibility

Easy to add, remove, or reorder stages:

```
Original:   Read → Parse → Write
Add stage:  Read → Parse → Validate → Write
Reorder:    Read → Validate → Parse → Write
```
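One way to get that flexibility is to model the middle of the pipeline as an ordered list of stage delegates, so adding or reordering stages is just a list edit. A sketch, where `Parse` and `Validate` are hypothetical placeholders matching the diagram:

```csharp
// Sketch: the pipeline's middle as an ordered list of string -> string stages.
var stages = new List<Func<string, string>> { Parse, Validate };

// Reorder: swap so Validate runs before Parse.
(stages[0], stages[1]) = (stages[1], stages[0]);

// Running the pipeline folds the input through each stage in order.
string Run(string input) => stages.Aggregate(input, (data, stage) => stage(data));
```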

Real-World Examples

ETL (Extract, Transform, Load)

```
┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│ Extract  │──▶│ Validate │──▶│Transform │──▶│   Load   │
│ from CSV │   │  Schema  │   │  Format  │   │  to DB   │
└──────────┘   └──────────┘   └──────────┘   └──────────┘
```

Image Processing

```
┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│   Read   │──▶│  Resize  │──▶│ Compress │──▶│   Save   │
│  Image   │   │ 800x600  │   │   JPEG   │   │ to Disk  │
└──────────┘   └──────────┘   └──────────┘   └──────────┘
```

Log Processing

```
┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐
│   Read   │──▶│  Parse   │──▶│  Filter  │──▶│Aggregate │
│   Logs   │   │   JSON   │   │  Errors  │   │  Stats   │
└──────────┘   └──────────┘   └──────────┘   └──────────┘
```

Basic Pipeline with Channels

```csharp
using System.Threading.Channels;

// Stage 1: Read
Channel<string> readChannel = Channel.CreateBounded<string>(10);
var reader = Task.Run(async () =>
{
    foreach (var line in File.ReadLines("data.csv"))
    {
        await readChannel.Writer.WriteAsync(line);
    }
    readChannel.Writer.Complete();
});

// Stage 2: Parse
Channel<Record> parseChannel = Channel.CreateBounded<Record>(10);
var parser = Task.Run(async () =>
{
    await foreach (var line in readChannel.Reader.ReadAllAsync())
    {
        var record = ParseLine(line);
        await parseChannel.Writer.WriteAsync(record);
    }
    parseChannel.Writer.Complete();
});

// Stage 3: Transform
Channel<ProcessedRecord> transformChannel = Channel.CreateBounded<ProcessedRecord>(10);
var transformer = Task.Run(async () =>
{
    await foreach (var record in parseChannel.Reader.ReadAllAsync())
    {
        var processed = Transform(record);
        await transformChannel.Writer.WriteAsync(processed);
    }
    transformChannel.Writer.Complete();
});

// Stage 4: Write
var writer = Task.Run(async () =>
{
    await foreach (var record in transformChannel.Reader.ReadAllAsync())
    {
        await SaveToDatabase(record);
    }
});

// Wait for all stages
await Task.WhenAll(reader, parser, transformer, writer);
```

Pipeline Characteristics

| Characteristic | Description |
|----------------|-------------|
| Ordered | Items maintain order through stages |
| Buffered | Channels act as buffers between stages |
| Concurrent | All stages run simultaneously |
| Backpressure | A slow stage slows the entire pipeline |
| Bounded memory | Buffer sizes limit memory usage |
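Backpressure is easy to observe directly: once a bounded channel's buffer is full, `WriteAsync` does not complete until a reader frees a slot. A small sketch:

```csharp
// Demonstrating backpressure with capacity 2 and no active reader.
var channel = Channel.CreateBounded<int>(2);
await channel.Writer.WriteAsync(1);
await channel.Writer.WriteAsync(2);

// The buffer is full, so this write cannot complete yet.
var blockedWrite = channel.Writer.WriteAsync(3).AsTask();
Console.WriteLine(blockedWrite.IsCompleted); // False - the writer is throttled

channel.Reader.TryRead(out _); // a consumer frees one slot...
await blockedWrite;            // ...and the pending write completes
```

This is the mechanism that keeps memory bounded: a slow downstream stage automatically throttles every stage upstream of it.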

Key Takeaways

  • Pipeline processes data through sequential stages
  • Each stage runs concurrently with others
  • Channels connect stages with buffering
  • Memory efficient for large datasets
  • Modular and easy to modify
  • Perfect for ETL and stream processing