15 min lesson

Dynamic Parallelism

Dynamic Parallelism

Dynamic parallelism adjusts the number of workers based on workload, system resources, or runtime conditions. This enables optimal resource utilization.

Why Dynamic Parallelism?

Static worker count has problems:

  • Too few workers → underutilized resources
  • Too many workers → resource contention, overhead
  • Workload varies → fixed count is suboptimal

Dynamic parallelism adapts:

Workload: ████████░░░░████████████░░░░░░░░░████
Workers:  4 2 8 2 6

Basic Dynamic Worker Pool

csharp
/// <summary>
/// Worker pool that starts with a minimum number of workers and adds more
/// (up to a maximum) whenever the queue backs up.
/// </summary>
class DynamicWorkerPool<T>
{
    private readonly Channel<T> _workQueue;
    private readonly Func<T, Task> _processor;
    private readonly List<Task> _workers = new();
    private readonly object _gate = new(); // guards _workers: EnqueueAsync may be called concurrently
    private readonly int _minWorkers;
    private readonly int _maxWorkers;

    /// <param name="processor">Async callback invoked once per dequeued item.</param>
    /// <param name="minWorkers">Workers started immediately; must be at least 1.</param>
    /// <param name="maxWorkers">Upper bound on concurrent workers.</param>
    public DynamicWorkerPool(
        Func<T, Task> processor,
        int minWorkers = 1,
        int maxWorkers = 10)
    {
        _processor = processor ?? throw new ArgumentNullException(nameof(processor));
        if (minWorkers < 1)
            throw new ArgumentOutOfRangeException(nameof(minWorkers));
        if (maxWorkers < minWorkers)
            throw new ArgumentOutOfRangeException(nameof(maxWorkers));

        _minWorkers = minWorkers;
        _maxWorkers = maxWorkers;
        _workQueue = Channel.CreateBounded<T>(100);

        // Start minimum workers up front.
        for (int i = 0; i < minWorkers; i++)
            AddWorker();
    }

    /// <summary>Number of workers that are still running (completed tasks are pruned).</summary>
    public int WorkerCount
    {
        get
        {
            lock (_gate)
            {
                _workers.RemoveAll(t => t.IsCompleted);
                return _workers.Count;
            }
        }
    }

    /// <summary>Items currently buffered in the bounded channel.</summary>
    public int QueueDepth => _workQueue.Reader.Count;

    /// <summary>Enqueues an item, scaling up first if the queue is backing up.</summary>
    public async ValueTask EnqueueAsync(T item)
    {
        // Scale up if the queue is backing up. The check is best-effort: a
        // concurrent caller may also add a worker, but the lock in AddWorker
        // keeps the list itself consistent.
        if (QueueDepth > 10 && WorkerCount < _maxWorkers)
        {
            AddWorker();
        }

        await _workQueue.Writer.WriteAsync(item);
    }

    /// <summary>
    /// Signals that no more items will be enqueued and waits for the workers
    /// to drain the queue and exit.
    /// </summary>
    public async Task CompleteAsync()
    {
        _workQueue.Writer.TryComplete();
        Task[] pending;
        lock (_gate)
            pending = _workers.ToArray();
        await Task.WhenAll(pending);
    }

    private void AddWorker()
    {
        var worker = Task.Run(async () =>
        {
            // ReadAllAsync ends when the writer is completed and the queue drains.
            await foreach (var item in _workQueue.Reader.ReadAllAsync())
            {
                await _processor(item);
            }
        });

        lock (_gate)
        {
            _workers.RemoveAll(t => t.IsCompleted); // prune finished workers so the list cannot grow without bound
            _workers.Add(worker);
        }
    }
}

Scaling Based on Queue Depth

csharp
/// <summary>
/// Processor that scales its worker count based on queue depth: a background
/// monitor adds workers when the queue backs up, and idle workers retire
/// themselves when the queue is nearly empty (always leaving at least one).
/// </summary>
class AdaptiveProcessor<T>
{
    private readonly Channel<T> _channel;
    private int _workerCount = 0;
    private readonly int _maxWorkers;
    private readonly Func<T, Task> _handler;
    private readonly List<Task> _workerTasks = new();
    private readonly object _gate = new(); // guards _workerTasks

