
I am using Channel from System.Threading.Channels and want to read items in batches of 5. I have a method like the one below:

public class Batcher
{
    private readonly Channel<MeasurementViewModel> _channel;
    public Batcher()
    {
        _channel = Channel.CreateUnbounded<MeasurementViewModel>();
    }
    public async Task<MeasurementViewModel[]> ReadBatchAsync(int batchSize, CancellationToken stoppingToken)
    {
        var result = new MeasurementViewModel[batchSize];

        for (var i = 0; i < batchSize; i++)
        {
            result[i] = await _channel.Reader.ReadAsync(stoppingToken);
        }

        return result;
    }
}

And in an ASP.NET Core background service I am using it like this:

public class WriterService : BackgroundService
{
    private readonly Batcher _batcher;
    public WriterService(Batcher batcher)
    {
        _batcher = batcher;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var batchOfItems = await _batcher.ReadBatchAsync(5, stoppingToken);

            var range = string.Join(',', batchOfItems.Select(item => item.Value));

            var x = range;
        }
    }
}

This works: whenever there are 5 items in the channel, I get the range.

The question is: when only 2 items are left in the channel and no new items have arrived for the last 10 minutes, how can I read those remaining 2 items?

user584018
  • An alternative solution could be to use the [`ChannelReader.ReadAllAsync`](https://docs.microsoft.com/en-us/dotnet/api/system.threading.channels.channelreader-1.readallasync) method, in combination with a [`Buffer`](https://stackoverflow.com/questions/67661709/how-to-batch-an-iasyncenumerablet-enforcing-a-maximum-interval-policy-between "How to batch an IAsyncEnumerable, enforcing a maximum interval policy between consecutive batches?") operator that has a `TimeSpan` argument. – Theodor Zoulias Nov 16 '21 at 09:48
  • Thanks @TheodorZoulias, can you share any example code? – user584018 Nov 18 '21 at 05:15
  • Inside the `ExecuteAsync` you could do this loop: `await foreach (var batchOfItems in _channel.Reader.ReadAllAsync().Buffer(5, TimeSpan.FromMinutes(10))) //...` – Theodor Zoulias Nov 18 '21 at 07:16
  • Thanks, much appreciated! – user584018 Nov 18 '21 at 11:49

1 Answer


You could create a linked CancellationTokenSource, so that you can watch simultaneously for both an external cancellation request and an internally induced timeout. Below is an example of this technique, implemented as a ReadBatchAsync extension method for the ChannelReader<T> class:

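using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelReaderExtensions
{
    public static async ValueTask<T[]> ReadBatchAsync<T>(
        this ChannelReader<T> channelReader,
        int batchSize, TimeSpan timeout, CancellationToken cancellationToken = default)
    {
        // Arguments validation omitted
        var items = new List<T>(batchSize);
        using (var linkedCTS
            = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken))
        {
            // The timeout is counted from the moment the method is called
            linkedCTS.CancelAfter(timeout);
            while (true)
            {
                // Wait for the first item without a timeout; once the batch is
                // non-empty, the timeout (or external cancellation) ends the wait
                var token = items.Count == 0 ? cancellationToken : linkedCTS.Token;
                T item;
                try
                {
                    item = await channelReader.ReadAsync(token).ConfigureAwait(false);
                }
                catch (OperationCanceledException)
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    break; // The cancellation was induced by timeout (ignore it)
                }
                catch (ChannelClosedException)
                {
                    if (items.Count == 0) throw; // Completed and empty
                    break; // Completed: return the partial batch
                }
                items.Add(item);
                if (items.Count >= batchSize) break;
            }
        }
        return items.ToArray();
    }
}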

This method produces a batch as soon as the specified timeout has elapsed, or sooner if batchSize has been reached, provided that the batch contains at least one item. The timeout is measured from the moment the method is called. If it elapses before any item has been received, the method keeps waiting and returns a single-item batch as soon as the first item arrives.
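The method tells a timeout apart from a caller's cancellation with a single check: after catching OperationCanceledException, if the caller's token is not canceled, the linked source's timer must have fired. That check can be isolated in a small sketch. `TimeoutDemo`, `TryReadOneAsync` and the string results are hypothetical names chosen only for illustration:

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class TimeoutDemo
{
    // Hypothetical helper: waits for one item, giving up after `timeout`.
    // Returns "item" or "timeout"; an external cancellation is rethrown.
    public static async Task<string> TryReadOneAsync(
        ChannelReader<int> reader, TimeSpan timeout, CancellationToken cancellationToken)
    {
        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        linkedCts.CancelAfter(timeout); // internal timeout, independent of the caller
        try
        {
            await reader.ReadAsync(linkedCts.Token);
            return "item";
        }
        catch (OperationCanceledException) when (!cancellationToken.IsCancellationRequested)
        {
            // Only the linked source was canceled, so this was the timeout
            return "timeout";
        }
    }
}
```

The exception filter plays the same role as the `ThrowIfCancellationRequested` call in the answer's catch block: external cancellation escapes, while a timeout is swallowed.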

If the channel has been completed by calling the channel.Writer.Complete() method and it contains no more items, the ReadBatchAsync method propagates the same ChannelClosedException that is thrown by the native ReadAsync method. If the channel is completed while a partial batch has already been collected, that partial batch is returned.
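The native behavior this relies on is that completing the writer does not discard buffered items: ReadAsync keeps returning them and throws ChannelClosedException only once the channel is both completed and empty. A minimal sketch of that behavior (the `CompletionDemo` class is hypothetical):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CompletionDemo
{
    // Writes two items, completes the writer, then drains the channel with
    // the native ReadAsync. Returns the items read and whether the final
    // ReadAsync threw ChannelClosedException.
    public static async Task<(List<int> Items, bool Closed)> RunAsync()
    {
        var channel = Channel.CreateUnbounded<int>();
        channel.Writer.TryWrite(1);
        channel.Writer.TryWrite(2);
        channel.Writer.Complete(); // no more writes; buffered items stay readable

        var items = new List<int>();
        try
        {
            while (true)
                items.Add(await channel.Reader.ReadAsync());
        }
        catch (ChannelClosedException)
        {
            // Thrown only after the buffered items have been consumed
            return (items, true);
        }
    }
}
```

Here `RunAsync` returns the items 1 and 2, and `Closed` is true.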

If the external CancellationToken is canceled, the cancellation is propagated by throwing an OperationCanceledException. Any items that have already been extracted internally from the ChannelReader<T> at that point are lost, which makes cancellation a destructive operation. It is advisable to discard the whole Channel<T> afterwards.

Usage example:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    while (true)
    {
        MeasurementViewModel[] batch;
        try
        {
            batch = await _channel.Reader.ReadBatchAsync(
                5, TimeSpan.FromMinutes(10), stoppingToken);
        }
        catch (OperationCanceledException) { return; }
        catch (ChannelClosedException) { break; }

        Console.WriteLine(String.Join(',', batch.Select(item => item.Value)));
    }
    await _channel.Reader.Completion; // Propagate possible failure
}

Warning: Currently (.NET 6) the ChannelReader<T>.ReadAsync method is susceptible to leaking memory, under conditions that are relevant to this question and this answer. Specifically when the CancellationToken is canceled, the associated AsyncOperation remains attached in the channel's internal data structures, and it's not released until an item is written in the channel. So awaiting the ChannelReader<T>.ReadAsync repeatedly in a loop with a timer-based CancellationToken is not advisable. References:

For an alternative approach you can look here.
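One possible way to avoid canceling ReadAsync on every timeout is to never cancel a channel operation at all: keep a single non-cancelable WaitToReadAsync task alive across calls and race it against Task.Delay. The sketch below is not from the answer; `TimedBatchReader<T>` is a hypothetical class, it assumes a single consumer, and, unlike the extension method above, it starts the timeout when the first item of the batch arrives. An empty array signals that the channel has completed:

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical helper (not part of System.Threading.Channels); single consumer only.
public sealed class TimedBatchReader<T>
{
    private readonly ChannelReader<T> _reader;
    private Task<bool>? _pendingWait; // survives timeouts and is reused by the next call

    public TimedBatchReader(ChannelReader<T> reader) => _reader = reader;

    public async Task<T[]> ReadBatchAsync(
        int batchSize, TimeSpan timeout, CancellationToken cancellationToken = default)
    {
        var items = new List<T>(batchSize);
        using var delayCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
        // Completes only if the caller cancels (or when delayCts is canceled below)
        Task noTimeout = Task.Delay(Timeout.Infinite, delayCts.Token);
        Task? timeoutTask = null; // started when the first item is received
        try
        {
            while (items.Count < batchSize)
            {
                if (_reader.TryRead(out var item))
                {
                    items.Add(item);
                    timeoutTask ??= Task.Delay(timeout, delayCts.Token);
                    continue;
                }
                // The wait is never canceled, so no canceled operation is left behind
                _pendingWait ??= _reader.WaitToReadAsync().AsTask();
                Task winner = await Task.WhenAny(_pendingWait, timeoutTask ?? noTimeout)
                    .ConfigureAwait(false);
                if (winner != _pendingWait)
                {
                    cancellationToken.ThrowIfCancellationRequested();
                    break; // timeout elapsed: return the partial batch
                }
                bool canRead = await _pendingWait.ConfigureAwait(false); // throws if faulted
                _pendingWait = null;
                if (!canRead) break; // channel completed and drained
            }
        }
        finally
        {
            delayCts.Cancel(); // stop any pending Task.Delay timers
        }
        return items.ToArray();
    }
}
```

In the question's WriterService, one TimedBatchReader<MeasurementViewModel> could be created around the channel's reader and called in a loop until it returns an empty array. External cancellation is still destructive for the items of the current batch, as with the extension method.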

Theodor Zoulias