Fan-out/Fan-in Pattern
The fan-out/fan-in pattern distributes work across multiple parallel workers (fan-out) and then aggregates their results (fan-in). It raises throughput on workloads that can be split into independent pieces.
What Is Fan-out/Fan-in?
```
                    ┌─────────┐
                ┌──▶│Worker 1 │──┐
                │   └─────────┘  │
┌─────────┐     │   ┌─────────┐  │     ┌─────────┐
│  Input  │─────┼──▶│Worker 2 │──┼────▶│ Results │
│  Queue  │     │   └─────────┘  │     │Collector│
└─────────┘     │   ┌─────────┐  │     └─────────┘
                └──▶│Worker 3 │──┘
                    └─────────┘

   Fan-out             Work              Fan-in
 (distribute)       (parallel)        (aggregate)
```
When to Use
Fan-out/fan-in is ideal when:
- Work items are independent (no dependencies)
- Processing is CPU or I/O bound
- You have multiple cores or async I/O capacity
- Order of results doesn't matter (or can be reordered)
Simple Example
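```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

async Task<int[]> ProcessAllAsync(int[] items, int workerCount)
{
    var results = new ConcurrentBag<int>();

    // Fan-out: split the input into at most workerCount chunks.
    // Ceiling division keeps the chunk size at 1 or more; Chunk throws on a size of 0.
    var chunkSize = Math.Max(1, (items.Length + workerCount - 1) / workerCount);
    var chunks = items.Chunk(chunkSize);

    var workers = chunks.Select(chunk => Task.Run(async () =>
    {
        foreach (var item in chunk)
        {
            var result = await ProcessItemAsync(item);
            results.Add(result); // Fan-in: collect results
        }
    }));

    await Task.WhenAll(workers);

    return results.ToArray(); // Order is not guaranteed
}
```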
Fan-out Strategies
1. Chunk-Based Distribution
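```csharp
// Divide data into roughly equal chunks (at most workerCount of them).
// Ceiling division avoids a chunk size of 0, which Chunk rejects.
var chunkSize = Math.Max(1, (data.Count + workerCount - 1) / workerCount);
var chunks = data.Chunk(chunkSize);

var tasks = chunks.Select(chunk => ProcessChunkAsync(chunk));
await Task.WhenAll(tasks);
```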
2. Work-Stealing (Dynamic)
```csharp
// Workers pull from shared queue
var workQueue = new ConcurrentQueue<Item>(items);

var workers = Enumerable.Range(0, workerCount)
    .Select(_ => Task.Run(async () =>
    {
        while (workQueue.TryDequeue(out var item))
        {
            await ProcessAsync(item);
        }
    }));

await Task.WhenAll(workers);
```
3. Channel-Based Distribution
```csharp
var inputChannel = Channel.CreateUnbounded<Item>();

// Workers read from channel.
// ToArray starts them now; without it the lazy query would not run until WhenAll.
var workers = Enumerable.Range(0, workerCount)
    .Select(_ => ProcessChannelAsync(inputChannel.Reader))
    .ToArray();

// Feed items
foreach (var item in items)
    await inputChannel.Writer.WriteAsync(item);
inputChannel.Writer.Complete();

await Task.WhenAll(workers);
```
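The snippet above calls `ProcessChannelAsync` without showing it. A minimal sketch of what such a worker loop might look like follows. The `ChannelWorker` class and the `handle` delegate are assumptions made for illustration (the call above passes only the reader), so treat this as one possible shape rather than the original implementation.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelWorker
{
    // Hypothetical worker loop: reads items until the writer calls Complete().
    // The handle delegate is an assumed stand-in for the real per-item work.
    public static async Task ProcessChannelAsync<T>(
        ChannelReader<T> reader, Func<T, Task> handle)
    {
        // ReadAllAsync ends once the channel is completed and drained.
        await foreach (var item in reader.ReadAllAsync())
        {
            await handle(item);
        }
    }
}
```

Because every worker reads from the same `ChannelReader`, each item is delivered to exactly one worker, and a worker that finishes early simply picks up the next available item.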
Fan-in Strategies
1. ConcurrentBag
```csharp
var results = new ConcurrentBag<Result>();

// Workers add results
results.Add(result);

// Collect at end
var allResults = results.ToArray();
```
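A `ConcurrentBag` makes no ordering guarantee, so when results must line up with the inputs, one option is to tag each item with its input index before fanning out and sort by that index after fanning in. The sketch below assumes a caller-supplied `transform` delegate in place of the real work; `OrderedFanOut` and `ProcessInOrderAsync` are hypothetical names, not library APIs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class OrderedFanOut
{
    // Hypothetical helper: processes items in parallel, then restores input order.
    public static async Task<TOut[]> ProcessInOrderAsync<TIn, TOut>(
        TIn[] items, int workerCount, Func<TIn, Task<TOut>> transform)
    {
        // Tag each item with its position so the order can be rebuilt later.
        var queue = new ConcurrentQueue<(int Index, TIn Item)>(
            items.Select((item, index) => (index, item)));
        var results = new ConcurrentBag<(int Index, TOut Value)>();

        // Fan-out: each worker pulls tagged items until the queue is empty.
        var workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(async () =>
            {
                while (queue.TryDequeue(out var entry))
                {
                    var value = await transform(entry.Item);
                    results.Add((entry.Index, value));
                }
            }))
            .ToArray();

        await Task.WhenAll(workers);

        // Fan-in: sort by the original index to restore input order.
        return results.OrderBy(r => r.Index).Select(r => r.Value).ToArray();
    }
}
```

Sorting costs O(n log n) at the end. Writing each result straight into a preallocated array at its index would avoid the sort, at the price of a slightly less general collector.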
2. Channel
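```csharp
var resultsChannel = Channel.CreateUnbounded<Result>();
var allResults = new List<Result>();

// Workers write results
await resultsChannel.Writer.WriteAsync(result);

// After all workers finish, complete the writer so ReadAllAsync can end
resultsChannel.Writer.Complete();

// Single consumer collects
await foreach (var result in resultsChannel.Reader.ReadAllAsync())
{
    allResults.Add(result);
}
```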
3. Task.WhenAll with Results
```csharp
var tasks = items.Select(item => ProcessAsync(item));
var results = await Task.WhenAll(tasks);
// results is a T[] holding every result, in the same order as the tasks
```
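Creating one task per item starts all of the work at once, which can overwhelm a downstream service when the item count is large. A possible way to keep the `Task.WhenAll` result shape while capping concurrency is a `SemaphoreSlim` gate, sketched below. `ThrottledWhenAll`, `RunAsync`, and the `work` delegate are hypothetical names introduced for this example.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledWhenAll
{
    // Hypothetical helper: one task per item, but at most maxConcurrency run at a time.
    public static async Task<TOut[]> RunAsync<TIn, TOut>(
        TIn[] items, int maxConcurrency, Func<TIn, Task<TOut>> work)
    {
        using var gate = new SemaphoreSlim(maxConcurrency);

        var tasks = items.Select(async item =>
        {
            await gate.WaitAsync(); // Wait for a free slot.
            try
            {
                return await work(item);
            }
            finally
            {
                gate.Release(); // Free the slot for the next item.
            }
        }).ToArray();

        // Results come back in the same order as the input items.
        return await Task.WhenAll(tasks);
    }
}
```

Here `maxConcurrency` plays the role that `workerCount` plays in the queue-based and channel-based variants.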
Complete Example
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

class ImageProcessor
{
    public async Task<ProcessedImage[]> ProcessImagesAsync(
        string[] imagePaths,
        int workerCount = 4)
    {
        var inputChannel = Channel.CreateBounded<string>(100);
        var outputChannel = Channel.CreateUnbounded<ProcessedImage>();

        // Start workers (fan-out)
        var workers = Enumerable.Range(0, workerCount)
            .Select(id => ProcessWorkerAsync(id, inputChannel.Reader, outputChannel.Writer))
            .ToArray();

        // Feed input
        var feeder = Task.Run(async () =>
        {
            try
            {
                foreach (var path in imagePaths)
                    await inputChannel.Writer.WriteAsync(path);
            }
            finally
            {
                // Always complete the input so workers can exit, even if feeding fails
                inputChannel.Writer.Complete();
            }
        });

        // Wait for the feeder and all workers to complete;
        // awaiting the feeder also surfaces any exception it threw
        await Task.WhenAll(workers.Append(feeder));
        outputChannel.Writer.Complete();

        // Collect results (fan-in)
        var results = new List<ProcessedImage>();
        await foreach (var result in outputChannel.Reader.ReadAllAsync())
        {
            results.Add(result);
        }

        return results.ToArray();
    }

    private async Task ProcessWorkerAsync(
        int workerId,
        ChannelReader<string> input,
        ChannelWriter<ProcessedImage> output)
    {
        await foreach (var path in input.ReadAllAsync())
        {
            var image = await LoadImageAsync(path);
            var processed = await ApplyFiltersAsync(image);
            await output.WriteAsync(processed);
            Console.WriteLine($"Worker {workerId} processed {path}");
        }
    }
}
```
Comparison with Other Patterns
| Pattern | Use Case |
|---|---|
| Pipeline | Sequential stages, ordered |
| Fan-out/Fan-in | Parallel work, unordered |
| Producer/Consumer | Decoupling, buffering |
| Parallel.ForEach | Simple parallel iteration |
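For the last row of the table, a rough sketch of the same fan-out/fan-in shape using `Parallel.ForEachAsync` (available from .NET 6) is shown below. The `ParallelForEachSketch` class and the squaring workload are assumptions chosen only to keep the example self-contained.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ParallelForEachSketch
{
    // Hypothetical helper: squares each input with at most maxWorkers running at once.
    public static async Task<int[]> SquareAllAsync(IEnumerable<int> items, int maxWorkers)
    {
        var results = new ConcurrentBag<int>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxWorkers };

        // Parallel.ForEachAsync handles both distribution and waiting for completion.
        await Parallel.ForEachAsync(items, options, async (item, cancellationToken) =>
        {
            await Task.Yield(); // Stand-in for real asynchronous work.
            results.Add(item * item);
        });

        return results.ToArray(); // Order is not guaranteed.
    }
}
```

This trades control for brevity: there is no explicit queue or channel to tune, which is usually fine when the loop body is the only stage.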
Key Takeaways
- Fan-out distributes work across multiple workers
- Fan-in aggregates results from workers
- Use for independent, parallelizable work
- Channel or ConcurrentQueue for work distribution
- ConcurrentBag or Channel for result collection
- Worker count typically matches available cores for CPU-bound work; I/O-bound work can often use more workers than cores
- Dynamic work-stealing handles uneven workloads