1

Dislaimer: I am newbie to Rx.Net.

I want to understand the best way to consume events from the server using Rx.Net. Currently, I have a consumer class that contains a rx Subject, to delegate the consumed update to downstream consumers as :

Event Listener/Processor:

public IObservable<IUpdate> UpdateStream => _subject?.AsObservable();

try
{
    // ... processing ... 
    _subject.OnNext(update); // update is the variable
}
catch (Exception ex)
{
    _subject.OnError(ex);
}

Downstream-subscribers:

public void Subscribe()
{
  _eventListener.UpdateStream.Subscribe(update => 
  {
       _fooProcessor.Process(update);
  },
  ex => 
  {
     // log
     Subscribe(); // an effort to resubscribe lost subscription
  },
  () => { // log completion (optional)...}
}

I have noticed that subject throws exception onNext (an item with the same key has already been added), wherein, the subject.HasObservers property is false (in other words, the downstream subscription list is lost). The OnError code line does hit, but the downstream subscribers do not get notified (because of lost subscription).

I tried using Observer.EventPattern to listen to the consuming event and create the observable to be consumed by downstream-subscribers; but that did not work as well (I could not evaluate the point of failure in this case).

Is there a pattern to resubscribe from downstream consumers (on different dlls), in such cases?

Appreciate any help. Thanks!

Paulo Morgado
  • 12,698
  • 3
  • 27
  • 55
Saket
  • 80
  • 6

1 Answers1

0

I found that the downstream-subscriber was throwing an exception, resulting in dropping the subscription. This is not an issue now.

Thanks - How to handle exceptions in OnNext when using ObserveOn?

Saket
  • 80
  • 6