Original question
I have a scenario where I have multiple IObservable sequences which I want to combine with Merge and then listen to. However, if one of them produces an error, I don't want it to tear down the merged stream for the others, and I also want to resubscribe to the failed sequence (it is an 'everlasting' sequence).
I do this by appending a Retry() to each stream before merging, i.e.:
IEnumerable<IObservable<int>> observables = GetObservables();
observables
    .Select(o => o.Retry())
    .Merge()
    .Subscribe(/* Do subscription stuff */);
However, the problem arises when I want to test this. What I would like to test is that if one of the IObservables in observables produces an OnError, the other ones are still able to send their values through and those values still get handled.
I thought I'd just use two Subject<int>s representing two IObservables in observables; one sending an OnError(new Exception()) and the other, after that, sending OnNext(1). However, it seems Subject<int> will replay all previous values for a new subscription (which effectively Retry() is), turning the test into an infinite loop.
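The behaviour I'm seeing can be reproduced without Retry() at all. Here is a minimal sketch, assuming the System.Reactive package and C# top-level statements (gotError is just an illustrative name): once a Subject<int> has received OnError, a later subscriber appears to be handed that error immediately, which is what every Retry() resubscription would run into.

```csharp
using System;
using System.Reactive.Subjects;

var subject = new Subject<int>();

// Terminate the subject before anyone subscribes.
subject.OnError(new InvalidOperationException("boom"));

// A late subscriber is handed the stored terminal error straight away.
var gotError = false;
subject.Subscribe(_ => { }, ex => gotError = true);

Console.WriteLine(gotError); // True
```

If that is right, Retry() on an already-failed Subject would resubscribe, get the error again, and repeat indefinitely.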
I tried to solve it by creating a manual IObservable that produces an error on the first subscription and later an empty sequence, but it feels hacky:
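var i = 0;
var nErrors = 2;
// Errors on the first subscription, then completes empty on every later one.
var testErrorObservableWithOneErrorAndThenCompletion = Observable.Create<int>(o => {
    i++; // counts subscriptions, including those made by Retry()
    if (i < nErrors) {
        return Observable.Throw<int>(new Exception()).Subscribe(o);
    } else {
        return Observable.Empty<int>().Subscribe(o);
    }
});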
Am I using Subject or thinking about Retry() in the wrong way? Any other thoughts on this? How would you solve this situation?
Update
Ok, here's a marble diagram of what I want and think Retry() does.
o = message, X = error.
------o---o---X
               \
    Retry() ->  \---o---o---X
                             \
                  Retry() ->  \...
My problem is perhaps more that I don't have a good stock class to use for testing, since Subject wants to replay all of my previous errors.
Update 2
Here's a test case that shows what I mean about Subject replaying its values. Am I using the term correctly if I say it does this in a cold way? I know Subject is a way of creating a hot observable, but still this behavior feels 'cold' to me.
var onNext = false;
var subject = new Subject<int>();
subject.Retry().Subscribe(x => onNext = true);
subject.OnError(new Exception());
subject.OnNext(1);
Assert.That(onNext, Is.True);