Parallel.ForEachAsync

.NET 6 introduced Parallel.ForEachAsync, providing a clean way to process collections in parallel with async support and built-in throttling.

Basic Usage

csharp
var items = new[] { "url1", "url2", "url3", "url4", "url5" };

await Parallel.ForEachAsync(items, async (url, ct) =>
{
    // ct is canceled if the loop is canceled or another iteration fails
    var data = await httpClient.GetStringAsync(url, ct);
    Console.WriteLine($"Downloaded {url}");
});

Limiting Parallelism

Control the maximum number of concurrent operations (if unset, Parallel.ForEachAsync defaults to Environment.ProcessorCount):

csharp
var options = new ParallelOptions
{
    MaxDegreeOfParallelism = 5 // Max 5 concurrent
};

await Parallel.ForEachAsync(urls, options, async (url, ct) =>
{
    await ProcessUrlAsync(url, ct);
});
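
One way to convince yourself that the limit holds is to count how many bodies are running at once. The following is a minimal sketch, not a benchmark: the 20 integer items and the 50 ms Task.Delay are hypothetical stand-ins for real work.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical workload: 20 items, each simulated with a short delay.
var items = Enumerable.Range(1, 20);
var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };

int running = 0; // bodies currently executing
int peak = 0;    // highest value of 'running' observed so far

await Parallel.ForEachAsync(items, options, async (item, ct) =>
{
    int now = Interlocked.Increment(ref running);

    // Raise 'peak' to 'now' unless another body already recorded a higher value.
    int seen;
    while (now > (seen = Volatile.Read(ref peak)) &&
           Interlocked.CompareExchange(ref peak, now, seen) != seen) { }

    await Task.Delay(50, ct); // stand-in for real I/O
    Interlocked.Decrement(ref running);
});

Console.WriteLine($"Peak concurrency: {peak}"); // should not exceed 5
```

The counters use Interlocked because the bodies run concurrently on thread-pool threads; a plain `running++` could lose updates.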

With Cancellation

csharp
using var cts = new CancellationTokenSource();

var options = new ParallelOptions
{
    MaxDegreeOfParallelism = 10,
    CancellationToken = cts.Token
};

// Cancellation must be requested while the loop is running, for example
// from another thread/task via cts.Cancel(), or with a timeout like this one.
cts.CancelAfter(TimeSpan.FromSeconds(30));

try
{
    await Parallel.ForEachAsync(items, options, async (item, ct) =>
    {
        await ProcessAsync(item, ct);
    });
}
catch (OperationCanceledException)
{
    Console.WriteLine("Processing canceled");
}

Comparison with Manual Implementation

csharp
// Manual semaphore-based throttling
var semaphore = new SemaphoreSlim(5);
var tasks = urls.Select(async url =>
{
    await semaphore.WaitAsync();
    try
    {
        return await ProcessAsync(url);
    }
    finally
    {
        semaphore.Release();
    }
});
await Task.WhenAll(tasks);

// Parallel.ForEachAsync - cleaner!
var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
await Parallel.ForEachAsync(urls, options, async (url, ct) =>
{
    await ProcessAsync(url, ct); // pass the token so work stops on cancellation
});

Processing with Index

csharp
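// Pair each item with its position up front. A shared counter incremented
// inside the body would only record the order in which items start,
// not their index in the source.
var indexed = items.Select((item, index) => (item, index));

await Parallel.ForEachAsync(indexed, async (pair, ct) =>
{
    Console.WriteLine($"Processing item {pair.index}: {pair.item}");
    await ProcessAsync(pair.item, ct);
});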

Collecting Results

Parallel.ForEachAsync doesn't return results directly. Collect them in a thread-safe collection (note that ConcurrentBag does not preserve input order):

csharp
var results = new ConcurrentBag<Result>();

await Parallel.ForEachAsync(items, async (item, ct) =>
{
    var result = await ProcessAsync(item, ct);
    results.Add(result);
});

var allResults = results.ToArray();
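
When results need to line up with their inputs, one option is to iterate over indexes and write each result into its own slot of a preallocated array. This is a minimal sketch under stated assumptions: the ProcessAsync below is a hypothetical stand-in that upper-cases a string, and the four sample items are made up.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

string[] items = { "a", "b", "c", "d" };
var results = new string[items.Length]; // one slot per input item

// Hypothetical stand-in for real async work.
static async Task<string> ProcessAsync(string item, CancellationToken ct)
{
    await Task.Delay(10, ct);
    return item.ToUpperInvariant();
}

await Parallel.ForEachAsync(Enumerable.Range(0, items.Length), async (i, ct) =>
{
    // Each iteration writes only its own slot, so no lock is needed.
    results[i] = await ProcessAsync(items[i], ct);
});

Console.WriteLine(string.Join(",", results)); // prints "A,B,C,D"
```

Because no two iterations touch the same array element, this avoids a concurrent collection entirely while keeping results in input order.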

Error Handling

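Exceptions thrown by the body are collected on the returned task. Awaiting that task rethrows only the first one, so a catch (AggregateException) around the await will not fire. To see every recorded error, keep a reference to the task and read its Exception property:

csharp
var loop = Parallel.ForEachAsync(items, async (item, ct) =>
{
    if (item == "bad")
        throw new InvalidOperationException("Bad item!");

    await ProcessAsync(item, ct);
});

try
{
    await loop;
}
catch (Exception) when (loop.IsFaulted)
{
    // loop.Exception is an AggregateException holding every recorded error
    foreach (var ex in loop.Exception!.InnerExceptions)
    {
        Console.WriteLine($"Error: {ex.Message}");
    }
}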
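
An exception that escapes the body also stops the loop from starting further items. When every item should be attempted regardless of individual failures, one approach is to catch inside the body and record the failure. The sketch below assumes a hypothetical ProcessAsync that fails for the item "bad"; the item names are made up for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

string[] items = { "ok1", "bad", "ok2", "bad", "ok3" };
var failures = new ConcurrentQueue<(string Item, Exception Error)>();
int succeeded = 0;

// Hypothetical stand-in for real async work; throws for the item "bad".
static async Task ProcessAsync(string item, CancellationToken ct)
{
    await Task.Delay(10, ct);
    if (item == "bad")
        throw new InvalidOperationException($"Cannot process '{item}'");
}

await Parallel.ForEachAsync(items, async (item, ct) =>
{
    try
    {
        await ProcessAsync(item, ct);
        Interlocked.Increment(ref succeeded);
    }
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        // Record the failure and let the loop move on to the next item.
        failures.Enqueue((item, ex));
    }
});

Console.WriteLine($"Succeeded: {succeeded}, failed: {failures.Count}");
// prints "Succeeded: 3, failed: 2"
```

The filter lets OperationCanceledException through so that cancellation still ends the loop instead of being logged as an ordinary failure.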

Parallel.ForEachAsync vs Task.WhenAll

csharp
// Task.WhenAll - all start immediately
var tasks = items.Select(item => ProcessAsync(item));
await Task.WhenAll(tasks); // 1000 concurrent if 1000 items!

// Parallel.ForEachAsync - controlled parallelism
var options = new ParallelOptions { MaxDegreeOfParallelism = 10 };
await Parallel.ForEachAsync(items, options, async (item, ct) =>
{
    await ProcessAsync(item, ct);
}); // Max 10 concurrent

Best Practices

Do Use When:

  • Processing many independent async operations
  • You need to limit concurrency
  • Items can be processed in any order

Avoid When:

  • Items have dependencies on each other
  • Order of completion matters
  • You need results returned in input order (Task.WhenAll preserves it; add a SemaphoreSlim to throttle)

Complete Example: Batch Download

csharp
async Task DownloadAllAsync(string[] urls, string outputDir)
{
    var options = new ParallelOptions
    {
        // Downloads are I/O-bound, so more workers than cores is reasonable
        MaxDegreeOfParallelism = Environment.ProcessorCount * 2
    };

    var completed = 0;
    var total = urls.Length;

    await Parallel.ForEachAsync(urls, options, async (url, ct) =>
    {
        try
        {
            var fileName = Path.GetFileName(new Uri(url).LocalPath);
            var outputPath = Path.Combine(outputDir, fileName);

            using var response = await httpClient.GetAsync(url, ct);
            response.EnsureSuccessStatusCode(); // don't save error pages as files
            using var stream = await response.Content.ReadAsStreamAsync(ct);
            await using var file = File.Create(outputPath);
            await stream.CopyToAsync(file, ct);

            var count = Interlocked.Increment(ref completed);
            Console.WriteLine($"[{count}/{total}] Downloaded: {fileName}");
        }
        catch (Exception ex)
        {
            // Handle failures per item so one bad URL doesn't stop the loop
            Console.WriteLine($"Failed to download {url}: {ex.Message}");
        }
    });

    Console.WriteLine($"Completed: {completed}/{total}");
}

Key Takeaways

  • Parallel.ForEachAsync simplifies parallel async processing
  • Use MaxDegreeOfParallelism to control concurrency
  • Built-in cancellation support via CancellationToken
  • Use ConcurrentBag or similar for collecting results
  • Awaiting the loop rethrows only the first exception; the full set is on the returned task's Exception (an AggregateException)
  • Cleaner than manual semaphore-based throttling
  • Available in .NET 6+