11

I'm somewhat new to Rx.NET. Is it possible to catch an exception which may be thrown by any of the subscribers? Take the following...

handler.FooStream.Subscribe(
            _ => throw new Exception("Bar"),
            _ => { });

Currently I'm catching on a per subscription basis with an instance of the following. The implementation of which just uses a ManualResetEvent to wake up a waiting thread.

public interface IExceptionCatcher
{
    Action<T> Exec<T>(Action<T> action);
}

and using it like so...

handler.FooStream.Subscribe(
            _exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred
            _ => { });

I feel like there must be some better way. Are all of the error handling capabilities in Rx.NET specifically for dealing with errors observables?

EDIT: Per request, my implementation is https://gist.github.com/1409829 (interface and implementation separated into different assemblies in prod code). Feedback is welcome. This may seem silly, but I'm using castle windsor to manage many different Rx subscribers. This exception catcher is registered with the container like this

windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher));

It would then be used like this where observable is instance of IObservable:

var exceptionCatcher =
    new ExceptionCatcher(e =>
                                {
                                    Logger.FatalException(
                                        "Exception caught, shutting down.", e);
                                    // Deal with unmanaged resources here
                                }, false);


/* 
 * Normally the code below exists in some class managed by an IoC container.
 * 'catcher' would be provided by the container.
 */
observable /* do some filtering, selecting, grouping etc */
    .SubscribeWithExceptionCatching(processItems, catcher);
drstevens
  • 2,893
  • 1
  • 19
  • 30

1 Answers1

11

The built-in Observable operators do not do what you are asking for by default (much like events), but you could make an extension method that would do this.

public static IObservable<T> IgnoreObserverExceptions<T, TException>(
                                this IObservable<T> source
                               ) where TException : Exception
{
    return Observable.Create<T>(
        o => source.Subscribe(
            v => { try { o.OnNext(v); }
                   catch (TException) { }
            },
            ex => o.OnError(ex),
            () => o.OnCompleted()
            ));
}

Then any observable could be wrapped by this method to get the behavior you described.

Theodor Zoulias
  • 24,585
  • 5
  • 40
  • 69
Gideon Engelberth
  • 6,035
  • 1
  • 20
  • 21
  • Thanks, you answered my question, but are you sure that your try/catch around OnNext will catch exceptions? One could very easily do something with the returned IObservable which would cause the subscribed code to be executed on another thread. I originally tried to put a try/catch around my Subject.OnNext call but exceptions were not caught. I could however create a SubscribeWithExceptionHandling method or something. – drstevens Aug 28 '11 at 13:04
  • 2
    @drstevens It will catch exceptions from the same thread. If your observer is launching async operations of its own that throw exceptions, this will not catch those. – Gideon Engelberth Aug 29 '11 at 22:20
  • 1
    Well considering how many of the Rx operations result in a new thread (or task in the pool), I would say it is pretty likely that would be the case. In between the `handler.FooStream` and `Subscribe` in question is a `GroupByUntil(...).SelectMany(...).Buffer(with a time)`. I actually ended up creating a `SubscribeWithCatch` following your example which catches Exception and then uses the same action passed to the OnError handler. – drstevens Aug 30 '11 at 07:17
  • @Jmix90 I have a day of meetings, but I will try and add a gist to my github later on today. – drstevens Nov 30 '11 at 15:03
  • 1
    @Jmix90 original question updated with my code. Feedback is welcome. – drstevens Nov 30 '11 at 17:27
  • 2
    I edited the answer because the newer Rx packages no longer have an `Observable.CreateWithDisposable` method. Instead they have an overloaded [`Observable.Create`](https://github.com/dotnet/reactive/blob/main/Rx.NET/Source/src/System.Reactive/Linq/Observable.Creation.cs) method. – Theodor Zoulias Nov 29 '20 at 13:58
  • 1
    Related: [How to handle exceptions in OnNext when using ObserveOn?](https://stackoverflow.com/questions/11182371/how-to-handle-exceptions-in-onnext-when-using-observeon) – Theodor Zoulias Nov 30 '20 at 05:42