Dynamic Parallelism
Dynamic parallelism adjusts the number of workers based on workload, system resources, or runtime conditions. This enables optimal resource utilization.
Why Dynamic Parallelism?
Static worker count has problems:
- Too few workers → underutilized resources
- Too many workers → resource contention, overhead
- Workload varies → fixed count is suboptimal
Dynamic parallelism adapts:
Workload: ████████░░░░████████████░░░░░░░░░████
Workers:      4      2        8        2    6
Basic Dynamic Worker Pool
/// <summary>
/// A worker pool that scales up on demand: starts with a minimum number of
/// workers and adds more (up to a maximum) when the queue backs up.
/// </summary>
class DynamicWorkerPool<T>
{
    private readonly Channel<T> _workQueue;
    private readonly Func<T, Task> _processor;
    private readonly List<Task> _workers = new();
    // FIX: _workers was mutated from EnqueueAsync with no synchronization;
    // concurrent producers would race on List<T>.Add / Count.
    private readonly object _gate = new();
    private readonly int _minWorkers;
    private readonly int _maxWorkers;

    /// <param name="processor">Async callback invoked once per dequeued item.</param>
    /// <param name="minWorkers">Workers started immediately in the constructor.</param>
    /// <param name="maxWorkers">Upper bound on concurrently running workers.</param>
    public DynamicWorkerPool(
        Func<T, Task> processor,
        int minWorkers = 1,
        int maxWorkers = 10)
    {
        _processor = processor ?? throw new ArgumentNullException(nameof(processor));
        _minWorkers = minWorkers;
        _maxWorkers = maxWorkers;
        // Bounded so EnqueueAsync applies backpressure and Reader.Count is available.
        _workQueue = Channel.CreateBounded<T>(100);

        // Start minimum workers.
        for (int i = 0; i < minWorkers; i++)
            AddWorker();
    }

    /// <summary>Number of workers that have not yet completed.</summary>
    public int WorkerCount
    {
        get { lock (_gate) return _workers.Count(t => !t.IsCompleted); }
    }

    /// <summary>Items currently buffered in the channel.</summary>
    public int QueueDepth => _workQueue.Reader.Count;

    /// <summary>
    /// Enqueues an item, scaling up first if the queue is backing up.
    /// The depth/count check is intentionally best-effort; a concurrent racing
    /// producer may briefly overshoot by one worker, which is harmless.
    /// </summary>
    public async ValueTask EnqueueAsync(T item)
    {
        if (QueueDepth > 10 && WorkerCount < _maxWorkers)
        {
            AddWorker();
        }

        await _workQueue.Writer.WriteAsync(item);
    }

    /// <summary>
    /// NEW (backward-compatible): signals that no more items will be enqueued
    /// and waits for all workers to drain the queue and exit. The original
    /// class had no shutdown path, so workers lived forever.
    /// </summary>
    public async Task CompleteAsync()
    {
        _workQueue.Writer.Complete();
        Task[] pending;
        lock (_gate) pending = _workers.ToArray();
        await Task.WhenAll(pending);
    }

    private void AddWorker()
    {
        var worker = Task.Run(async () =>
        {
            // ReadAllAsync ends when the writer is completed and the queue drains.
            await foreach (var item in _workQueue.Reader.ReadAllAsync())
            {
                await _processor(item);
            }
        });
        lock (_gate)
        {
            // FIX: prune finished workers so the list (and WorkerCount scans)
            // do not grow without bound over the pool's lifetime.
            _workers.RemoveAll(t => t.IsCompleted);
            _workers.Add(worker);
        }
    }
}
Scaling Based on Queue Depth
/// <summary>
/// Processes queued items with a worker count that scales up when queue depth
/// grows and lets surplus workers retire when the queue is nearly empty.
/// </summary>
class AdaptiveProcessor<T>
{
    private readonly Channel<T> _channel;
    private int _workerCount = 0;
    private readonly int _maxWorkers;
    private readonly Func<T, Task> _handler;

