.NET 6 update: The implementations below are no longer relevant after the introduction of the Parallel.ForEachAsync API. They are useful only for projects that target versions of the .NET platform older than .NET 6.
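For comparison, here is a minimal sketch of how the same `ForEachAsync(source, action, dop)` call shape might be expressed on top of `Parallel.ForEachAsync` in .NET 6 or later. The names `Net6ForEachDemo`, `ForEachAsyncNet6` and `DoubleAllAsync` are hypothetical, and the exception-propagation behavior of this adapter is not necessarily identical to that of the Dataflow-based implementations below.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class Net6ForEachDemo
{
    // Hypothetical adapter: maps the Func<T, Task> + dop signature onto Parallel.ForEachAsync (.NET 6+)
    public static Task ForEachAsyncNet6<T>(this IEnumerable<T> source,
        Func<T, Task> action, int dop)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = dop };
        // The body also receives a CancellationToken; this adapter ignores it
        return Parallel.ForEachAsync(source, options,
            async (item, _) => await action(item).ConfigureAwait(false));
    }

    // Usage: double five numbers with at most two concurrent operations
    public static async Task<int[]> DoubleAllAsync()
    {
        var results = new ConcurrentBag<int>();
        await new[] { 1, 2, 3, 4, 5 }.ForEachAsyncNet6(async x =>
        {
            await Task.Delay(10); // stand-in for real asynchronous work
            results.Add(x * 2);
        }, dop: 2);
        return results.OrderBy(x => x).ToArray();
    }
}
```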
Here is a simple generic implementation of a ForEachAsync method, based on an ActionBlock from the TPL Dataflow library, which is now part of the .NET 5 platform:
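```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ForEachAsyncExtensions
{
    public static Task ForEachAsync<T>(this IEnumerable<T> source,
        Func<T, Task> action, int dop)
    {
        // Arguments validation omitted
        var block = new ActionBlock<T>(action,
            new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = dop });
        try
        {
            // Enumerate the whole source up front, queueing every item in the block
            foreach (var item in source) block.Post(item);
            block.Complete();
        }
        // Route enumeration errors into block.Completion instead of throwing synchronously
        catch (Exception ex) { ((IDataflowBlock)block).Fault(ex); }
        return block.Completion;
    }
}
```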
This solution eagerly enumerates the supplied IEnumerable and immediately posts all of its elements to the ActionBlock, where they stay buffered until they are processed. So it is not well suited for enumerables with a huge number of elements. Below is a more sophisticated approach that enumerates the source lazily and sends its elements to the ActionBlock one by one, only when the block has room for them:
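```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ForEachAsyncExtensions
{
    public static async Task ForEachAsync<T>(this IEnumerable<T> source,
        Func<T, Task> action, int dop)
    {
        // Arguments validation omitted
        // BoundedCapacity = dop: the block holds at most dop items, counting those being processed
        var block = new ActionBlock<T>(action, new ExecutionDataflowBlockOptions()
            { MaxDegreeOfParallelism = dop, BoundedCapacity = dop });
        try
        {
            // SendAsync waits while the block is full; it returns false if the
            // block declines the item (for example, because it has faulted)
            foreach (var item in source)
                if (!await block.SendAsync(item).ConfigureAwait(false)) break;
            block.Complete();
        }
        catch (Exception ex) { ((IDataflowBlock)block).Fault(ex); }
        try { await block.Completion.ConfigureAwait(false); }
        catch { block.Completion.Wait(); } // Propagate AggregateException
    }
}
```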
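The lazy behavior comes from `BoundedCapacity`: an `ActionBlock` counts the items it is currently processing against that bound, so `Post` rejects items while the block is full, and `SendAsync` returns a task that completes only when room becomes available. The small sketch below illustrates this with a capacity of 1; the class name `BoundedCapacityDemo` and the `TaskCompletionSource` gate are assumptions used only to control the timing.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedCapacityDemo
{
    public static async Task<(bool PostedFirst, bool PostedSecond, bool WaitedForRoom, bool SentLater)> RunAsync()
    {
        // Gate that keeps the first item "in progress" until it is released
        var gate = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        var block = new ActionBlock<int>(_ => gate.Task,
            new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

        bool postedFirst = block.Post(1);     // accepted: the block is empty
        bool postedSecond = block.Post(2);    // declined: item 1 still occupies the only slot
        Task<bool> send = block.SendAsync(2); // postponed until a slot frees up
        bool waitedForRoom = !send.IsCompleted;

        gate.SetResult(true);                 // let item 1 finish processing
        bool sentLater = await send;          // now item 2 is accepted
        block.Complete();
        await block.Completion;
        return (postedFirst, postedSecond, waitedForRoom, sentLater);
    }
}
```

In the second `ForEachAsync` the same mechanism means that the `foreach` loop pulls the next element from the source only after `SendAsync` has completed, so at most `dop` elements are held by the block at any time.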
These two methods behave differently in case of exceptions. The Task returned by the first¹ holds an AggregateException that contains the exceptions directly in its InnerExceptions property. The Task returned by the second holds an AggregateException that contains another AggregateException with the exceptions. Personally I find the behavior of the second method more convenient in practice, because awaiting it automatically eliminates a level of nesting, so I can simply catch (AggregateException aex) and handle the aex.InnerExceptions inside the catch block. With the first method I have to store the Task before awaiting it, because awaiting it rethrows only the first exception; storing it lets me access the task.Exception.InnerExceptions inside the catch block. For more info about propagating exceptions from async methods, look here or here.
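The difference can be reproduced without Dataflow. In this sketch, `Task.WhenAll` over two pre-faulted tasks stands in for a `ForEachAsync` call in which two actions failed (an assumption made only to get a deterministic task with multiple exceptions), and `WrappedAsync` applies the same `try { await } catch { Wait(); }` pattern as the second implementation. All names are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

public static class ExceptionNestingDemo
{
    // Stand-in for a failed operation: a task holding two exceptions
    static Task TwoFailures() => Task.WhenAll(
        Task.FromException(new InvalidOperationException("A")),
        Task.FromException(new ArgumentException("B")));

    // First style: store the Task, because await rethrows only the first exception
    public static async Task<int> StoreTaskThenAwaitAsync()
    {
        Task task = TwoFailures();
        try { await task; }
        catch (InvalidOperationException)
        {
            return task.Exception.InnerExceptions.Count; // all errors are still here
        }
        return 0;
    }

    // Second style: Wait() throws an AggregateException, which becomes the async method's error
    static async Task WrappedAsync()
    {
        Task task = TwoFailures();
        try { await task.ConfigureAwait(false); }
        catch { task.Wait(); } // Propagate AggregateException
    }

    public static async Task<int> CatchAggregateAsync()
    {
        try { await WrappedAsync(); }
        catch (AggregateException aex)
        {
            return aex.InnerExceptions.Count; // await removed one level of nesting
        }
        return 0;
    }
}
```

Both methods report two exceptions, but only the second lets the caller get them with a plain `catch (AggregateException aex)` around the `await`.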
Both implementations gracefully handle any errors that may occur during the enumeration of the source: such an error is propagated through the returned Task instead of being thrown synchronously. The ForEachAsync method does not complete before all pending operations have completed. No tasks are left behind unobserved (in fire-and-forget fashion).
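Both implementations achieve this by catching the exception thrown by the enumerator and passing it to the block's `Fault` method, so that it surfaces through `block.Completion`. Here is a minimal sketch of that mechanism, where a manually created exception stands in for a failing enumerator; the class name `FaultRoutingDemo` is hypothetical.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FaultRoutingDemo
{
    public static async Task<string> RunAsync()
    {
        var block = new ActionBlock<int>(async x => await Task.Delay(50));
        block.Post(1);

        // Simulate an exception thrown while enumerating the source.
        // ActionBlock implements Fault explicitly, hence the cast.
        ((IDataflowBlock)block).Fault(new InvalidOperationException("Enumeration failed"));

        // The error is observed by awaiting the Completion, like the task returned by ForEachAsync
        try { await block.Completion; }
        catch (InvalidOperationException ex) { return ex.Message; }
        return "completed";
    }
}
```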
¹ The first implementation elides async and await: it returns block.Completion directly instead of awaiting it.