I am rather new to RxJava and, like so many others, am trying to get my head around exception handling. I have read quite a few posts online (e.g. the discussion "How to handle exceptions thrown by observer's onNext") and think that I get the basic idea of the concepts.
In the above-mentioned discussion, one of the posters says that when an exception is thrown in a subscriber, RxJava does the following:
Implement generic handling to log the failure and stop sending it events (of any kind) and clean up any resources due to that subscriber and carry on with any remaining subscriptions.
This is more or less what I see as well. The only thing I have problems with is the "clean up any resources" bit. To make that clear, let's assume the following example:
I want to create an Observable that listens to an async event source (e.g. a JMS queue) and onNext()s on every received message. So in (pseudo-) code I would do something similar to this:
Observable<String> observable = Observable.create(s -> {
    // Runs on subscription: open the connection to the broker.
    createConnectionToBroker();
    // Forward every received message to the emitter.
    getConsumer().setMessageListener(message -> s.onNext(transform(message)));
    // Tear the connection down once the emitter is disposed.
    s.setCancellable(() -> tearDownBrokerConnection());
});
Since I want to reuse the message listener for many subscribers / observers, I do not subscribe directly to the created Observable, but use the publish().refCount() combination instead. Something similar to this:
Observable<String> observableToSubscribeTo = observable.publish().refCount();
Disposable d1 = observableToSubscribeTo.subscribe(s -> ...);
Disposable d2 = observableToSubscribeTo.subscribe(s -> ...);
This all works as expected. The code connects to JMS only when the first subscription is established, and the connection to the broker is closed when the last observer is dispose()d.
However, when a subscriber throws an exception while it is being onNext()ed, things seem to get messy. As expected, the observer that threw is nuked, and it is no longer notified when a new event is published. The problem shows up when all the remaining subscribers are dispose()d: the Observable that maintains the connection to the message broker is never notified, so the connection is not torn down. It looks to me as if the subscriber that threw the exception is in some sort of zombie state. It is ignored when it comes to event distribution, but it somehow prevents the root Observable from being notified when the last subscriber is dispose()d.
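To make the expectation concrete, here is a minimal, library-free sketch of the lifecycle I would expect. It is only a model and not how RxJava implements publish().refCount(): the class RefCountModel, its methods, and the two counters are hypothetical names invented for illustration, and a boolean flag stands in for the broker connection. The point is that a subscriber that throws gets its reference released, so the teardown still runs once the last healthy subscriber leaves.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical model of a ref-counted source; NOT RxJava's implementation. */
public class RefCountModel {
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();
    private boolean connected = false; // stands in for the broker connection
    private int connects = 0;
    private int teardowns = 0;

    /** Registers a subscriber; the first one opens the simulated connection. */
    public synchronized Runnable subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
        if (!connected) {
            connected = true;
            connects++;
        }
        // The returned handle plays the role of a Disposable.
        return () -> release(subscriber);
    }

    /** Removes a subscriber; removing the last one tears the connection down. */
    private synchronized void release(Consumer<String> subscriber) {
        boolean removed = subscribers.remove(subscriber);
        if (removed && subscribers.isEmpty() && connected) {
            connected = false;
            teardowns++;
        }
    }

    /** Delivers a message; a subscriber that throws is dropped and released. */
    public void publish(String message) {
        // CopyOnWriteArrayList iterates over a snapshot, so removal here is safe.
        for (Consumer<String> s : subscribers) {
            try {
                s.accept(message);
            } catch (RuntimeException e) {
                release(s);
            }
        }
    }

    public int connects() { return connects; }

    public int teardowns() { return teardowns; }

    public static void main(String[] args) {
        RefCountModel source = new RefCountModel();
        source.subscribe(m -> { throw new IllegalStateException("boom"); });
        Runnable d2 = source.subscribe(m -> System.out.println("got " + m));

        source.publish("hello"); // the first subscriber throws and is dropped
        d2.run();                // the last live subscriber leaves

        System.out.println("connects=" + source.connects()
                + " teardowns=" + source.teardowns());
    }
}
```

In this model the teardown counter ends at 1 even though one subscriber threw. That is the behaviour I am not seeing with the real publish().refCount() chain.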
I understand that RxJava expects observers not to throw, but rather to handle any exception properly themselves. Unfortunately, in the case where I want to provide a library that returns an Observable to the caller, I have no control over my subscribers whatsoever. This means I would never be able to protect my library against stupid observers.
So, I am asking myself: am I missing something here? Is there really no way to clean up properly when a subscriber throws? Is this a bug, or is it just me not understanding the library?
Any insights greatly appreciated!