-1

Below is sample console app and output is

enter image description here

Output is different each time and is fine but it needs to complete all tasks before I print result. It seems that Parallel.ForEachAsync is not waiting for all tasks to be completed. Am I missing anything here ?

internal class Program
{
    private async static Task Main(string[] args)
    {
        Stopwatch sw = new Stopwatch();
        sw.Start();
        await TestParallel();
        sw.Stop();
        Console.WriteLine("Elapsed={0}", sw.Elapsed);
        Console.ReadLine();
    }

    private static async Task TestParallel()
    {
        var tests = new List<int>() { 1, 2, 3, 4, 5, 6 };
        var options = new ParallelOptions { MaxDegreeOfParallelism = 5,
            CancellationToken = CancellationToken.None };
        var responses = new List<string>();
        await Parallel.ForEachAsync(tests, options, async (testno, cancellationToken) =>
        {
            var response = await TestTask(testno);
            responses.Add(response);
        });
        foreach (var response in responses)
        {
            Console.WriteLine(response);
        }
    }
    private static Task<string> TestTask(int testno)
    {
        System.Threading.Thread.Sleep(1000);
        return Task.FromResult($"Test{testno}");
    }
}
Theodor Zoulias
  • 24,585
  • 5
  • 40
  • 69
  • in terms of 'Output is different each time', that is expected behaviour. When you go parallel, you're not doing a series of tasks in a certain order, you are spinning them all up effectively at the same time and they'll each finish when they finish. Could be different every time. – Jonathan Jan 13 '22 at 19:42
  • 1
    In terms of 'it seems that Parallel.ForEachAsync is not waiting for all tasks to be completed', what makes you say this? – Jonathan Jan 13 '22 at 19:43
  • updated my question - Output is different each time and is fine but it needs to complete all tasks before I print result. It seems that Parallel.ForEachAsync is not waiting for all tasks to be completed. Am I missing anything here ? – user3838575 Jan 13 '22 at 19:45
  • 1
    is not waiting for all tasks to be completed because I am expecting it print all results (Order does not matter)Test1 Test2 Test3 Test4 Test5 Test6 – user3838575 Jan 13 '22 at 19:47
  • 1
    `List` is not thread safe. Maybe use `ConcurrentBag` and see if that helps. – John Wu Jan 13 '22 at 19:48
  • 1
    How come it waited 2 seconds then? 6 one second tasks with a MaxDOP of 5 would take 2 seconds for all to complete, no? – Caius Jard Jan 13 '22 at 19:50
  • 1
    Thank you @JohnWu, I changed var responses = new List(); to var responses = new ConcurrentBag(); and is working as per my expectation – user3838575 Jan 13 '22 at 19:51

2 Answers2

1

Answer for versions before .NET 6.


I think your example is a bit confusing. That's because you use an async callback. Mostly async is used for IO purposes.

Either go for: (this would be CPU-bound, doing some heavy calculations)

var responses = new List<string>();
var tests = new List<int>() { 1, 2, 3, 4 ,5,6};

Parallel.ForEach(tests, options, (testno) =>
{
    // no async here...
    var response = TestTask(testno);
    // lock the shared resource.
    lock(responses)
        responses.Add(response);
});

foreach (var response in responses)
{
    Console.WriteLine(response);
}

private static string TestTask(int testno)
{
    // calculations done here
    System.Threading.Thread.Sleep(1000);
    return $"Test{testno}";
}

Or go for: (this is IO-bound, for example getting content from external sources)

var tests = new List<int>() { 1, 2, 3, 4 ,5,6};

var tasks = new List<Task<string>>();

// just add the tasks to a list, so you can await them later.
// the first part (till the first await) will be completed synchronous. 
// If any async/await is used, the Task.WhenAll will wait for it. 
// Multiple tasks can be running simultaneously.
foreach(var t in tests)
    tasks.Add(TestTask(t));

await Task.WhenAll(tasks);

foreach (var task in tasks)
{
    // the current thread won't be blocked by calling the .Result here
    // All tasks are already completed.
    Console.WriteLine(task.Result);
}

private static async Task<string> TestTask(int testno)
{
    // Getting information from external resources.
    await Task.Delay(1000);
    return $"Test{testno}";
}

(there might be some typo's, haven't written in VS)

Jeroen van Langen
  • 19,966
  • 3
  • 36
  • 54
  • This code doesn't compile. The [`Parallel.ForEachAsync`](https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.parallel.foreachasync) takes an asynchronous delegate (`Func`). If you are going to do synchronous work, you should probably use the `Parallel.ForEach` instead. – Theodor Zoulias Jan 13 '22 at 23:45
  • _(there might be some typo's, haven't written in VS)_ I'll change it. – Jeroen van Langen Jan 14 '22 at 08:01
  • 1
    Hmm, after some reading the `Parallel.ForEachAsync` is also used for IO-Bound. That means that my answer mainly applies to the old framework.. I see that `Parallel.ForEachAsync` is .NET 6 _(working in .NET 5 atm)_ – Jeroen van Langen Jan 14 '22 at 08:08
0

Answer is below - changed line var responses = new ConcurrentBag();

internal class Program
{
    private async static Task Main(string[] args)
    {
        Stopwatch sw = new Stopwatch();
        sw.Start();
        await TestParallel();
        sw.Stop();
        Console.WriteLine("Elapsed={0}", sw.Elapsed);
        Console.ReadLine();
    }

    private static async Task TestParallel()
    {
        var tests = new List<int>() { 1, 2, 3, 4 ,5,6};
        var options = new ParallelOptions { MaxDegreeOfParallelism = 5, CancellationToken = CancellationToken.None };
        var responses = new ConcurrentBag<string>();
        await Parallel.ForEachAsync(tests, options, async (testno, cancellationToken) =>
        {
            var response = await TestTask(testno);
            responses.Add(response);
        });
        foreach (var response in responses)
        {
            Console.WriteLine(response);
        }
    }
    private static Task<string> TestTask(int testno)
    {
        System.Threading.Thread.Sleep(1000);
        return Task.FromResult($"Test{testno}");
    }
}
  • Nice answer. But instead of using the [highly specialized](https://stackoverflow.com/questions/15400133/when-to-use-blockingcollection-and-when-concurrentbag-instead-of-listt/64823123#64823123) `ConcurrentBag` class, I would strongly suggest to use the `ConcurrentQueue` class instead, which preserves the order of the added items. Even when order is not important, why choose a shuffling collection, when an order-preserving collection exists? If nothing else, it makes debugging more difficult. – Theodor Zoulias Jan 13 '22 at 23:50
  • 2
    @Theodor no, this is a terrible answer, because it doesn't even answer the question. I can't understand why it's upvoted; pretty sure it came along after John Wu's comment and doesn't even give as much info as that comment.. If it explained why using List in this context is a bad idea and is causing the results shown I could get on with it, but this looks like it's just taking someone else's inspiration (plagiarism) and posting it as a "try this" (no learning opportunity for the OP, "what do I do to make it work?" wasn't the question). Poor show all round – Caius Jard Jan 14 '22 at 01:41
  • 1
    @CaiusJard you are right. I was just trying to be nice. :-) This answer could be improved quite a lot. – Theodor Zoulias Jan 14 '22 at 02:06