15 min lesson

BlockingCollection<T>

BlockingCollection<T> is a thread-safe collection designed for producer/consumer scenarios. It wraps another collection (defaulting to ConcurrentQueue<T>) and adds blocking and bounding capabilities.

Basic Usage

csharp
using System;
using System.Collections.Concurrent;

// Create a blocking collection (unbounded by default)
var collection = new BlockingCollection<string>();

// Producer
collection.Add("item1");
collection.Add("item2");
collection.CompleteAdding(); // Signal no more items

// Consumer
foreach (var item in collection.GetConsumingEnumerable())
{
    Console.WriteLine(item);
}
// Loop exits once CompleteAdding() has been called and the queue is empty

Bounded Collection

Pass a bounded capacity to create backpressure: once the collection is full, producers block until a consumer takes an item.

csharp
// Max 10 items - Add() blocks when full
var bounded = new BlockingCollection<int>(boundedCapacity: 10);
int item = 42; // Example value

// Blocks while the collection already holds 10 items
bounded.Add(item);

// Alternative: try to add with a timeout
if (bounded.TryAdd(item, TimeSpan.FromSeconds(5)))
{
    Console.WriteLine("Added");
}
else
{
    Console.WriteLine("Timeout - queue full");
}

Key Methods

Adding Items

csharp
// Add - blocks if bounded and full
collection.Add(item);

// TryAdd - returns false immediately if the collection is full
bool added = collection.TryAdd(item);

// TryAdd with timeout - waits up to 1 second for space
added = collection.TryAdd(item, TimeSpan.FromSeconds(1));

// TryAdd with cancellation - waits indefinitely for space and
// throws OperationCanceledException if the token is canceled
added = collection.TryAdd(item, Timeout.Infinite, cancellationToken);
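The difference between these overloads is easiest to see on a collection that is already full. The following is a minimal sketch, assuming a capacity of 2 and a single thread; the variable names are illustrative only.

```csharp
using System;
using System.Collections.Concurrent;

// Capacity of 2 so the collection fills up quickly.
using var queue = new BlockingCollection<int>(boundedCapacity: 2);

bool first = queue.TryAdd(1);   // true: space available
bool second = queue.TryAdd(2);  // true: collection is now full
bool third = queue.TryAdd(3);   // false: full, returns immediately

// With a timeout, TryAdd waits for space before giving up.
// Nobody is consuming here, so it returns false after about 50 ms.
bool fourth = queue.TryAdd(4, TimeSpan.FromMilliseconds(50));

// Taking one item frees a slot, so the next TryAdd succeeds.
int taken = queue.Take();       // 1 (FIFO by default)
bool fifth = queue.TryAdd(5);   // true

Console.WriteLine($"{first} {second} {third} {fourth} {taken} {fifth}");
// Prints: True True False False 1 True
```

Calling Add(3) in place of the third TryAdd would block this thread indefinitely, because no consumer is running to free a slot.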

Taking Items

csharp
// Take - blocks if empty
var item = collection.Take();

// TryTake - returns false immediately if empty
if (collection.TryTake(out var next))
{
    Process(next);
}

// TryTake with timeout - waits up to 1 second for an item
if (collection.TryTake(out var delayed, TimeSpan.FromSeconds(1)))
{
    Process(delayed);
}

Completing

csharp
// Signal that no more items will be added
collection.CompleteAdding();

// Check if adding is complete
bool isComplete = collection.IsAddingCompleted;

// Check if completely done (complete + empty)
bool isDone = collection.IsCompleted;
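To make the two flags concrete, here is a small single-threaded sketch of the state transitions around CompleteAdding(). It also shows that Add and Take throw InvalidOperationException once the collection is completed; the variable names are illustrative only.

```csharp
using System;
using System.Collections.Concurrent;

using var queue = new BlockingCollection<string>();
queue.Add("a");

queue.CompleteAdding();
bool addingCompleted = queue.IsAddingCompleted;   // true
bool completedWhileNonEmpty = queue.IsCompleted;  // false: "a" is still queued

// Adding after CompleteAdding() throws.
bool addThrew = false;
try { queue.Add("b"); }
catch (InvalidOperationException) { addThrew = true; }

string last = queue.Take();                       // "a"
bool completedAfterDrain = queue.IsCompleted;     // true: completed and empty

// Taking from a completed, empty collection throws instead of blocking.
bool takeThrew = false;
try { queue.Take(); }
catch (InvalidOperationException) { takeThrew = true; }

Console.WriteLine($"{addingCompleted} {completedWhileNonEmpty} {addThrew} {last} {completedAfterDrain} {takeThrew}");
// Prints: True False True a True True
```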

GetConsumingEnumerable

The idiomatic way to consume items:

csharp
var collection = new BlockingCollection<WorkItem>();

// Consumer loop
foreach (var item in collection.GetConsumingEnumerable())
{
    ProcessItem(item);
    // Loop automatically exits when:
    // 1. CompleteAdding() is called AND
    // 2. Collection becomes empty
}

With cancellation:

csharp
var cts = new CancellationTokenSource();

try
{
    foreach (var item in collection.GetConsumingEnumerable(cts.Token))
    {
        ProcessItem(item);
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Consumer canceled");
}
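A runnable variant of the cancellation pattern might look like the sketch below. It assumes a producer that never calls CompleteAdding(), so cancellation is the only way out of the loop. To keep the outcome deterministic, the token is canceled from inside the loop; in real code the signal would normally come from elsewhere, such as a shutdown handler.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

using var queue = new BlockingCollection<int>();
using var cts = new CancellationTokenSource();

queue.Add(1);
queue.Add(2);
// CompleteAdding() is never called, so without cancellation
// the loop below would block forever after the second item.

int processed = 0;
bool canceled = false;
try
{
    foreach (var item in queue.GetConsumingEnumerable(cts.Token))
    {
        processed++;
        // Stand-in for an external shutdown signal (assumption for this demo).
        if (processed == 2) cts.Cancel();
    }
}
catch (OperationCanceledException)
{
    // Thrown when the enumerator next tries to wait for an item.
    canceled = true;
}

Console.WriteLine($"Processed {processed}, canceled: {canceled}");
// Prints: Processed 2, canceled: True
```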

Multiple Producers

csharp
var collection = new BlockingCollection<int>(100);

// Multiple producers
var producers = Enumerable.Range(0, 3).Select(producerId =>
    Task.Run(() =>
    {
        for (int i = 0; i < 100; i++)
        {
            collection.Add(producerId * 1000 + i);
        }
    })
).ToArray();

// Wait for all producers, then complete
_ = Task.WhenAll(producers).ContinueWith(_ => collection.CompleteAdding());

// Single consumer
foreach (var item in collection.GetConsumingEnumerable())
{
    Console.WriteLine(item);
}

Multiple Consumers

csharp
var collection = new BlockingCollection<int>(100);

// Producer
Task.Run(() =>
{
    for (int i = 0; i < 1000; i++)
    {
        collection.Add(i);
    }
    collection.CompleteAdding();
});

// Multiple consumers
var consumers = Enumerable.Range(0, 3).Select(consumerId =>
    Task.Run(() =>
    {
        foreach (var item in collection.GetConsumingEnumerable())
        {
            Console.WriteLine($"Consumer {consumerId}: {item}");
        }
    })
).ToArray();

await Task.WhenAll(consumers);
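With several consumers, each item is handed to exactly one of them, although which consumer receives which item is not predictable. The sketch below checks that property by collecting everything into a ConcurrentBag<int>; the `seen` sink and the counts are assumptions made for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

using var queue = new BlockingCollection<int>(boundedCapacity: 100);
var seen = new ConcurrentBag<int>();   // Thread-safe sink for consumed items

var producer = Task.Run(() =>
{
    for (int i = 0; i < 1000; i++)
    {
        queue.Add(i);
    }
    queue.CompleteAdding();
});

// Three consumers compete for items from the same collection.
var consumers = Enumerable.Range(0, 3).Select(_ =>
    Task.Run(() =>
    {
        foreach (var item in queue.GetConsumingEnumerable())
        {
            seen.Add(item);
        }
    })
).ToArray();

await Task.WhenAll(consumers.Append(producer));

int total = seen.Count;                  // 1000: nothing lost
int distinct = seen.Distinct().Count();  // 1000: nothing delivered twice
Console.WriteLine($"{total} items consumed, {distinct} distinct");
```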

Complete Example: Log Processor

csharp
class LogProcessor : IDisposable
{
    private readonly BlockingCollection<string> _logQueue;
    private readonly Task _writerTask;
    private readonly StreamWriter _writer;

    public LogProcessor(string filePath)
    {
        _logQueue = new BlockingCollection<string>(1000);
        _writer = new StreamWriter(filePath, append: true);

        // Start background consumer
        _writerTask = Task.Run(ProcessLogs);
    }

    public void Log(string message)
    {
        var entry = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {message}";

        // TryAdd never blocks: it returns false immediately when the queue is full.
        // Calling Log after Dispose throws, because adding has been completed.
        if (!_logQueue.TryAdd(entry))
        {
            // Queue full - could drop, wait, or throw
            Console.WriteLine("Log queue full, message dropped");
        }
    }

    private void ProcessLogs()
    {
        foreach (var entry in _logQueue.GetConsumingEnumerable())
        {
            _writer.WriteLine(entry);
        }
    }

    public void Dispose()
    {
        _logQueue.CompleteAdding();
        _writerTask.Wait(); // Wait for queue to drain
        _writer.Dispose();  // Flushes buffered entries to the file
        _logQueue.Dispose();
    }
}

// Usage (with top-level statements, this code must come before the class declaration)
using var logger = new LogProcessor("app.log");
logger.Log("Application started");
logger.Log("Processing request...");
// Entries are written by the background task, not by the calling thread

Choosing the Underlying Collection

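BlockingCollection<T> can wrap any collection that implements IProducerConsumerCollection<T>. The wrapped collection determines the order in which items are taken: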

csharp
// Default: ConcurrentQueue (FIFO)
var fifo = new BlockingCollection<int>();

// Stack (LIFO)
var lifo = new BlockingCollection<int>(new ConcurrentStack<int>());

// Bag (unordered, optimized for same-thread add/take)
var bag = new BlockingCollection<int>(new ConcurrentBag<int>());
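The effect of the underlying collection shows up in the order items come back out. A minimal single-threaded sketch, assuming three integers added in ascending order:

```csharp
using System;
using System.Collections.Concurrent;

using var fifo = new BlockingCollection<int>();  // ConcurrentQueue<int> underneath
using var lifo = new BlockingCollection<int>(new ConcurrentStack<int>());

foreach (var n in new[] { 1, 2, 3 })
{
    fifo.Add(n);
    lifo.Add(n);
}
fifo.CompleteAdding();
lifo.CompleteAdding();

// Draining each collection reveals the ordering of the wrapped type.
string fifoOrder = string.Join(",", fifo.GetConsumingEnumerable());
string lifoOrder = string.Join(",", lifo.GetConsumingEnumerable());

Console.WriteLine(fifoOrder); // Prints: 1,2,3
Console.WriteLine(lifoOrder); // Prints: 3,2,1
```

ConcurrentBag<T> is left out here because it makes no ordering guarantee, so its output cannot be predicted.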

Limitations

  • Add() and Take() block the calling thread, so they are a poor fit for async code
  • For async scenarios, prefer Channel<T> (next lesson)
  • No built-in priority queue support

Key Takeaways

  • BlockingCollection<T> is designed for producer/consumer
  • Use bounded capacity for backpressure
  • GetConsumingEnumerable() is the idiomatic consumer pattern
  • CompleteAdding() signals no more items
  • Supports multiple producers and consumers
  • For async code, consider Channel<T> instead
  • Always dispose when done
BlockingCollection<T> - Anko Academy