I've created a producer/consumer program that uses N consumer threads (currently only 1).

void Main()
{
    var pcQ = new PCQueue(1); // Maximum concurrency of 1

    foreach (int item in Enumerable.Range(1, 5))
    {
        pcQ.Enqueue(async () =>
        {
            await Task.Delay(1000); //<-------simulation of work
            Console.WriteLine(item);
        });
    }

}

public class PCQueue
{
    private BlockingCollection<Task> _taskQ = new BlockingCollection<Task>();
    public Task Enqueue(Action action, CancellationToken cancelToken = default(CancellationToken))
    {
        var task = new Task(action, cancelToken);
        _taskQ.Add(task);
        return task;
    }
    public PCQueue(int workerCount)
    {

        for (int i = 0; i < workerCount; i++)
            Task.Run(Consume);
    }

    async Task Consume()
    {
        foreach (var task in _taskQ.GetConsumingEnumerable())
            try
            {
                task.RunSynchronously();
            }
            catch (InvalidOperationException)
            {
            }
    }

}

The output is:

[screenshot of the console output]

The problem is that after a second, I see all results at once.

A new item is being fetched from the blocking collection before the previous item has completed.

This is happening because of this line (without awaiting):

task.RunSynchronously();

Question: How can I await this task so that it completes before the next one is fetched? I'm expecting to see a new line every second.

Royi Namir
  • I don't think your problem is what you think it is. Your `Enqueue` method takes an `Action` (an `Action` has no return type), meaning that any `async` lambda you pass to it will have an `async void` signature instead of returning an awaitable `Task` handle. As far as I know, no matter what you do after that, it will always have the fire-and-forget behavior associated with `async void`. So, to work around it, instead of asking for an `Action` you might be able to ask for a `Func<Task>`, store those, and then `await` them when they're up. –  Apr 11 '21 at 12:00
  • I don't understand. Why do you need multiple consumers if you do operations in sequence? – tymtam Apr 11 '21 at 12:01
  • @tymtam currently there is just one thread that "fetches" items, but it can be changed to n consumers. – Royi Namir Apr 11 '21 at 12:02
  • Your image doesn't even compile, but you're correct that you're making the same kind of mistake as with asking for an `Action`. Change `await task.RunSynchronously();` to `await task;` (`task.RunSynchronously` returns void and therefore doesn't provide an awaitable `Task` handle) –  Apr 11 '21 at 12:07
  • You can use `task.Start(); await task;`, but that still won't ensure that tasks are started one after another. – tymtam Apr 11 '21 at 12:11
  • As a side note the `PCQueue` class looks to me like an attempt to reinvent the [`ActionBlock`](https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.actionblock-1) from the [TPL Dataflow](https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/dataflow-task-parallel-library) library. – Theodor Zoulias Apr 11 '21 at 13:19
  • @TheodorZoulias it's from Joe http://www.albahari.com/threading/part5.aspx#_BlockingCollectionT – Royi Namir Apr 11 '21 at 13:20
  • I see. Joseph Albahari is one of my favorite authors, but his book is now slightly outdated. The `BlockingCollection` class is a powerful tool, but not as powerful as the TPL Dataflow blocks. You can see [here](https://stackoverflow.com/questions/21424084/task-sequencing-and-re-entracy/62882637#62882637) an example of using an `ActionBlock` as the underlying engine of a class similar to the `PCQueue`, that schedules asynchronous work. Features like cancellation and max degree of parallelism can be delegated transparently to the underlying `ActionBlock`. – Theodor Zoulias Apr 11 '21 at 13:34
  • Small hint https://devblogs.microsoft.com/pfxteam/await-synchronizationcontext-and-console-apps/ – aepot Apr 11 '21 at 14:27

1 Answer

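You can await the task with task.Start(); await task; but that alone does not solve the problem. Because Enqueue takes an Action, the async lambda becomes async void, so you would only await the outer task that wraps the actual job, and that outer task completes as soon as the lambda reaches its first await.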
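To make the outer/inner distinction concrete, here is a minimal sketch (the class and method names are hypothetical, introduced only for illustration). It wraps an async lambda in a plain Task, as the question's Enqueue does, and checks whether the simulated work has finished by the time the outer task has run:

```csharp
using System;
using System.Threading.Tasks;

public static class OuterTaskDemo
{
    // Returns true if the simulated work had finished by the time the outer task completed.
    public static bool WorkFinishedBeforeOuterTask()
    {
        var workFinished = false;

        // An async lambda assigned to Action is compiled as "async void".
        Action action = async () =>
        {
            await Task.Delay(500); // simulated work
            workFinished = true;
        };

        var outer = new Task(action);
        outer.RunSynchronously(); // returns when the lambda reaches its first await

        return workFinished;
    }

    public static void Main()
    {
        // Prints "False": the outer task is done, the work inside it is not.
        Console.WriteLine(WorkFinishedBeforeOuterTask());
    }
}
```

This is why awaiting the outer task is not enough: nothing in it represents the continuation that runs after the first await.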

Once you change the collection to hold Task<Task> and pass a Func<Task>, the inner job can be awaited:

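private BlockingCollection<Task<Task>> _taskQ = new BlockingCollection<Task<Task>>();

public Task Enqueue(Func<Task> action, CancellationToken cancelToken = default(CancellationToken))
{
    // The outer task's result is the inner Task returned by the async delegate.
    var task = new Task<Task>(action, cancelToken);
    _taskQ.Add(task);
    return task;
}

(...)

task.Start();
var job = await task; // outer task: completes once the delegate has returned its Task
await job;            // inner task: completes when the asynchronous work has finished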

With a single consumer, the jobs then execute sequentially.
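Putting those fragments together, a complete program might look like the sketch below. It is an illustration under stated assumptions rather than the definitive form of the fix: returning task.Unwrap() from Enqueue, the broad catch in the consumer, and the SequentialDemo class are additions that are not part of the snippets above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PCQueue
{
    private readonly BlockingCollection<Task<Task>> _taskQ = new BlockingCollection<Task<Task>>();

    public PCQueue(int workerCount)
    {
        for (int i = 0; i < workerCount; i++)
            Task.Run(() => Consume());
    }

    public Task Enqueue(Func<Task> action, CancellationToken cancelToken = default(CancellationToken))
    {
        var task = new Task<Task>(action, cancelToken);
        _taskQ.Add(task);
        // Assumption: give the caller a task that also covers the inner job.
        return task.Unwrap();
    }

    private async Task Consume()
    {
        foreach (var task in _taskQ.GetConsumingEnumerable())
        {
            try
            {
                task.Start();         // throws InvalidOperationException if already canceled
                var job = await task; // outer task: yields the inner Task
                await job;            // inner task: the actual asynchronous work
            }
            catch (Exception)
            {
                // Failures surface through the task returned by Enqueue; keep consuming.
            }
        }
    }
}

public static class SequentialDemo
{
    // Runs three jobs through a single consumer and returns the order of events.
    public static async Task<List<string>> RunDemoAsync()
    {
        var log = new List<string>();
        var pcQ = new PCQueue(1); // maximum concurrency of 1
        var pending = new List<Task>();

        foreach (int item in Enumerable.Range(1, 3))
        {
            pending.Add(pcQ.Enqueue(async () =>
            {
                lock (log) log.Add($"start {item}");
                await Task.Delay(100); // simulated work
                lock (log) log.Add($"end {item}");
            }));
        }

        await Task.WhenAll(pending);
        return log;
    }

    public static async Task Main()
    {
        foreach (var line in await RunDemoAsync())
            Console.WriteLine(line);
    }
}
```

With one consumer, each "start" line is followed by its own "end" line before the next job begins.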

Going Func<Task>

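We can simplify the code if the queue stores the delegates themselves instead of tasks (here as Func<CancellationToken, Task>, so that each job receives a cancellation token) and the consumer awaits them directly: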

static void Main()
{
    var pcQ = new PCQueue(1); // Maximum concurrency of 1

    foreach (int item in Enumerable.Range(1, 5))
    {
        pcQ.Enqueue(async (CancellationToken token) =>
        {
            Console.WriteLine($"Starting {item}");
            await Task.Delay(100, token); //<-------simulation of work
            Console.WriteLine($"Ending {item}");
        });
    }
    Console.ReadLine();

}

public class PCQueue
{
    private BlockingCollection<Func<CancellationToken, Task>> _taskQ = new BlockingCollection<Func<CancellationToken, Task>>();
    public void Enqueue(Func<CancellationToken, Task> action) =>  _taskQ.Add(action);

    public PCQueue(int workerCount)
    {
        for (int i = 0; i < workerCount; i++)
            Task.Run(Consume);
    }

    async Task Consume()
    {
        var cancellationToken = ...
        foreach (var f in _taskQ.GetConsumingEnumerable())
            await f(cancellationToken); // wait for this job to finish before taking the next one
    }
}
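The snippet above leaves the source of the cancellation token open. One possible way to fill that in, shown purely as a sketch, is to let the queue own a CancellationTokenSource. The _cts field, the Cancel and CompleteAsync methods, and the TokenDemo class are hypothetical additions, not part of the code above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public class PCQueue
{
    private readonly BlockingCollection<Func<CancellationToken, Task>> _taskQ =
        new BlockingCollection<Func<CancellationToken, Task>>();

    // Assumption: the queue owns the token source that is handed to every job.
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly Task[] _workers;

    public PCQueue(int workerCount)
    {
        _workers = Enumerable.Range(0, workerCount)
                             .Select(_ => Task.Run(() => Consume()))
                             .ToArray();
    }

    public void Enqueue(Func<CancellationToken, Task> action) => _taskQ.Add(action);

    // Hypothetical helper: signal cancellation to running and queued jobs.
    public void Cancel() => _cts.Cancel();

    // Hypothetical helper: stop accepting work and wait until the queue has drained.
    public Task CompleteAsync()
    {
        _taskQ.CompleteAdding();
        return Task.WhenAll(_workers);
    }

    private async Task Consume()
    {
        foreach (var f in _taskQ.GetConsumingEnumerable())
        {
            try
            {
                await f(_cts.Token); // finish this job before taking the next one
            }
            catch (OperationCanceledException)
            {
                // A canceled job should not stop the consumer loop.
            }
        }
    }
}

public static class TokenDemo
{
    // Runs three jobs through a single consumer and returns the order of events.
    public static async Task<List<string>> RunDemoAsync()
    {
        var log = new List<string>();
        var pcQ = new PCQueue(1); // maximum concurrency of 1

        foreach (int item in Enumerable.Range(1, 3))
        {
            pcQ.Enqueue(async token =>
            {
                lock (log) log.Add($"Starting {item}");
                await Task.Delay(100, token); // simulated work
                lock (log) log.Add($"Ending {item}");
            });
        }

        await pcQ.CompleteAsync();
        return log;
    }

    public static async Task Main()
    {
        foreach (var line in await RunDemoAsync())
            Console.WriteLine(line);
    }
}
```

Awaiting CompleteAsync replaces the Console.ReadLine() call as the way to keep the process alive until the queued jobs have run. Exceptions other than cancellation would still end the consumer loop in this sketch.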
tymtam