    /// <param name="handler">Async callback invoked once per item.</param>
    /// <param name="maxWorkers">Upper bound on concurrent workers.</param>
    public AdaptiveProcessor(Func<T, Task> handler, int maxWorkers = 10)
    {
        _handler = handler ?? throw new ArgumentNullException(nameof(handler));
        _maxWorkers = maxWorkers;
        _channel = Channel.CreateBounded<T>(1000);

        // FIX: the original started zero workers, so items never reaching the
        // scale-up threshold (queue depth > 50) were never processed at all.
        StartWorker();

        // Monitor and scale in the background.
        _ = MonitorAndScaleAsync();
    }

    /// <summary>Current number of live workers (approximate under concurrency).</summary>
    public int WorkerCount => Volatile.Read(ref _workerCount);

    /// <summary>
    /// NEW (backward-compatible): submits an item for processing. The original
    /// class exposed no write path to its private channel, making it unusable.
    /// </summary>
    public ValueTask EnqueueAsync(T item) => _channel.Writer.WriteAsync(item);

    /// <summary>
    /// NEW: signals that no more items will arrive; queued items still drain,
    /// after which workers and the monitor loop exit.
    /// </summary>
    public void Complete() => _channel.Writer.Complete();

    private async Task MonitorAndScaleAsync()
    {
        // FIX: original looped forever; stop once the channel is completed and drained.
        while (!_channel.Reader.Completion.IsCompleted)
        {
            await Task.Delay(100); // check every 100ms

            int queueDepth = _channel.Reader.Count;

            // Scale up when the backlog is deep.
            if (queueDepth > 50 && Volatile.Read(ref _workerCount) < _maxWorkers)
            {
                StartWorker();
            }
            // Scale down happens inside workers when the queue empties.
        }
    }

    private void StartWorker()
    {
        Interlocked.Increment(ref _workerCount);

        _ = Task.Run(async () =>
        {
            bool retired = false;
            try
            {
                await foreach (var item in _channel.Reader.ReadAllAsync())
                {
                    await _handler(item);

                    // Retire if the queue is nearly empty and we are surplus.
                    // FIX: the original's unsynchronized check let several workers
                    // observe _workerCount > 1 simultaneously and all exit,
                    // leaving zero workers with items still queued. CompareExchange
                    // lets at most (observed - 1) workers claim an exit slot.
                    if (_channel.Reader.Count < 5)
                    {
                        int observed = Volatile.Read(ref _workerCount);
                        if (observed > 1 &&
                            Interlocked.CompareExchange(ref _workerCount, observed - 1, observed) == observed)
                        {
                            retired = true; // count already decremented; skip finally's decrement
                            return;
                        }
                    }
                }
            }
            finally
            {
                if (!retired) Interlocked.Decrement(ref _workerCount);
            }
        });
    }
}
Scaling Based on Response Time
/// <summary>
/// Processes items in batches, tuning the batch parallelism so that the
/// observed average response time tracks a target (100 ms): halve-ish the
/// parallelism when slow, grow it when fast.
/// </summary>
class ResponseTimeScaler
{
    private readonly double _targetResponseTimeMs = 100;
    private double _avgResponseTime = 0;
    private int _currentParallelism = 1;

    /// <param name="items">Work items; an empty sequence completes immediately.</param>
    /// <param name="processor">Async callback invoked once per item.</param>
    public async Task ProcessAsync<T>(
        IEnumerable<T> items,
        Func<T, Task> processor)
    {
        var queue = new ConcurrentQueue<T>(items);
        var responseTimes = new ConcurrentBag<double>();

        // Note: responseTimes is always empty at the top of each iteration
        // (it is cleared at the bottom), so the original's extra
        // "|| responseTimes.Any()" condition was redundant.
        while (!queue.IsEmpty)
        {
            // Adjust parallelism based on the last batch's average response time.
            if (_avgResponseTime > _targetResponseTimeMs * 1.5)
            {
                _currentParallelism = Math.Max(1, _currentParallelism - 1);
            }
            else if (_avgResponseTime < _targetResponseTimeMs * 0.5)
            {
                _currentParallelism = Math.Min(20, _currentParallelism + 1);
            }

            // Process one batch of up to _currentParallelism items.
            var batch = new List<Task>();
            for (int i = 0; i < _currentParallelism && queue.TryDequeue(out var item); i++)
            {
                // FIX: `item` declared in the for-condition is scoped to the whole
                // for-statement, so the original lambdas all captured ONE variable
                // and could process the last-dequeued item multiple times while
                // dropping others. Capture a per-iteration copy instead.
                var current = item;
                batch.Add(Task.Run(async () =>
                {
                    var sw = Stopwatch.StartNew();
                    await processor(current);
                    responseTimes.Add(sw.Elapsed.TotalMilliseconds);
                }));
            }

            await Task.WhenAll(batch);

            // Update the rolling average; guard Average() against an empty bag
            // (it throws InvalidOperationException on an empty sequence).
            if (!responseTimes.IsEmpty)
            {
                _avgResponseTime = responseTimes.Average();
                responseTimes.Clear();
            }
        }
    }
}
CPU-Based Scaling
/// <summary>
/// Runs an async processor over a sequence with a degree of parallelism
/// derived from the machine's logical core count.
/// </summary>
class CpuAwareProcessor
{
    // Baseline parallelism from the logical processor count.
    // CPU-bound work would use the count as-is; I/O-bound work tolerates
    // oversubscription, hence the 2x factor used here.
    private static int GetOptimalParallelism() => Environment.ProcessorCount * 2;

