Producer/Consumer Pattern Overview
The producer/consumer pattern decouples data production from data processing, enabling scalable and responsive async systems.
What Is Producer/Consumer?
```
┌──────────────┐     ┌─────────────┐     ┌──────────────┐
│   Producer   │────▶│    Queue    │────▶│   Consumer   │
│  (generates  │     │  (buffer)   │     │  (processes  │
│    data)     │     │             │     │    data)     │
└──────────────┘     └─────────────┘     └──────────────┘
```
- Producer: Generates work items (data, messages, tasks)
- Queue/Buffer: Temporary storage between producer and consumer
- Consumer: Processes work items
Why Use This Pattern?
1. Decoupling
Producers and consumers work independently:
```csharp
// Producer doesn't wait for consumer
producer.Add(item); // Returns immediately

// Consumer processes at its own pace
var item = consumer.Take(); // Gets next item when ready
```
2. Load Balancing
Handle varying production and consumption rates:
```
Production Rate:  ████████████░░░░░░░░  (bursts)
Queue:            ████████              (buffers)
Consumption Rate: ████████████████████  (steady)
```
3. Scalability
Multiple producers and/or consumers:
```
Producer 1 ──┐
Producer 2 ──┼──▶ Queue ──┬──▶ Consumer 1
Producer 3 ──┘            ├──▶ Consumer 2
                          └──▶ Consumer 3
```
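The fan-in/fan-out layout above can be sketched with a single shared channel. This is a minimal sketch, not a definitive implementation: the counts (3 producers, 3 consumers, 100 items each) and the names `channel` and `consumedCount` are assumptions chosen for illustration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Illustrative setup: 3 producers and 3 consumers share one bounded channel.
var channel = Channel.CreateBounded<int>(capacity: 10);
int consumedCount = 0;

// Each producer writes 100 items into the shared channel.
var producers = Enumerable.Range(0, 3).Select(p => Task.Run(async () =>
{
    for (int i = 0; i < 100; i++)
    {
        await channel.Writer.WriteAsync(p * 100 + i);
    }
})).ToArray();

// Consumers compete for items; each item is delivered to exactly one consumer.
var consumers = Enumerable.Range(0, 3).Select(_ => Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        Interlocked.Increment(ref consumedCount);
    }
})).ToArray();

await Task.WhenAll(producers);
channel.Writer.Complete(); // Complete only after ALL producers have finished
await Task.WhenAll(consumers);

Console.WriteLine($"Consumed {consumedCount} items"); // Consumed 300 items
```

With several producers, the writer is completed once by the coordinating code rather than by any individual producer, so no producer is cut off while it still has items to write.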
4. Responsiveness
Keep UI/API responsive by offloading work:
```csharp
// API endpoint - returns immediately
public async Task<IActionResult> Submit(Order order)
{
    await _orderQueue.EnqueueAsync(order); // Quick
    return Accepted(); // Don't wait for processing
}

// Background consumer processes orders
while (true)
{
    // Assumes DequeueAsync waits for and returns the next order
    var order = await _orderQueue.DequeueAsync();
    await ProcessOrderAsync(order); // Takes time
}
```
Real-World Examples
Log Processing
```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ Application │────▶│  Log Queue  │────▶│ Log Writer  │
│    Code     │     │             │     │  (to disk)  │
└─────────────┘     └─────────────┘     └─────────────┘

App doesn't wait for disk I/O
```
Message Queue Systems
```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Web API   │────▶│ RabbitMQ /  │────▶│   Worker    │
│   Handler   │     │    Kafka    │     │   Service   │
└─────────────┘     └─────────────┘     └─────────────┘

Request handled quickly, processing is async
```
Image Processing
```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Upload    │────▶│ Image Queue │────▶│  Processor  │
│   Handler   │     │             │     │  (resize,   │
└─────────────┘     └─────────────┘     │  compress)  │
                                        └─────────────┘

Upload returns fast, processing happens in background
```
Key Concepts
Bounded vs Unbounded Queues
Unbounded Queue:
- No size limit
- Producer never blocks
- Risk: Memory exhaustion if consumer is slow
```csharp
var unbounded = new ConcurrentQueue<Item>(); // No capacity limit: grows while items arrive faster than they are removed
```
Bounded Queue:
- Fixed maximum size
- Producer blocks when full (backpressure)
- Protects against memory issues
```csharp
var bounded = new BlockingCollection<Item>(boundedCapacity: 100);
// Producer waits when queue has 100 items
```
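To make the difference concrete, here is a small sketch that fills both kinds of queue. The capacity of 3 and the variable names are assumptions picked to keep the demo short. It uses `TryAdd`, which returns `false` instead of blocking when the collection is full, so the program runs to completion without a consumer thread.

```csharp
using System;
using System.Collections.Concurrent;

// Unbounded: every Enqueue succeeds, so the count keeps growing.
var unbounded = new ConcurrentQueue<int>();
for (int i = 0; i < 1000; i++)
{
    unbounded.Enqueue(i);
}
Console.WriteLine($"Unbounded count: {unbounded.Count}"); // Unbounded count: 1000

// Bounded: a capacity of 3 is used here only to keep the demo small.
using var bounded = new BlockingCollection<int>(boundedCapacity: 3);
int accepted = 0;
for (int i = 0; i < 5; i++)
{
    // TryAdd returns false right away when the collection is full.
    if (bounded.TryAdd(i))
    {
        accepted++;
    }
}
Console.WriteLine($"Bounded accepted: {accepted} of 5"); // Bounded accepted: 3 of 5

// Taking one item frees a slot, so the next add succeeds.
int first = bounded.Take();
bool addedAfterTake = bounded.TryAdd(99);
Console.WriteLine($"Took {first}, added after take: {addedAfterTake}"); // Took 0, added after take: True
```

A producer that calls `Add` instead of `TryAdd` would block at the fourth item until a consumer takes something, which is the backpressure behavior described above.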
Backpressure
When the consumer can't keep up, backpressure slows down the producer:
```
Without Backpressure:
Producer: ████████████████████
Queue:    ████████████████████████████████████ (overflows)
Consumer: ████████

With Backpressure (bounded queue):
Producer: ████████ (waits) ████████ (waits) ████████
Queue:    ████████████████████ (at capacity)
Consumer: ████████████████████████████████████
```
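The "waits" in the diagram can be observed directly on a bounded channel. The sketch below assumes a capacity of 2 and the default full-mode behavior (wait); the variable names are illustrative only. It shows that a write to a full channel stays pending until a reader frees a slot.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Capacity of 2 is an arbitrary value chosen to fill the channel quickly.
var channel = Channel.CreateBounded<int>(capacity: 2);

bool first = channel.Writer.TryWrite(1);  // Succeeds: one slot used
bool second = channel.Writer.TryWrite(2); // Succeeds: channel is now full
bool third = channel.Writer.TryWrite(3);  // Fails: no free slot
Console.WriteLine($"{first} {second} {third}"); // True True False

// WriteAsync on a full channel does not complete until space is available.
Task pendingWrite = channel.Writer.WriteAsync(3).AsTask();
bool waitingWhileFull = !pendingWrite.IsCompleted;
Console.WriteLine($"Producer waiting: {waitingWhileFull}"); // Producer waiting: True

// The consumer reads one item, which lets the pending write go through.
int taken = await channel.Reader.ReadAsync();
await pendingWrite;
Console.WriteLine($"Consumer took {taken}; pending write finished"); // Consumer took 1; pending write finished
```

Because the producer awaits rather than blocks, no thread is tied up while the channel is full.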
Completion Signaling
How the producer tells consumers there's no more data:
```csharp
// Producer
foreach (var item in source)
{
    queue.Add(item);
}
queue.CompleteAdding(); // Signal: no more items

// Consumer
foreach (var item in queue.GetConsumingEnumerable())
{
    Process(item);
}
// Loop exits when CompleteAdding() called and queue empty
```
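A runnable version of the same idea, as a sketch: the item values, the capacity of 5, and the names `sum` and `rejected` are assumptions for illustration. It also shows two consequences of completion: `IsCompleted` becomes true once the queue is drained, and further `Add` calls are rejected.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

using var queue = new BlockingCollection<int>(boundedCapacity: 5);
int sum = 0;

// Consumer: the loop ends once adding is complete and the queue is empty.
var consumer = Task.Run(() =>
{
    foreach (var item in queue.GetConsumingEnumerable())
    {
        sum += item;
    }
});

// Producer: add the numbers 1..10, then signal that no more items will arrive.
for (int i = 1; i <= 10; i++)
{
    queue.Add(i);
}
queue.CompleteAdding();

await consumer;
Console.WriteLine($"Sum: {sum}, IsCompleted: {queue.IsCompleted}"); // Sum: 55, IsCompleted: True

// Adding after CompleteAdding() throws InvalidOperationException.
bool rejected = false;
try
{
    queue.Add(11);
}
catch (InvalidOperationException)
{
    rejected = true;
}
Console.WriteLine($"Late add rejected: {rejected}"); // Late add rejected: True
```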
.NET Implementations
| Class | Async Support | Best For |
|---|---|---|
| `BlockingCollection<T>` | Limited | Sync-heavy scenarios |
| `Channel<T>` | Full | Async-first scenarios |
| `ConcurrentQueue<T>` | No (manual) | Building custom solutions |
Both BlockingCollection and Channel are covered in detail in the following lessons.
Simple Example
```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var queue = Channel.CreateBounded<int>(10);

// Producer
var producer = Task.Run(async () =>
{
    for (int i = 0; i < 100; i++)
    {
        await queue.Writer.WriteAsync(i);
        Console.WriteLine($"Produced: {i}");
    }
    queue.Writer.Complete();
});

// Consumer
var consumer = Task.Run(async () =>
{
    await foreach (var item in queue.Reader.ReadAllAsync())
    {
        await Task.Delay(50); // Simulate processing
        Console.WriteLine($"Consumed: {item}");
    }
});

await Task.WhenAll(producer, consumer);
```
Key Takeaways
- Producer/consumer decouples data generation from processing
- Enables load balancing, scalability, and responsiveness
- Bounded queues provide backpressure to prevent memory issues
- Completion signaling tells consumers when to stop
- .NET provides `BlockingCollection<T>` and `Channel<T>` implementations
- This pattern is fundamental to async architectures