I'm trying to wrap an asynchronous subscription API based on events with an API based on IAsyncEnumerable. Basically along the lines of:

async IAsyncEnumerable<string> ReadAll() 
{
    var reader = new EventBasedReader();
    reader.OnRead += (_, args) => yield return args.Message; // does not compile: yield is not allowed inside a lambda
    reader.Start();
    await reader.WaitUntilAllRead();
}

However this doesn't work because it's the event handler that yields, and this isn't allowed. Is there another way I can write this to make it work as an IAsyncEnumerable?

Barguast
  • You may find this interesting: [Factory for IAsyncEnumerable or IAsyncEnumerator](https://stackoverflow.com/questions/61540896/factory-for-iasyncenumerable-or-iasyncenumerator). Also you could consider adding the [iasyncenumerable](https://stackoverflow.com/questions/tagged/iasyncenumerable) tag to the question. – Theodor Zoulias Jun 05 '20 at 12:09
  • I faced a similar problem and have just [blogged about it](https://dev.to/noseratio/c-events-as-asynchronous-streams-with-reactivex-or-channels-82k). – noseratio Jul 17 '20 at 12:12

1 Answer

> wrap an asynchronous subscription API based on events with an API based on IAsyncEnumerable.

Those two are not directly compatible. Events are push-based, and enumerables (including async enumerables) are pull-based.
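
To make the push/pull distinction concrete, here is a minimal sketch. The class `PushVersusPull` and its members are hypothetical names invented for illustration; they are not part of the question's API. With the event, the producer decides when the handler runs; with the async enumerable, nothing is produced until the consumer asks for the next item.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class PushVersusPull
{
    // Push: the producer raises the event whenever it has data.
    public static event EventHandler<string>? MessagePushed;

    public static void Produce(string message) => MessagePushed?.Invoke(null, message);

    // Pull: the body only advances when the consumer requests the next item.
    public static async IAsyncEnumerable<string> Messages()
    {
        yield return "first";
        await Task.Yield();
        yield return "second";
    }

    public static async Task Main()
    {
        MessagePushed += (_, message) => Console.WriteLine($"pushed: {message}");
        Produce("hello"); // the handler runs right here, inside Produce

        await foreach (var message in Messages()) // each iteration pulls one item
            Console.WriteLine($"pulled: {message}");
    }
}
```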

In order to cross that divide, you need a buffer - some place to hold the event data as it is pushed to you but before the downstream code has pulled it.

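I recommend using Channels (System.Threading.Channels) for buffers. If your use case allows it, meaning it is acceptable for the buffer to keep growing whenever the consumer falls behind the events, you could use an unbounded channel: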

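using System.Collections.Generic;
using System.Threading.Channels;

IAsyncEnumerable<string> ReadAll()
{
  var reader = new EventBasedReader();
  var buffer = Channel.CreateUnbounded<string>();
  // Push side: every event writes its message into the buffer.
  reader.OnRead += async (_, args) => await buffer.Writer.WriteAsync(args.Message);
  reader.Start();
  CompleteBufferWhenEventsAreDone();
  // Pull side: the consumer reads from the buffer until the writer is completed.
  return buffer.Reader.ReadAllAsync();

  async void CompleteBufferWhenEventsAreDone()
  {
    await reader.WaitUntilAllRead();
    // Completing the writer ends the consumer's enumeration.
    buffer.Writer.TryComplete();
  }
}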
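
For anyone who wants to run this end to end, below is a self-contained sketch. `EventBasedReader` and `MessageEventArgs` here are hypothetical stand-ins, since the question does not show the real types; the stand-in just raises three events on a background thread. Two changes from the snippet above are my own assumptions rather than requirements: the handler uses the synchronous `TryWrite` (which succeeds on an unbounded channel as long as it has not been completed), and the completion helper returns a `Task` and forwards any failure to the channel so the consumer sees it. It assumes C# 8 and .NET Core 3.0 or later.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical event args carrying one message.
public sealed class MessageEventArgs : EventArgs
{
    public MessageEventArgs(string message) => Message = message;
    public string Message { get; }
}

// Hypothetical stand-in for the question's event-based reader.
public sealed class EventBasedReader
{
    private readonly TaskCompletionSource<bool> _allRead =
        new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    public event EventHandler<MessageEventArgs>? OnRead;

    // Raises three events on a background thread, then signals completion.
    public void Start() => Task.Run(() =>
    {
        foreach (var message in new[] { "a", "b", "c" })
            OnRead?.Invoke(this, new MessageEventArgs(message));
        _allRead.SetResult(true);
    });

    public Task WaitUntilAllRead() => _allRead.Task;
}

public static class Program
{
    public static IAsyncEnumerable<string> ReadAll()
    {
        var reader = new EventBasedReader();
        var buffer = Channel.CreateUnbounded<string>();
        // Subscribe before Start so no event is missed.
        reader.OnRead += (_, args) => buffer.Writer.TryWrite(args.Message);
        reader.Start();
        _ = CompleteWhenDoneAsync();
        return buffer.Reader.ReadAllAsync();

        async Task CompleteWhenDoneAsync()
        {
            try
            {
                await reader.WaitUntilAllRead();
                buffer.Writer.TryComplete();
            }
            catch (Exception ex)
            {
                // Surface the failure to the consumer's await foreach.
                buffer.Writer.TryComplete(ex);
            }
        }
    }

    public static async Task Main()
    {
        await foreach (var message in ReadAll())
            Console.WriteLine(message);
    }
}
```

With this stand-in reader, `Main` prints `a`, `b` and `c` on separate lines and then exits, because completing the writer ends the enumeration.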
Stephen Cleary