28

I would like to call back an async function within an Rx subscription.

E.g. like that:

public class Consumer
{
    private readonly Service _service = new Service();

    public ReplaySubject<string> Results = new ReplaySubject<string>();

    public void Trigger()
    {
        Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
    }

    public Task RunAsync()
    {
        return _service.DoAsync();
    }
}

public class Service
{
    public async Task<string> DoAsync()
    {
        return await Task.Run(() => Do());
    }

    private static string Do()
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(200));
        throw new ArgumentException("invalid!");
        return "foobar";
    }
}

[Test]
public async Task Test()
{
    var sut = new Consumer();
    sut.Trigger();
    var result = await sut.Results.FirstAsync();
}

What needs to be done, in order to catch the exception properly?

Theodor Zoulias
  • 24,585
  • 5
  • 40
  • 69
Martin Komischke
  • 1,290
  • 1
  • 12
  • 24
  • 1
    I just figured that I can put async at the first place. But unfortunately this doesn't solve the problem I am faced with. I will post a more explicit example. – Martin Komischke Apr 11 '14 at 08:38
  • possible duplicate of [Is there a way to subscribe an observer as async](http://stackoverflow.com/questions/18814805/is-there-a-way-to-subscribe-an-observer-as-async) – Zache Apr 11 '14 at 08:48

4 Answers4

39

Ana Betts' answer works in most scenarios, but if you want to block the stream while waiting for the async function to finish you need something like this:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(l => Observable.FromAsync(asyncMethod))
          .Concat()
          .Subscribe();

Or:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
          .Concat()
          .Subscribe();
TeaDrivenDev
  • 6,576
  • 32
  • 50
reijerh
  • 732
  • 1
  • 7
  • 15
21

Change this to:

Observable.Timer(TimeSpan.FromMilliseconds(100))
    .SelectMany(async _ => await RunAsync())
    .Subscribe();

Subscribe doesn't keep the async operation inside the Observable.

Ana Betts
  • 72,589
  • 16
  • 137
  • 204
  • 2
    Thanks Paul, for your suggestion. Very interesting. Do you have something where I can read a little bit further about that? – Martin Komischke May 05 '14 at 07:25
  • 1
    Note for others using this solution, if you need keep the order just replace `SelectMany` with `Select` and then `.Concat` (similar to reijerh's answer). – bokibeg Oct 28 '19 at 19:53
18

You don't want to pass an async method to Subscribe, because that will create an async void method. Do your best to avoid async void.

In your case, I think what you want is to call the async method for each element of the sequence and then cache all the results. In that case, use SelectMany to call the async method for each element, and Replay to cache (plus a Connect to get the ball rolling):

public class Consumer
{
    private readonly Service _service = new Service();

    public IObservable<string> Trigger()
    {
        var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
            .SelectMany(_ => RunAsync())
            .Replay();
        connectible.Connect();
        return connectible;
    }

    public Task<string> RunAsync()
    {
        return _service.DoAsync();
    }
}

I changed the Results property to be returned from the Trigger method instead, which I think makes more sense, so the test now looks like:

[Test]
public async Task Test()
{
    var sut = new Consumer();
    var results = sut.Trigger();
    var result = await results.FirstAsync();
}
Stephen Cleary
  • 406,130
  • 70
  • 637
  • 767
0

Building on reijerh's answer, I created an extension method.

public static IDisposable SubscribeAsync<TResult>(this IObservable<TResult> source, Func<Task> action) =>
            source.Select(_ => Observable.FromAsync(action))
                .Concat()
                .Subscribe();

If I understand this correctly, it should block until the async task finishes. But it will allow you to call SubscribeAsync and pass in your task. In my opinion this makes it a little more readable.

WhenSomethingHappened
    .SubscribeAsync(async () => { 
        await DoFirstAsyncThing(); 
        await DoSecondAsyncThing(); 
    });

Hackmodford
  • 3,855
  • 4
  • 31
  • 76