6

I am looking for something similar to the exhaustMap operator from rxjs, but RX.NET does not seem to have such an operator.

What I need to achieve is that, upon every element of the source stream, I need to start an async handler, and until it finishes, I would like to drop any elements from the source. As soon as the handler finishes, resume taking elements.

What I don't want is to start an async handler upon every element - while the handler runs, I want to drop source elements.

I also suspect I need to cleverly use the defer operator here?

Thank you!

Liam
  • 25,247
  • 27
  • 110
  • 174
wh1t3cat1k
  • 3,126
  • 6
  • 31
  • 36

1 Answers1

12

Here is an implementation of the ExhaustMap operator. The source observable is projected to an IObservable<Task<TResult>>, where each subsequent task is either the previous one if it's still running, or otherwise a new task associated with the current item. Repeated occurrences of the same task are then removed with the DistinctUntilChanged operator, and finally the observable is flattened with the Concat operator.

/// <summary>Invokes an asynchronous function for each element of an observable
/// sequence, ignoring elements that are emitted before the completion of an
/// asynchronous function of a preceding element.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, Task<TResult>> function)
{
    return source
        .Scan(Task.FromResult<TResult>(default), (previousTask, item) =>
        {
            return !previousTask.IsCompleted ? previousTask : HideIdentity(function(item));
        })
        .DistinctUntilChanged()
        .Concat();

    async Task<TResult> HideIdentity(Task<TResult> task) => await task;
}

The tasks returned by the function are not guaranteed to be distinct, hence the need for the HideIdentity local function that returns distinct wrappers of the tasks.

Usage example:

Observable
    .Interval(TimeSpan.FromMilliseconds(200))
    .Select(x => (int)x + 1)
    .Take(10)
    .Do(x => Console.WriteLine($"Input: {x}"))
    .ExhaustMap(async x => { await Task.Delay(x % 3 == 0 ? 500 : 100); return x; })
    .Do(x => Console.WriteLine($"Result: {x}"))
    .Wait();

Output:

Input: 1
Result: 1
Input: 2
Result: 2
Input: 3
Input: 4
Input: 5
Result: 3
Input: 6
Input: 7
Input: 8
Result: 6
Input: 9
Input: 10
Result: 9

Update: Here is an alternative implementation, where the function produces an IObservable<TResult> instead of a Task<TResult>:

/// <summary>Projects each element to an observable sequence, which is merged
/// in the output observable sequence only if the previous projected observable
/// sequence has completed.</summary>
public static IObservable<TResult> ExhaustMap<TSource, TResult>(
    this IObservable<TSource> source,
    Func<TSource, IObservable<TResult>> function)
{
    return Observable.Defer(() =>
    {
        int mutex = 0; // 0: not acquired, 1: acquired
        return source.SelectMany(item =>
        {
            // Attempt to acquire the mutex immediately. If successful, return
            // a sequence that releases the mutex when terminated. Otherwise,
            // return immediately an empty sequence.
            if (Interlocked.CompareExchange(ref mutex, 1, 0) == 0)
                return function(item).Finally(() => Volatile.Write(ref mutex, 0));
            return Observable.Empty<TResult>();
        });
    });
}
Theodor Zoulias
  • 24,585
  • 5
  • 40
  • 69
  • 2
    Nice one, Theodor! I've been away from the rx tag for a while - great to see you're holding up the fort. – Asti Oct 17 '20 at 14:02
  • Hi @Asti! Yeap, that was a nice question, suitable for an elegant answer, an ideal question for StackOverflow. There was some friction early on, with the question being closed for no good reason, as you can see in the [revisions](https://stackoverflow.com/posts/64353907/revisions). But now everything is OK. – Theodor Zoulias Oct 17 '20 at 16:33