BlockingCollection
BlockingCollection<T> is a thread-safe collection designed for producer/consumer scenarios. It wraps another collection (defaulting to ConcurrentQueue<T>) and adds blocking and bounding capabilities.
Basic Usage
csharp1using System.Collections.Concurrent;23// Create a blocking collection (unbounded by default)4var collection = new BlockingCollection<string>();56// Producer7collection.Add("item1");8collection.Add("item2");9collection.CompleteAdding(); // Signal no more items1011// Consumer12foreach (var item in collection.GetConsumingEnumerable())13{14 Console.WriteLine(item);15}16// Loop exits after CompleteAdding() and queue is empty
Bounded Collection
Limit the size to create backpressure:
csharp1// Max 10 items - Add() blocks when full2var bounded = new BlockingCollection<int>(boundedCapacity: 10);34// Producer will block if 10 items in queue5bounded.Add(item); // Blocks until space available67// Alternative: Try to add with timeout8if (bounded.TryAdd(item, TimeSpan.FromSeconds(5)))9{10 Console.WriteLine("Added");11}12else13{14 Console.WriteLine("Timeout - queue full");15}
Key Methods
Adding Items
csharp1// Add - blocks if bounded and full2collection.Add(item);34// TryAdd - returns false if can't add5bool added = collection.TryAdd(item);67// TryAdd with timeout8bool added = collection.TryAdd(item, TimeSpan.FromSeconds(1));910// TryAdd with cancellation11bool added = collection.TryAdd(item, -1, cancellationToken);
Taking Items
csharp1// Take - blocks if empty2var item = collection.Take();34// TryTake - returns false if empty5if (collection.TryTake(out var item))6{7 Process(item);8}910// TryTake with timeout11if (collection.TryTake(out var item, TimeSpan.FromSeconds(1)))12{13 Process(item);14}
Completing
csharp1// Signal that no more items will be added2collection.CompleteAdding();34// Check if adding is complete5bool isComplete = collection.IsAddingCompleted;67// Check if completely done (complete + empty)8bool isDone = collection.IsCompleted;
GetConsumingEnumerable
The idiomatic way to consume items:
csharp1var collection = new BlockingCollection<WorkItem>();23// Consumer loop4foreach (var item in collection.GetConsumingEnumerable())5{6 ProcessItem(item);7 // Loop automatically exits when:8 // 1. CompleteAdding() is called AND9 // 2. Collection becomes empty10}
With cancellation:
csharp1var cts = new CancellationTokenSource();23try4{5 foreach (var item in collection.GetConsumingEnumerable(cts.Token))6 {7 ProcessItem(item);8 }9}10catch (OperationCanceledException)11{12 Console.WriteLine("Consumer canceled");13}
Multiple Producers
csharp1var collection = new BlockingCollection<int>(100);23// Multiple producers4var producers = Enumerable.Range(0, 3).Select(producerId =>5 Task.Run(() =>6 {7 for (int i = 0; i < 100; i++)8 {9 collection.Add(producerId * 1000 + i);10 }11 })12).ToArray();1314// Wait for all producers, then complete15Task.WhenAll(producers).ContinueWith(_ => collection.CompleteAdding());1617// Single consumer18foreach (var item in collection.GetConsumingEnumerable())19{20 Console.WriteLine(item);21}
Multiple Consumers
csharp1var collection = new BlockingCollection<int>(100);23// Producer4Task.Run(() =>5{6 for (int i = 0; i < 1000; i++)7 {8 collection.Add(i);9 }10 collection.CompleteAdding();11});1213// Multiple consumers14var consumers = Enumerable.Range(0, 3).Select(consumerId =>15 Task.Run(() =>16 {17 foreach (var item in collection.GetConsumingEnumerable())18 {19 Console.WriteLine($"Consumer {consumerId}: {item}");20 }21 })22).ToArray();2324await Task.WhenAll(consumers);
Complete Example: Log Processor
csharp1class LogProcessor : IDisposable2{3 private readonly BlockingCollection<string> _logQueue;4 private readonly Task _writerTask;5 private readonly StreamWriter _writer;67 public LogProcessor(string filePath)8 {9 _logQueue = new BlockingCollection<string>(1000);10 _writer = new StreamWriter(filePath, append: true);1112 // Start background consumer13 _writerTask = Task.Run(ProcessLogs);14 }1516 public void Log(string message)17 {18 var entry = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} {message}";1920 // Non-blocking if queue has space21 if (!_logQueue.TryAdd(entry))22 {23 // Queue full - could drop, wait, or throw24 Console.WriteLine("Log queue full, message dropped");25 }26 }2728 private void ProcessLogs()29 {30 foreach (var entry in _logQueue.GetConsumingEnumerable())31 {32 _writer.WriteLine(entry);33 }34 }3536 public void Dispose()37 {38 _logQueue.CompleteAdding();39 _writerTask.Wait(); // Wait for queue to drain40 _writer.Dispose();41 _logQueue.Dispose();42 }43}4445// Usage46using var logger = new LogProcessor("app.log");47logger.Log("Application started");48logger.Log("Processing request...");49// Logs are written asynchronously
Choosing the Underlying Collection
BlockingCollection can wrap different concurrent collections:
csharp1// Default: ConcurrentQueue (FIFO)2var fifo = new BlockingCollection<int>();34// Stack (LIFO)5var lifo = new BlockingCollection<int>(new ConcurrentStack<int>());67// Bag (unordered, optimized for same-thread add/take)8var bag = new BlockingCollection<int>(new ConcurrentBag<int>());
Limitations
Add()andTake()are blocking, not async-friendly- For async scenarios, prefer
Channel<T>(next lesson) - No built-in priority queue support
Key Takeaways
BlockingCollection<T>is designed for producer/consumer- Use bounded capacity for backpressure
GetConsumingEnumerable()is the idiomatic consumer patternCompleteAdding()signals no more items- Supports multiple producers and consumers
- For async code, consider
Channel<T>instead - Always dispose when done