Here are two approaches to this problem. The first one is flawed, but I am posting it anyway because of its extreme simplicity. A Buffer operator with TimeSpan and count parameters already exists in the System.Reactive package, and converters between async-enumerable and observable sequences exist in the System.Linq.Async package. So it's just a matter of chaining together three already available operators:
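using System;
using System.Collections.Generic;
using System.Linq;          // ToObservable / ToAsyncEnumerable (System.Linq.Async)
using System.Reactive.Linq; // Buffer (System.Reactive)

public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count)
{
    return source.ToObservable().Buffer(timeSpan, count).ToAsyncEnumerable();
}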
Unfortunately, this neat approach is flawed because of the side effects of shifting from the pull model to the push model and back to the pull model. What happens is that the intermediate observable sequence, when subscribed, starts pulling the source IAsyncEnumerable aggressively, regardless of how the resulting IAsyncEnumerable is pulled. So instead of the consumer of the resulting sequence driving the enumeration, the enumeration happens silently in the background at the maximum speed allowed by the source sequence, and the produced messages are buffered in an internal queue. As a result, not only can hidden latency be imposed on the processing of the messages, but memory consumption can also skyrocket out of control.
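To make this effect visible without referencing the Rx packages, the sketch below models the round trip with a System.Threading.Channels queue instead of the real ToObservable/ToAsyncEnumerable operators. This is a simplified, hypothetical stand-in, not their actual implementation, and the class and member names are made up for illustration. It reads a single element and then checks how many elements the source has produced 200 ms later.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class PushBridgeDemo
{
    // Counts how many elements have been pulled from the source so far.
    public static int Pulled;

    // A fast source that increments the counter for every element it produces.
    static async IAsyncEnumerable<int> Source(int length)
    {
        for (int i = 0; i < length; i++)
        {
            await Task.Yield();
            Interlocked.Increment(ref Pulled);
            yield return i;
        }
    }

    // Hypothetical stand-in for the ToObservable().ToAsyncEnumerable() round trip:
    // a background loop drains the source into an unbounded queue, no matter
    // how fast (or whether) the consumer reads from it.
    static IAsyncEnumerable<int> ViaPushBridge(IAsyncEnumerable<int> source)
    {
        var channel = Channel.CreateUnbounded<int>();
        _ = Task.Run(async () =>
        {
            await foreach (var item in source)
                await channel.Writer.WriteAsync(item);
            channel.Writer.Complete();
        });
        return channel.Reader.ReadAllAsync();
    }

    // Reads a single element, waits a bit, and reports how many elements
    // the source has produced in the meantime.
    public static async Task<int> PulledAfterReadingOne(bool viaBridge, int length = 100)
    {
        Volatile.Write(ref Pulled, 0);
        IAsyncEnumerable<int> source = Source(length);
        IAsyncEnumerable<int> sequence = viaBridge ? ViaPushBridge(source) : source;
        await using var enumerator = sequence.GetAsyncEnumerator();
        await enumerator.MoveNextAsync();
        await Task.Delay(200);
        return Volatile.Read(ref Pulled);
    }
}
```

PulledAfterReadingOne(false) should report 1, because a pull-based consumer only advances the source when asked. PulledAfterReadingOne(true) typically reports far more, because the background loop keeps draining the source into the queue while the consumer sits idle.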
The second is a hands-on approach that uses the Task.Delay method as a timer, and the Task.WhenAny method to coordinate the timer and enumeration tasks. Its behavior is similar to the Rx-based approach, except that the enumeration of the source sequence is driven by the consumer of the resulting sequence, as one would expect.
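The heart of this coordination is a race between a pending MoveNextAsync call and a timer. The minimal sketch below isolates that race; SlowSource, FirstToComplete and the delay values are hypothetical. The losing timer is canceled and the MoveNextAsync task is still awaited before the enumerator is disposed, so nothing is left running in the background.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class WhenAnyRaceDemo
{
    // Hypothetical slow source: each element takes 'itemDelayMs' to arrive.
    static async IAsyncEnumerable<int> SlowSource(int length, int itemDelayMs)
    {
        for (int i = 0; i < length; i++)
        {
            await Task.Delay(itemDelayMs);
            yield return i;
        }
    }

    // Races a pending MoveNextAsync against a timer and reports which one won.
    public static async Task<string> FirstToComplete(int itemDelayMs, int timerMs)
    {
        await using var enumerator = SlowSource(1, itemDelayMs).GetAsyncEnumerator();
        using var timerCts = new CancellationTokenSource();
        Task<bool> moveNext = enumerator.MoveNextAsync().AsTask();
        Task delay = Task.Delay(timerMs, timerCts.Token);

        Task winner = await Task.WhenAny(moveNext, delay);
        string result = winner == delay ? "timer" : "element";

        // Stop the timer if it is still pending, so it doesn't linger.
        timerCts.Cancel();
        // Await the MoveNextAsync before disposal, instead of abandoning it.
        await moveNext;
        return result;
    }
}
```

With a 500 ms element delay and a 50 ms timer, FirstToComplete(500, 50) should return "timer"; with FirstToComplete(10, 1000) the element should win.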
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Splits the elements of a sequence into chunks that are sent out when either
/// they're full, or a given amount of time has elapsed.
/// </summary>
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, TimeSpan timeSpan, int count)
{
    if (source == null) throw new ArgumentNullException(nameof(source));
    if (timeSpan < TimeSpan.Zero) throw new ArgumentOutOfRangeException(nameof(timeSpan));
    if (count < 1) throw new ArgumentOutOfRangeException(nameof(count));
    return Implementation();

    async IAsyncEnumerable<IList<TSource>> Implementation(
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var timerCts = new CancellationTokenSource();
        var delayTask = Task.Delay(timeSpan, timerCts.Token);
        (ValueTask<bool> ValueTask, Task<bool> Task) moveNext = default;
        using var linkedCts = CancellationTokenSource
            .CreateLinkedTokenSource(cancellationToken);
        var enumerator = source.GetAsyncEnumerator(linkedCts.Token);
        try
        {
            moveNext = (enumerator.MoveNextAsync(), null);
            var buffer = new List<TSource>(count);
            ExceptionDispatchInfo error = null;
            while (true)
            {
                Task completedTask = null;
                if (!moveNext.ValueTask.IsCompleted)
                {
                    // Preserve the ValueTask, if it's not preserved already.
                    if (moveNext.Task == null)
                    {
                        var preserved = moveNext.ValueTask.AsTask();
                        moveNext = (new ValueTask<bool>(preserved), preserved);
                    }
                    completedTask = await Task.WhenAny(moveNext.Task, delayTask)
                        .ConfigureAwait(false);
                }
                if (completedTask == delayTask)
                {
                    Debug.Assert(delayTask.IsCompleted);
                    yield return buffer.ToArray(); // It's OK if the buffer is empty.
                    buffer.Clear();
                    delayTask = Task.Delay(timeSpan, timerCts.Token);
                }
                else
                {
                    Debug.Assert(moveNext.ValueTask.IsCompleted);
                    // Await a copy, to prevent a second await in the finally block.
                    var moveNextCopy = moveNext.ValueTask;
                    moveNext = default;
                    bool moved;
                    try { moved = await moveNextCopy.ConfigureAwait(false); }
                    catch (Exception ex)
                    {
                        error = ExceptionDispatchInfo.Capture(ex); break;
                    }
                    if (!moved) break;
                    buffer.Add(enumerator.Current);
                    if (buffer.Count == count)
                    {
                        timerCts.Cancel(); timerCts.Dispose();
                        timerCts = new CancellationTokenSource();
                        yield return buffer.ToArray();
                        buffer.Clear();
                        delayTask = Task.Delay(timeSpan, timerCts.Token);
                    }
                    try { moveNext = (enumerator.MoveNextAsync(), null); }
                    catch (Exception ex)
                    {
                        error = ExceptionDispatchInfo.Capture(ex); break;
                    }
                }
            }
            if (buffer.Count > 0) yield return buffer.ToArray();
            error?.Throw();
        }
        finally
        {
            // The finally block runs when the enumeration completes or fails,
            // or when an enumerator created by this method is disposed.
            timerCts.Cancel(); timerCts.Dispose();
            // Prevent fire-and-forget, otherwise the DisposeAsync() might throw.
            // Cancel the async-enumerator, for more responsive completion.
            // Swallow MoveNextAsync errors, but propagate DisposeAsync errors.
            linkedCts.Cancel();
            try { await moveNext.ValueTask.ConfigureAwait(false); } catch { }
            await enumerator.DisposeAsync().ConfigureAwait(false);
        }
    }
}
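A ValueTask<bool> should not be awaited more than once, which is why the loop above converts a pending MoveNextAsync result to a Task before handing it to Task.WhenAny, and reuses that Task on later iterations. The sketch below isolates the "allocate only when pending" rule. It assumes a hypothetical source whose elements 0 and 5 arrive after a delay while the rest are available synchronously, and it counts how many Task wrappers get created.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PreserveOnlyWhenPendingDemo
{
    // Hypothetical source: elements 0 and 5 arrive after a delay,
    // all other elements are available synchronously.
    static async IAsyncEnumerable<int> MostlySyncSource()
    {
        for (int i = 0; i < 10; i++)
        {
            if (i % 5 == 0) await Task.Delay(100);
            yield return i;
        }
    }

    // Enumerates the source, wrapping a MoveNextAsync result in a Task only
    // when the returned ValueTask is not already completed. Returns how many
    // elements were received and how many Task wrappers were allocated.
    public static async Task<(int Elements, int Wrapped)> Run()
    {
        int elements = 0, wrapped = 0;
        await using var enumerator = MostlySyncSource().GetAsyncEnumerator();
        while (true)
        {
            ValueTask<bool> moveNext = enumerator.MoveNextAsync();
            if (!moveNext.IsCompleted)
            {
                // Only a pending ValueTask needs a Task, e.g. to pass it to Task.WhenAny.
                Task<bool> task = moveNext.AsTask();
                wrapped++;
                moveNext = new ValueTask<bool>(task);
            }
            if (!await moveNext) break; // Each ValueTask is awaited exactly once.
            elements++;
        }
        return (elements, wrapped);
    }
}
```

Run() should report 10 elements and 2 wrappers, since only the two delayed MoveNextAsync calls return a non-completed ValueTask<bool>.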
Care has been taken to avoid leaking fire-and-forget MoveNextAsync operations or timers.
Allocation of Task wrappers happens only when a MoveNextAsync call returns a non-completed ValueTask<bool>.
This implementation is non-destructive, meaning that no elements that have been consumed from the source sequence can be lost. If the source sequence fails or the enumeration is canceled, any buffered elements are emitted before the error is propagated.
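The "emit buffered elements, then propagate the error" behavior relies on capturing the exception with ExceptionDispatchInfo, flushing the buffer, and rethrowing afterwards. The sketch below shows only that part, with the timer logic left out; FailingSource, Chunk and Run are hypothetical names used for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.ExceptionServices;
using System.Threading.Tasks;

public static class FlushThenThrowDemo
{
    // Hypothetical source that yields 1, 2, 3 and then fails.
    static async IAsyncEnumerable<int> FailingSource()
    {
        for (int i = 1; i <= 3; i++)
        {
            await Task.Yield();
            yield return i;
        }
        throw new InvalidOperationException("Source failed");
    }

    // Groups elements into chunks of 'count'. If the source fails, the
    // partially filled chunk is emitted first and the error is rethrown after.
    public static async IAsyncEnumerable<int[]> Chunk(IAsyncEnumerable<int> source, int count)
    {
        var buffer = new List<int>(count);
        ExceptionDispatchInfo error = null;
        await using (var enumerator = source.GetAsyncEnumerator())
        {
            while (true)
            {
                bool moved;
                try { moved = await enumerator.MoveNextAsync(); }
                catch (Exception ex) { error = ExceptionDispatchInfo.Capture(ex); break; }
                if (!moved) break;
                buffer.Add(enumerator.Current);
                if (buffer.Count == count)
                {
                    yield return buffer.ToArray();
                    buffer.Clear();
                }
            }
        }
        if (buffer.Count > 0) yield return buffer.ToArray(); // Flush the leftovers.
        error?.Throw(); // Rethrow with the original stack trace.
    }

    // Collects the chunks that were emitted before the error surfaced.
    public static async Task<(List<int[]> Chunks, Exception Error)> Run()
    {
        var chunks = new List<int[]>();
        try
        {
            await foreach (var chunk in Chunk(FailingSource(), 2)) chunks.Add(chunk);
        }
        catch (InvalidOperationException ex) { return (chunks, ex); }
        return (chunks, null);
    }
}
```

Enumerating Chunk(FailingSource(), 2) yields [1, 2] and [3] before the InvalidOperationException surfaces, so the element that was already consumed from the failing source is not lost.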