    public AdaptiveProcessor(Func<T, Task> handler, int maxWorkers = 10)
    {
        _handler = handler ?? throw new ArgumentNullException(nameof(handler));
        _maxWorkers = maxWorkers;
        _channel = Channel.CreateBounded<T>(1000);

        // Start one worker immediately: the monitor only scales up once the
        // queue exceeds its threshold, so without this, small workloads would
        // never be processed at all.
        StartWorker();

        // Monitor and scale in the background.
        _ = MonitorAndScaleAsync();
    }

    /// <summary>Enqueues an item for processing (waits if the channel is full).</summary>
    public ValueTask EnqueueAsync(T item) => _channel.Writer.WriteAsync(item);

    /// <summary>Marks the input complete and waits for all workers to drain the queue.</summary>
    public async Task CompleteAsync()
    {
        _channel.Writer.TryComplete();
        Task[] pending;
        lock (_gate)
            pending = _workerTasks.ToArray();
        await Task.WhenAll(pending);
    }

    private async Task MonitorAndScaleAsync()
    {
        // The loop ends once the channel is completed and fully drained,
        // so the monitor does not run forever after CompleteAsync.
        while (!_channel.Reader.Completion.IsCompleted)
        {
            await Task.Delay(100); // Check every 100ms

            int queueDepth = _channel.Reader.Count;

            // Scale up; scale-down happens inside the workers themselves.
            if (queueDepth > 50 && Volatile.Read(ref _workerCount) < _maxWorkers)
            {
                StartWorker();
            }
        }
    }

    /// <summary>
    /// Atomically reserves the right for one worker to exit, refusing when it
    /// would leave zero workers. A plain "count > 1 then decrement" check
    /// races: two workers can both see 2 and both exit.
    /// </summary>
    private bool TryRetireWorker()
    {
        while (true)
        {
            int current = Volatile.Read(ref _workerCount);
            if (current <= 1)
                return false;
            if (Interlocked.CompareExchange(ref _workerCount, current - 1, current) == current)
                return true;
        }
    }

    private void StartWorker()
    {
        Interlocked.Increment(ref _workerCount);

        var task = Task.Run(async () =>
        {
            bool counted = true; // set false once TryRetireWorker has already decremented
            try
            {
                await foreach (var item in _channel.Reader.ReadAllAsync())
                {
                    await _handler(item);

                    // Retire if the queue is nearly empty and we are not the last worker.
                    if (_channel.Reader.Count < 5 && TryRetireWorker())
                    {
                        counted = false;
                        return;
                    }
                }
            }
            finally
            {
                if (counted)
                    Interlocked.Decrement(ref _workerCount);
            }
        });

        lock (_gate)
            _workerTasks.Add(task);
    }
}

Scaling Based on Response Time

csharp
/// <summary>
/// Processes a set of items in adaptive batches, raising parallelism while
/// the observed average response time is well under target and lowering it
/// when responses are slow.
/// </summary>
class ResponseTimeScaler
{
    private readonly double _targetResponseTimeMs = 100;
    private double _avgResponseTime = 0;
    private int _currentParallelism = 1;

    /// <param name="items">Work items; fully drained before this method returns.</param>
    /// <param name="processor">Async callback timed per item.</param>
    public async Task ProcessAsync<T>(
        IEnumerable<T> items,
        Func<T, Task> processor)
    {
        var queue = new ConcurrentQueue<T>(items);
        var responseTimes = new ConcurrentBag<double>();

        // responseTimes is cleared each iteration, so the loop only needs to
        // run while the queue still has items.
        while (!queue.IsEmpty)
        {
            // Adjust parallelism based on last batch's average response time.
            if (_avgResponseTime > _targetResponseTimeMs * 1.5)
            {
                _currentParallelism = Math.Max(1, _currentParallelism - 1);
            }
            else if (_avgResponseTime < _targetResponseTimeMs * 0.5)
            {
                _currentParallelism = Math.Min(20, _currentParallelism + 1);
            }

            // Process batch.
            var batch = new List<Task>();
            for (int i = 0; i < _currentParallelism && queue.TryDequeue(out var item); i++)
            {
                // BUG FIX: `item` is declared in the for-condition, so it is a
                // single variable shared by every iteration. Capturing it
                // directly in the lambda means tasks may all see the last
                // dequeued value. Copy into a per-iteration local instead.
                var current = item;
                batch.Add(Task.Run(async () =>
                {
                    var sw = Stopwatch.StartNew();
                    await processor(current);
                    responseTimes.Add(sw.Elapsed.TotalMilliseconds);
                }));
            }

            await Task.WhenAll(batch);

            // Update average; guard the empty case so Average() cannot throw.
            if (!responseTimes.IsEmpty)
            {
                _avgResponseTime = responseTimes.Average();
            }
            responseTimes.Clear();
        }
    }
}

