Producer/Consumer Pattern Overview

The producer/consumer pattern decouples data production from data processing, enabling scalable and responsive async systems.

What Is Producer/Consumer?

```
┌──────────────┐     ┌─────────────┐     ┌──────────────┐
│   Producer   │────▶│    Queue    │────▶│   Consumer   │
│  (generates  │     │  (buffer)   │     │  (processes  │
│    data)     │     │             │     │    data)     │
└──────────────┘     └─────────────┘     └──────────────┘
```

  • Producer: Generates work items (data, messages, tasks)
  • Queue/Buffer: Temporary storage between producer and consumer
  • Consumer: Processes work items

Why Use This Pattern?

1. Decoupling

Producers and consumers work independently:

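```csharp
using System.Collections.Concurrent;

var queue = new BlockingCollection<string>();

// Producer doesn't wait for the consumer
queue.Add("item"); // Returns immediately while the queue has room

// Consumer processes at its own pace
var item = queue.Take(); // Blocks until an item is available
```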

2. Load Balancing

The queue absorbs bursts, so a consumer working at a steady rate can handle uneven production:

```
Production Rate:  ████████████░░░░░░░░ (bursts)
Queue:            ████████             (buffers)
Consumption Rate: ████████████████████ (steady)
```
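The chart above can be reproduced with a small deterministic simulation. This is only a sketch with made-up numbers: the burst pattern in `producedPerTick` and the consumer rate of two items per tick are assumptions chosen for illustration, not measurements.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical workload: items produced per tick (two bursts, then quiet).
int[] producedPerTick = { 6, 6, 0, 0, 0, 0, 6, 0, 0, 0 };
const int consumedPerTick = 2; // assumed steady consumer capacity

var queue = new Queue<int>();
int nextId = 0, peakDepth = 0, processed = 0;

foreach (int burst in producedPerTick)
{
    // Producer: enqueue the whole burst at once.
    for (int i = 0; i < burst; i++) queue.Enqueue(nextId++);
    peakDepth = Math.Max(peakDepth, queue.Count);

    // Consumer: drain at a fixed rate, regardless of burst size.
    for (int i = 0; i < consumedPerTick && queue.Count > 0; i++)
    {
        queue.Dequeue();
        processed++;
    }
}

Console.WriteLine($"Produced {nextId}, processed {processed}, peak queue depth {peakDepth}");
// prints "Produced 18, processed 18, peak queue depth 10"
```

The consumer never handles more than two items per tick, yet every item is eventually processed. The cost is the peak queue depth, which is the memory the buffer needs to absorb the bursts.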

3. Scalability

Multiple producers and/or consumers:

```
Producer 1 ──┐
Producer 2 ──┼──▶ Queue ──┬──▶ Consumer 1
Producer 3 ──┘            ├──▶ Consumer 2
                          └──▶ Consumer 3
```
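A minimal sketch of the fan-in/fan-out layout in the diagram, assuming `Channel<T>` as the shared queue. The counts (three producers, three consumers, 100 items each) and the capacity of 16 are arbitrary values picked for the example.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

const int producerCount = 3, consumerCount = 3, itemsPerProducer = 100;
var channel = Channel.CreateBounded<int>(capacity: 16);
int consumed = 0;

// Each producer writes its own range of item ids.
var producers = Enumerable.Range(0, producerCount).Select(p => Task.Run(async () =>
{
    for (int i = 0; i < itemsPerProducer; i++)
        await channel.Writer.WriteAsync(p * itemsPerProducer + i);
})).ToArray();

// Consumers compete for items; each item is delivered to exactly one of them.
var consumers = Enumerable.Range(0, consumerCount).Select(_ => Task.Run(async () =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
        Interlocked.Increment(ref consumed);
})).ToArray();

// Complete the writer only after every producer has finished.
await Task.WhenAll(producers);
channel.Writer.Complete();
await Task.WhenAll(consumers);

Console.WriteLine($"Consumed {consumed} items"); // prints "Consumed 300 items"
```

With several producers, no single producer can safely call `Complete()`, which is why the sketch waits for all of them first and completes the writer from the coordinating code.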

4. Responsiveness

Keep UI/API responsive by offloading work:

```csharp
// API endpoint: returns as soon as the order is queued
public async Task<IActionResult> Submit(Order order)
{
    await _orderQueue.EnqueueAsync(order); // Quick
    return Accepted();                     // Don't wait for processing
}

// Background consumer processes orders for the lifetime of the service.
// DequeueAsync is assumed to wait for and return the next order.
while (true)
{
    var order = await _orderQueue.DequeueAsync();
    await ProcessOrderAsync(order); // Takes time
}
```
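The `_orderQueue` abstraction above is not tied to any particular type. As one possible sketch, it could be backed by a bounded `Channel<T>`. Everything named here (`orderQueue`, `SubmitAsync`, the use of plain integer order ids, the 20 ms delay) is a hypothetical stand-in so the example runs as a console program without a web framework.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for _orderQueue: a bounded channel of order ids.
var orderQueue = Channel.CreateBounded<int>(capacity: 100);
var processed = new List<int>();

// Stand-in for the endpoint: queue the order and return without processing it.
async Task<string> SubmitAsync(int orderId)
{
    await orderQueue.Writer.WriteAsync(orderId);
    return "202 Accepted";
}

// Background consumer: waits for each order, then does the slow work.
var worker = Task.Run(async () =>
{
    await foreach (int orderId in orderQueue.Reader.ReadAllAsync())
    {
        await Task.Delay(20); // simulated processing time
        processed.Add(orderId);
    }
});

string response = await SubmitAsync(1);
await SubmitAsync(2);
orderQueue.Writer.Complete(); // shut down the worker for this demo
await worker;

Console.WriteLine($"{response}; processed orders: {string.Join(", ", processed)}");
// prints "202 Accepted; processed orders: 1, 2"
```

In a long-running service the writer would normally stay open and the worker would be stopped through cancellation instead; completing the channel here just lets the demo exit.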

Real-World Examples

Log Processing

```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│ Application │────▶│  Log Queue  │────▶│ Log Writer  │
│    Code     │     │             │     │  (to disk)  │
└─────────────┘     └─────────────┘     └─────────────┘
```

The application doesn't wait for disk I/O.
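A rough sketch of the log pipeline, assuming an unbounded `Channel<string>` as the log queue and a temporary file as the destination. The file name and the log messages are invented for the example.

```csharp
using System;
using System.IO;
using System.Threading.Channels;
using System.Threading.Tasks;

var logQueue = Channel.CreateUnbounded<string>(
    new UnboundedChannelOptions { SingleReader = true });
// Hypothetical log location in the temp directory.
string logPath = Path.Combine(Path.GetTempPath(), $"demo-{Guid.NewGuid():N}.log");

// Log writer: the only code that touches the file.
var writerTask = Task.Run(async () =>
{
    await using var file = new StreamWriter(logPath);
    await foreach (var line in logQueue.Reader.ReadAllAsync())
        await file.WriteLineAsync(line);
});

// Application code: TryWrite on an unbounded channel does not wait for disk I/O.
for (int i = 1; i <= 5; i++)
    logQueue.Writer.TryWrite($"request {i} handled");

logQueue.Writer.Complete(); // let the writer drain the queue, then stop
await writerTask;

int linesWritten = File.ReadAllLines(logPath).Length;
Console.WriteLine($"{linesWritten} lines written");
File.Delete(logPath);
```

An unbounded queue keeps the application from ever waiting on the logger, at the price of unbounded memory growth if the disk cannot keep up.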

Message Queue Systems

```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Web API   │────▶│ RabbitMQ /  │────▶│   Worker    │
│   Handler   │     │    Kafka    │     │   Service   │
└─────────────┘     └─────────────┘     └─────────────┘
```

The request is handled quickly; processing happens asynchronously.

Image Processing

```
┌─────────────┐     ┌─────────────┐     ┌─────────────┐
│   Upload    │────▶│ Image Queue │────▶│  Processor  │
│   Handler   │     │             │     │  (resize,   │
└─────────────┘     └─────────────┘     │  compress)  │
                                        └─────────────┘
```

The upload returns fast; processing happens in the background.

Key Concepts

Bounded vs Unbounded Queues

Unbounded Queue:

  • No size limit
  • Producer never blocks
  • Risk: Memory exhaustion if the consumer falls behind

```csharp
var unbounded = new ConcurrentQueue<Item>(); // Grows without limit if items are not dequeued
```

Bounded Queue:

  • Fixed maximum size
  • Producer blocks when full (backpressure)
  • Caps memory use, because the queue never holds more than its capacity

```csharp
var bounded = new BlockingCollection<Item>(boundedCapacity: 100);
// Add() blocks the producer while the queue holds 100 items
```
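To see the capacity limit without blocking a thread, the sketch below uses `TryAdd`, which returns `false` immediately when the collection is full instead of waiting as `Add` would. The capacity of 2 is chosen only to keep the example short.

```csharp
using System;
using System.Collections.Concurrent;

var bounded = new BlockingCollection<int>(boundedCapacity: 2);

bool first = bounded.TryAdd(1);   // accepted
bool second = bounded.TryAdd(2);  // accepted, queue is now full
bool third = bounded.TryAdd(3);   // rejected immediately; Add(3) would block here

Console.WriteLine($"{first} {second} {third}, count = {bounded.Count}");
// prints "True True False, count = 2"

int taken = bounded.Take();       // consumer frees one slot
bool retry = bounded.TryAdd(3);   // accepted now

Console.WriteLine($"took {taken}, retry = {retry}, count = {bounded.Count}");
// prints "took 1, retry = True, count = 2"
```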

Backpressure

When the consumer can't keep up, backpressure slows down the producer:

```
Without Backpressure:
Producer: ████████████████████
Queue:    ████████████████████████████████████ (overflows)
Consumer: ████████

With Backpressure (bounded queue):
Producer: ████████ (waits) ████████ (waits) ████████
Queue:    ████████████████████ (at capacity)
Consumer: ████████████████████████████████████
```
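The "waits" in the chart can be observed directly with a bounded `Channel<T>`. In this sketch the capacity of 1 is an artificial choice that makes the second write wait; `BoundedChannelFullMode.Wait` is the default and is spelled out only for clarity.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 1)
{
    FullMode = BoundedChannelFullMode.Wait // writers wait for free space
});

await channel.Writer.WriteAsync(1);               // fits, completes right away
ValueTask pending = channel.Writer.WriteAsync(2); // queue is full, so this write is parked
bool waitingBeforeRead = !pending.IsCompleted;

int firstItem = await channel.Reader.ReadAsync(); // consumer frees a slot...
await pending;                                    // ...which lets the parked write finish
int secondItem = await channel.Reader.ReadAsync();

Console.WriteLine($"waiting = {waitingBeforeRead}, items = {firstItem}, {secondItem}");
// prints "waiting = True, items = 1, 2"
```

Because `WriteAsync` waits asynchronously, the producer is slowed down without tying up a thread, unlike `BlockingCollection<T>.Add`, which blocks the calling thread.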

Completion Signaling

How the producer tells consumers there's no more data:

```csharp
// Producer
foreach (var item in source)
{
    queue.Add(item);
}
queue.CompleteAdding(); // Signal: no more items

// Consumer
foreach (var item in queue.GetConsumingEnumerable())
{
    Process(item);
}
// Loop exits once CompleteAdding() has been called and the queue is empty
```
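A small runnable sketch of what completion means for `BlockingCollection<T>`: items queued before `CompleteAdding()` are still delivered, the consuming loop then ends, and later adds are rejected. The item values are placeholders.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

var queue = new BlockingCollection<string>();
queue.Add("a");
queue.Add("b");
queue.CompleteAdding(); // no more items will arrive

// Items queued before completion are still delivered, then enumeration ends.
var received = new List<string>(queue.GetConsumingEnumerable());

bool addRejected = false;
try
{
    queue.Add("c"); // adding after CompleteAdding() is an error
}
catch (InvalidOperationException)
{
    addRejected = true;
}

Console.WriteLine($"received = {string.Join(",", received)}, " +
                  $"IsCompleted = {queue.IsCompleted}, addRejected = {addRejected}");
// prints "received = a,b, IsCompleted = True, addRejected = True"
```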

.NET Implementations

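| Class                   | Async Support | Best For                  |
|-------------------------|---------------|---------------------------|
| `BlockingCollection<T>` | Limited       | Sync-heavy scenarios      |
| `Channel<T>`            | Full          | Async-first scenarios     |
| `ConcurrentQueue<T>`    | No (manual)   | Building custom solutions |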

Both BlockingCollection and Channel are covered in detail in the following lessons.

Simple Example

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var queue = Channel.CreateBounded<int>(10);

// Producer: WriteAsync waits whenever the channel already holds 10 items
var producer = Task.Run(async () =>
{
    for (int i = 0; i < 100; i++)
    {
        await queue.Writer.WriteAsync(i);
        Console.WriteLine($"Produced: {i}");
    }
    queue.Writer.Complete(); // Signal: no more items
});

// Consumer: the loop ends once the writer is completed and the channel is drained
var consumer = Task.Run(async () =>
{
    await foreach (var item in queue.Reader.ReadAllAsync())
    {
        await Task.Delay(50); // Simulate processing
        Console.WriteLine($"Consumed: {item}");
    }
});

await Task.WhenAll(producer, consumer);
```

Key Takeaways

  • Producer/consumer decouples data generation from processing
  • Enables load balancing, scalability, and responsiveness
  • Bounded queues provide backpressure to prevent memory issues
  • Completion signaling tells consumers when to stop
  • .NET provides BlockingCollection<T> and Channel<T> as ready-made implementations
  • This pattern is fundamental to async architectures