    /// <summary>Processes every item, bounded by the computed parallelism.</summary>
    public async Task ProcessAsync<T>(IEnumerable<T> items, Func<T, Task> processor)
    {
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = GetOptimalParallelism(),
        };

        await Parallel.ForEachAsync(items, options,
            async (item, ct) => await processor(item));
    }
}
Feedback-Based Adjustment
/// <summary>
/// Self-tuning parallelism controller: scales up after sustained success,
/// backs off immediately on errors, and halves on high error rates.
/// Adjustment is evaluated every 100 recorded operations.
/// </summary>
class FeedbackController
{
    // FIX: the original mixed Interlocked increments with plain reads and
    // a non-atomic read-modify-write (Math.Max(1, _parallelism - 2) computed
    // from a stale read, then Exchange'd). A single gate makes every
    // counter/parallelism transition atomic and keeps the code obvious.
    private readonly object _gate = new();
    private int _parallelism;
    private int _successCount;
    private int _errorCount;
    private readonly int _maxParallelism;

    /// <param name="initial">Starting parallelism (must be >= 1).</param>
    /// <param name="max">Upper bound on parallelism (must be >= initial).</param>
    public FeedbackController(int initial = 4, int max = 20)
    {
        if (initial < 1) throw new ArgumentOutOfRangeException(nameof(initial));
        if (max < initial) throw new ArgumentOutOfRangeException(nameof(max));
        _parallelism = initial;
        _maxParallelism = max;
    }

    /// <summary>The parallelism callers should currently use.</summary>
    public int CurrentParallelism
    {
        get { lock (_gate) return _parallelism; }
    }

    /// <summary>Records one successful operation and maybe re-tunes.</summary>
    public void RecordSuccess()
    {
        lock (_gate)
        {
            _successCount++;
            MaybeAdjust();
        }
    }

    /// <summary>Records one failed operation; immediately reduces parallelism by 2 (floor 1).</summary>
    public void RecordError()
    {
        lock (_gate)
        {
            _errorCount++;
            _parallelism = Math.Max(1, _parallelism - 2);
        }
    }

    // Caller must hold _gate. Evaluates every 100 operations:
    // error rate < 1%  -> +1 worker (capped at max)
    // error rate > 5%  -> halve (floor 1)
    // then resets the window counters.
    private void MaybeAdjust()
    {
        int total = _successCount + _errorCount;
        if (total % 100 != 0) return;

        double errorRate = (double)_errorCount / total;

        if (errorRate < 0.01 && _parallelism < _maxParallelism)
        {
            _parallelism++;
        }
        else if (errorRate > 0.05)
        {
            _parallelism = Math.Max(1, _parallelism / 2);
        }

        _successCount = 0;
        _errorCount = 0;
    }
}
Key Takeaways
- Dynamic parallelism adapts to workload
- Scale based on queue depth, response time, or error rate
- Start with minimum workers, scale up as needed
- Scale down when queue empties or load decreases
- Use feedback loops for self-tuning systems
- Consider CPU count as baseline for parallelism
- Monitor and log scaling decisions for debugging