CPU-Based Scaling

csharp
/// <summary>
/// Processes items in parallel, sizing the degree of parallelism from the
/// machine's logical processor count.
/// </summary>
class CpuAwareProcessor
{
    /// <summary>
    /// Parallelism baseline from <see cref="Environment.ProcessorCount"/>.
    /// Doubled because this example targets I/O-bound work; CPU-bound work
    /// would use the raw processor count instead.
    /// </summary>
    private static int GetOptimalParallelism() => Environment.ProcessorCount * 2;

    /// <summary>Runs <paramref name="processor"/> over all items with a bounded degree of parallelism.</summary>
    public async Task ProcessAsync<T>(IEnumerable<T> items, Func<T, Task> processor)
    {
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = GetOptimalParallelism()
        };

        await Parallel.ForEachAsync(items, options, async (item, ct) =>
        {
            await processor(item);
        });
    }
}

Feedback-Based Adjustment

csharp
/// <summary>
/// Self-tuning parallelism controller: scales up when the error rate over the
/// last 100 operations is low, and scales down immediately on errors.
/// </summary>
class FeedbackController
{
    private int _parallelism;
    private int _successCount;
    private int _errorCount;
    private readonly int _maxParallelism;

    public FeedbackController(int initial = 4, int max = 20)
    {
        if (initial < 1)
            throw new ArgumentOutOfRangeException(nameof(initial));
        if (max < initial)
            throw new ArgumentOutOfRangeException(nameof(max));
        _parallelism = initial;
        _maxParallelism = max;
    }

    /// <summary>The recommended degree of parallelism right now.</summary>
    public int CurrentParallelism => Volatile.Read(ref _parallelism);

    public void RecordSuccess()
    {
        Interlocked.Increment(ref _successCount);
        MaybeAdjust();
    }

    public void RecordError()
    {
        Interlocked.Increment(ref _errorCount);

        // Immediately reduce on errors (atomically — a plain read followed by
        // Interlocked.Exchange can lose a concurrent update).
        AdjustParallelism(p => Math.Max(1, p - 2));

        // BUG FIX: errors must also hit the 100-operation boundary check;
        // otherwise, when the 100th operation is an error, the window is
        // skipped and the counters grow without ever being reset.
        MaybeAdjust();
    }

    /// <summary>Applies <paramref name="update"/> to the parallelism value atomically.</summary>
    private void AdjustParallelism(Func<int, int> update)
    {
        while (true)
        {
            int current = Volatile.Read(ref _parallelism);
            int next = update(current);
            if (next == current ||
                Interlocked.CompareExchange(ref _parallelism, next, current) == current)
            {
                return;
            }
        }
    }

    private void MaybeAdjust()
    {
        // Adjust every 100 operations.
        int total = _successCount + _errorCount;
        if (total == 0 || total % 100 != 0) return;

        double errorRate = (double)_errorCount / total;

        if (errorRate < 0.01 && CurrentParallelism < _maxParallelism)
        {
            // Low error rate - scale up (clamped to the maximum).
            AdjustParallelism(p => Math.Min(_maxParallelism, p + 1));
        }
        else if (errorRate > 0.05)
        {
            // High error rate - scale down.
            AdjustParallelism(p => Math.Max(1, p / 2));
        }

        // Reset counters for the next window.
        Interlocked.Exchange(ref _successCount, 0);
        Interlocked.Exchange(ref _errorCount, 0);
    }
}

Key Takeaways

  • Dynamic parallelism adapts to workload
  • Scale based on queue depth, response time, or error rate
  • Start with minimum workers, scale up as needed
  • Scale down when queue empties or load decreases
  • Use feedback loops for self-tuning systems
  • Consider CPU count as baseline for parallelism
  • Monitor and log scaling decisions for debugging
Dynamic Parallelism - Anko Academy