15 minlesson

Fan-out/Fan-in Pattern

Fan-out/Fan-in Pattern

The fan-out/fan-in pattern distributes work across multiple parallel workers (fan-out) and then aggregates their results (fan-in). It's essential for maximizing throughput on parallelizable workloads.

What Is Fan-out/Fan-in?

1 ┌─────────┐
2 ┌──▶│Worker 1 │──┐
3 │ └─────────┘ │
4┌─────────┐ │ ┌─────────┐ │ ┌─────────┐
5│ Input │─────┼──▶│Worker 2 │──┼────▶│ Results │
6│ Queue │ │ └─────────┘ │ │Collector│
7└─────────┘ │ ┌─────────┐ │ └─────────┘
8 └──▶│Worker 3 │──┘
9 └─────────┘
10
11 Fan-out Work Fan-in
12 (distribute) (parallel) (aggregate)

When to Use

Fan-out/fan-in is ideal when:

  • Work items are independent (no dependencies)
  • Processing is CPU or I/O bound
  • You have multiple cores or async I/O capacity
  • Order of results doesn't matter (or can be reordered)

Simple Example

csharp
1async Task<int[]> ProcessAllAsync(int[] items, int workerCount)
2{
3 var results = new ConcurrentBag<int>();
4
5 // Fan-out: distribute work
6 var chunks = items.Chunk(items.Length / workerCount);
7
8 var workers = chunks.Select(chunk => Task.Run(async () =>
9 {
10 foreach (var item in chunk)
11 {
12 var result = await ProcessItemAsync(item);
13 results.Add(result); // Fan-in: collect results
14 }
15 }));
16
17 await Task.WhenAll(workers);
18
19 return results.ToArray();
20}

Fan-out Strategies

1. Chunk-Based Distribution

csharp
1// Divide data into equal chunks
2var chunks = data.Chunk(data.Count / workerCount);
3
4var tasks = chunks.Select(chunk => ProcessChunkAsync(chunk));
5await Task.WhenAll(tasks);

2. Work-Stealing (Dynamic)

csharp
1// Workers pull from shared queue
2var workQueue = new ConcurrentQueue<Item>(items);
3
4var workers = Enumerable.Range(0, workerCount)
5 .Select(_ => Task.Run(async () =>
6 {
7 while (workQueue.TryDequeue(out var item))
8 {
9 await ProcessAsync(item);
10 }
11 }));
12
13await Task.WhenAll(workers);

3. Channel-Based Distribution

csharp
1var inputChannel = Channel.CreateUnbounded<Item>();
2
3// Workers read from channel
4var workers = Enumerable.Range(0, workerCount)
5 .Select(_ => ProcessChannelAsync(inputChannel.Reader));
6
7// Feed items
8foreach (var item in items)
9 await inputChannel.Writer.WriteAsync(item);
10inputChannel.Writer.Complete();
11
12await Task.WhenAll(workers);

Fan-in Strategies

1. ConcurrentBag

csharp
1var results = new ConcurrentBag<Result>();
2
3// Workers add results
4results.Add(result);
5
6// Collect at end
7var allResults = results.ToArray();

2. Channel

csharp
1var resultsChannel = Channel.CreateUnbounded<Result>();
2
3// Workers write results
4await resultsChannel.Writer.WriteAsync(result);
5
6// Single consumer collects
7await foreach (var result in resultsChannel.Reader.ReadAllAsync())
8{
9 allResults.Add(result);
10}

3. Task.WhenAll with Results

csharp
1var tasks = items.Select(item => ProcessAsync(item));
2var results = await Task.WhenAll(tasks);
3// results is T[] with all results

Complete Example

csharp
1class ImageProcessor
2{
3 public async Task<ProcessedImage[]> ProcessImagesAsync(
4 string[] imagePaths,
5 int workerCount = 4)
6 {
7 var inputChannel = Channel.CreateBounded<string>(100);
8 var outputChannel = Channel.CreateUnbounded<ProcessedImage>();
9
10 // Start workers (fan-out)
11 var workers = Enumerable.Range(0, workerCount)
12 .Select(id => ProcessWorkerAsync(id, inputChannel.Reader, outputChannel.Writer))
13 .ToArray();
14
15 // Feed input
16 var feeder = Task.Run(async () =>
17 {
18 foreach (var path in imagePaths)
19 await inputChannel.Writer.WriteAsync(path);
20 inputChannel.Writer.Complete();
21 });
22
23 // Wait for all workers to complete
24 await Task.WhenAll(workers);
25 outputChannel.Writer.Complete();
26
27 // Collect results (fan-in)
28 var results = new List<ProcessedImage>();
29 await foreach (var result in outputChannel.Reader.ReadAllAsync())
30 {
31 results.Add(result);
32 }
33
34 return results.ToArray();
35 }
36
37 private async Task ProcessWorkerAsync(
38 int workerId,
39 ChannelReader<string> input,
40 ChannelWriter<ProcessedImage> output)
41 {
42 await foreach (var path in input.ReadAllAsync())
43 {
44 var image = await LoadImageAsync(path);
45 var processed = await ApplyFiltersAsync(image);
46 await output.WriteAsync(processed);
47 Console.WriteLine($"Worker {workerId} processed {path}");
48 }
49 }
50}

Comparison with Other Patterns

PatternUse Case
PipelineSequential stages, ordered
Fan-out/Fan-inParallel work, unordered
Producer/ConsumerDecoupling, buffering
Parallel.ForEachSimple parallel iteration

Key Takeaways

  • Fan-out distributes work across multiple workers
  • Fan-in aggregates results from workers
  • Use for independent, parallelizable work
  • Channel or ConcurrentQueue for work distribution
  • ConcurrentBag or Channel for result collection
  • Worker count typically matches available cores
  • Dynamic work-stealing handles uneven workloads