I have implemented a producer-consumer pattern using TPL Dataflow. The use case is code that reads messages from the Kafka bus. For efficiency, we need to process the messages in batches when writing to the database.

Is there a way in TPL Dataflow to hold on to the messages and emit them whenever a size or duration threshold is hit?

For example, the current implementation posts each message as soon as it is pulled from the queue.

    postedSuccessfully = targetBuffer.Post(msg.Value);
Theodor Zoulias
Ashish Bhatia
  • Use [`BatchBlock`](https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-using-batchblock-and-batchedjoinblock-to-improve-efficiency). The `BatchBlock` will collect messages until a batch size is reached, then emit a collection of messages for downstream processing. – JSteward Oct 03 '18 at 18:33
  • Thanks. BatchBlock will collect messages. I also need to emit messages if a certain time threshold is reached. Is there an option to specify max messages or a timeout threshold? – Ashish Bhatia Oct 03 '18 at 19:32
  • There's no out-of-the-box timeout, but you could empty it with a timer. There are options for max groups and capacity, which might help with your other requirements. – JSteward Oct 03 '18 at 20:09
  • @AshishBhatia why not use Reactive Extensions instead? [Buffer](http://www.introtorx.com/Content/v1.0.10621.0/13_TimeShiftedSequences.html) allows batching by count or timespan, eg `mySequence.Buffer(TimeSpan.FromSeconds(1))`. – Panagiotis Kanavos Oct 05 '18 at 10:23
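
To make the `BatchBlock` behavior described in these comments concrete, here is a minimal sketch. The class and method names (`BatchBlockSketch`, `Run`) are made up for illustration, and the manual `TriggerBatch` call stands in for what a timer callback would do once a timeout elapses.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of the original question.
public static class BatchBlockSketch
{
    public static List<int[]> Run()
    {
        var batches = new List<int[]>();
        var batchBlock = new BatchBlock<int>(3); // batch size of 3
        var patience = TimeSpan.FromSeconds(5);  // upper bound so Receive cannot hang

        // Three items fill a batch, so the block emits it on its own.
        for (int i = 0; i < 3; i++)
        {
            batchBlock.Post(i);
        }
        batches.Add(batchBlock.Receive(patience));

        // Two items are not enough for a batch; they stay queued in the block
        // until something calls TriggerBatch (here: us, in practice: a timer).
        batchBlock.Post(3);
        batchBlock.Post(4);
        batchBlock.TriggerBatch();
        batches.Add(batchBlock.Receive(patience));

        return batches;
    }
}
```

Calling `BatchBlockSketch.Run()` from a `Main` method returns one full batch followed by one partial batch that only exists because of the `TriggerBatch` call.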

4 Answers

Buffering by count and duration is already available in System.Reactive through the Buffer operator. Buffer collects incoming events until either the desired count is reached or its timespan expires.
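
As a quick illustration of the operator on its own, without any Dataflow blocks, the following sketch feeds a `Subject<int>` into `Buffer(TimeSpan, int)`. It assumes the System.Reactive NuGet package is referenced; the names `RxBufferSketch` and `Run` are hypothetical, and the 10-second timespan is chosen only so that the count threshold is the one that fires here.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper, not part of the original answer.
public static class RxBufferSketch
{
    public static List<IList<int>> Run()
    {
        var batches = new List<IList<int>>();
        using var source = new Subject<int>();

        // Close a buffer after 5 items or after 10 seconds, whichever comes first.
        using var subscription = source
            .Buffer(TimeSpan.FromSeconds(10), 5)
            .Subscribe(batch => batches.Add(batch));

        // Seven items: the first five fill a buffer, two are left over.
        for (int i = 0; i < 7; i++)
        {
            source.OnNext(i);
        }

        // Completing the source flushes the partially filled buffer.
        source.OnCompleted();
        return batches;
    }
}
```

With a short timespan and a slow producer, the timespan would close the buffers instead, which is the behavior the question asks for.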

Dataflow blocks are designed to work with System.Reactive. Blocks can be converted to Observables and Observers by using the DataflowBlock.AsObservable() and AsObserver() extension methods.

This makes building a buffering block very easy:

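using System.Reactive.Concurrency; // TaskPoolScheduler
using System.Reactive.Linq;        // Buffer, ObserveOn
using System.Threading.Tasks.Dataflow;

public static IPropagatorBlock<TIn,IList<TIn>> CreateBuffer<TIn>(TimeSpan timeSpan,int count)
{
    var inBlock = new BufferBlock<TIn>();
    var outBlock = new BufferBlock<IList<TIn>>();

    // Bridge the two blocks through Rx: inBlock is the observable, outBlock the observer
    var outObserver=outBlock.AsObserver();
    inBlock.AsObservable()
            .Buffer(timeSpan, count)
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(outObserver);

    // Expose the pair as a single propagator block
    return DataflowBlock.Encapsulate(inBlock, outBlock);
}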

This method uses two buffer blocks to buffer inputs and outputs. Buffer() reads from the input block (the observable) and writes to the output block (the observer) when either the batch is full or the timespan expires.

By default, Rx works on the current thread. By calling ObserveOn(TaskPoolScheduler.Default) we tell it to process data on a Task pool thread.

Example

This code creates a buffer block for 5 items or 1 second. It starts by posting 7 items, waits 1.1 seconds, then posts another 7 items. Each batch is written to the console together with the thread ID:

static async Task Main(string[] args)
{
    //Build the pipeline
    var bufferBlock = CreateBuffer<string>(TimeSpan.FromSeconds(1), 5);

    var options = new DataflowLinkOptions { PropagateCompletion = true };
    var printBlock = new ActionBlock<IList<string>>(items=>printOut(items));
    bufferBlock.LinkTo(printBlock, options);

    //Start the messages
    Console.WriteLine($"Starting on {Thread.CurrentThread.ManagedThreadId}");

    for (int i=0;i<7;i++)
    {
        bufferBlock.Post(i.ToString());
    }
    await Task.Delay(1100);
    for (int i=7; i < 14; i++)
    {
        bufferBlock.Post(i.ToString());
    }
    bufferBlock.Complete();
    Console.WriteLine($"Finishing");
    // Wait for the last block in the chain, so that every batch has been printed
    await printBlock.Completion;
    Console.WriteLine($"Finished on {Thread.CurrentThread.ManagedThreadId}");
    Console.ReadKey();
}

static void printOut(IEnumerable<string> items)
{
    var line = String.Join(",", items);
    Console.WriteLine($"{line} on {Thread.CurrentThread.ManagedThreadId}");
}

The output is:

Starting on 1
0,1,2,3,4 on 4
5,6 on 8
Finishing
7,8,9,10,11 on 8
12,13 on 6
Finished on 6
Panagiotis Kanavos
  • Is it possible to wait for the outBlock to be empty before writing more data? – Ariel Moraes Nov 23 '21 at 23:54
  • It's already available. You can specify a bound on any block through the `BoundedCapacity` option and use `await block.SendAsync` to post to it. `SendAsync` will wait asynchronously if the block is full. `Post` will return `false` in the same situation. If you set `BoundedCapacity=1`, the method will publish a new buffer only when the previous one has been processed. – Panagiotis Kanavos Nov 24 '21 at 08:11
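
The difference between `Post` and `SendAsync` on a bounded block, as described in the comment above, can be sketched as follows. The names `BoundedCapacitySketch` and `RunAsync` are hypothetical, and a plain `BufferBlock<int>` is used in place of the buffering block from the answer to keep the example small.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper, not part of the original answer.
public static class BoundedCapacitySketch
{
    public static async Task<(bool First, bool Second, bool Sent, int Received)> RunAsync()
    {
        var block = new BufferBlock<int>(
            new DataflowBlockOptions { BoundedCapacity = 1 });

        bool first = block.Post(1);  // accepted: the block has room for one item
        bool second = block.Post(2); // declined: the block is full and Post does not wait

        // SendAsync does not give up; its task completes once the block has room.
        Task<bool> pending = block.SendAsync(3);

        int received = await block.ReceiveAsync(); // takes item 1 and frees the slot
        bool sent = await pending;                 // item 3 is accepted now

        return (first, second, sent, received);
    }
}
```

In a producer loop, `await block.SendAsync(item)` therefore gives natural backpressure, while `Post` needs its return value checked or the message is silently dropped.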

While there is no out-of-the-box timeout, you can wire up a timer that calls TriggerBatch whenever the downstream pipeline has waited long enough for a batch, and reset the timer whenever a batch flows through. The BatchBlock will take care of the rest for you.

For example, this sample has been configured to produce a batch of size 1 every time, even though the batch block would normally wait for 10 elements. The timeout forces the BatchBlock to emit whatever it currently holds.

public class BatchBlockExample
{
    [Test]
    public async Task BatchBlockWithTimeOut()
    {
        var batchBlock = new BatchBlock<int>(10);

        var timeOut = TimeSpan.FromSeconds(1);
        var timeOutTimer = new System.Timers.Timer(timeOut.TotalMilliseconds);
        timeOutTimer.Elapsed += (s, e) => batchBlock.TriggerBatch();            

        var actionBlock = new ActionBlock<IEnumerable<int>>(x =>
        {
            //Reset the timeout since we got a batch
            timeOutTimer.Stop();
            timeOutTimer.Start();
            Console.WriteLine($"Batch Size: {x.Count()}");
        });

        batchBlock.LinkTo(actionBlock, new DataflowLinkOptions() { PropagateCompletion = true });
        timeOutTimer.Start();

        foreach(var item in Enumerable.Range(0, 5))
        {
            await Task.Delay(2000);
            await batchBlock.SendAsync(item);
        }

        batchBlock.Complete();
        await actionBlock.Completion;
    }
}

Output:

Batch Size: 1
Batch Size: 1
Batch Size: 1
Batch Size: 1
Batch Size: 1
JSteward

I guess you could use something like this. Basically it's just a BatchBlock with a timeout, all rolled into one.

BatchBlockEx

public sealed class BatchBlockEx<T> : IDataflowBlock, IPropagatorBlock<T, T[]>, ISourceBlock<T[]>, ITargetBlock<T>, IReceivableSourceBlock<T[]>
{
   private readonly AsyncAutoResetEvent _asyncAutoResetEvent = new AsyncAutoResetEvent();

   private readonly BatchBlock<T> _base;

   private readonly CancellationToken _cancellationToken;

   private readonly int _triggerTimeMs;

   public BatchBlockEx(int batchSize, int triggerTimeMs)
   {
      _triggerTimeMs = triggerTimeMs;
      _base = new BatchBlock<T>(batchSize);
      PollReTrigger();
   }

   public BatchBlockEx(int batchSize, int triggerTimeMs, GroupingDataflowBlockOptions dataflowBlockOptions)
   {
      _triggerTimeMs = triggerTimeMs;
      _cancellationToken = dataflowBlockOptions.CancellationToken;
      _base = new BatchBlock<T>(batchSize, dataflowBlockOptions);
      PollReTrigger();
   }

   public int BatchSize => _base.BatchSize;

   public int OutputCount => _base.OutputCount;

   public Task Completion => _base.Completion;

   public void Complete() => _base.Complete();

   void IDataflowBlock.Fault(Exception exception) => ((IDataflowBlock)_base).Fault(exception);

   public IDisposable LinkTo(ITargetBlock<T[]> target, DataflowLinkOptions linkOptions) => _base.LinkTo(target, linkOptions);

   T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target, out bool messageConsumed) => ((ISourceBlock<T[]>)_base).ConsumeMessage(messageHeader, target, out messageConsumed);

   void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target) => ((ISourceBlock<T[]>)_base).ReleaseReservation(messageHeader, target);

   bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<T[]> target) => ((ISourceBlock<T[]>)_base).ReserveMessage(messageHeader, target);

   DataflowMessageStatus ITargetBlock<T>.OfferMessage(DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source, bool consumeToAccept)
   {
      _asyncAutoResetEvent.Set();
      return ((ITargetBlock<T>)_base).OfferMessage(messageHeader, messageValue, source, consumeToAccept);
   }

   public bool TryReceive(Predicate<T[]> filter, out T[] item) => _base.TryReceive(filter, out item);

   public bool TryReceiveAll(out IList<T[]> items) => _base.TryReceiveAll(out items);

   public override string ToString() => _base.ToString();

   public void TriggerBatch() => _base.TriggerBatch();

   private void PollReTrigger()
   {
      async Task Poll()
      {
         try
         {
            while (!_cancellationToken.IsCancellationRequested)
            {
               await _asyncAutoResetEvent.WaitAsync()
                                          .ConfigureAwait(false);

               await Task.Delay(_triggerTimeMs, _cancellationToken)
                           .ConfigureAwait(false); 
               TriggerBatch();
            }
         }
         catch (TaskCanceledException)
         {
            // Cancellation was requested; stop polling
         }
      }

      Task.Run(Poll, _cancellationToken);
   }
}

AsyncAutoResetEvent

public class AsyncAutoResetEvent
{
   private static readonly Task _completed = Task.FromResult(true);
   private readonly Queue<TaskCompletionSource<bool>> _waits = new Queue<TaskCompletionSource<bool>>();
   private bool _signaled;

   public Task WaitAsync()
   {
      lock (_waits)
      {
         if (_signaled)
         {
            _signaled = false;
            return _completed;
         }

         var tcs = new TaskCompletionSource<bool>();
         _waits.Enqueue(tcs);
         return tcs.Task;
      }
   }

   public void Set()
   {
      TaskCompletionSource<bool> toRelease = null;

      lock (_waits)
         if (_waits.Count > 0)
            toRelease = _waits.Dequeue();
         else if (!_signaled)
            _signaled = true;

      toRelease?.SetResult(true);
   }
}
TheGeneral

Here is a slightly different approach. The tricky part of this problem is knowing when the BatchBlock<T> has emitted a batch, so that the internal timer can be deactivated. The solution I've chosen is to interpose a TargetWrapper every time an ITargetBlock<T[]> is linked to the BatchBlock<T>, and to propagate the batches received by the TargetWrapper to the real target.

The TimeoutBatchBlock<T> class below offers the full range of the BatchBlock<T> functionality. It exposes the full API and supports all the options. It is a thin wrapper around a BatchBlock<T> instance (plus one TargetWrapper instance for each linked target).

/// <summary>
/// Provides a dataflow block that batches inputs into arrays.
/// A batch is produced when the number of currently queued items becomes equal
/// to batchSize, or when a timeout period has elapsed after receiving the first
/// item in the batch.
/// </summary>
public class TimeoutBatchBlock<T> : IPropagatorBlock<T, T[]>,
    IReceivableSourceBlock<T[]>
{
    private readonly BatchBlock<T> _source;
    private readonly TimeSpan _timeout;
    private readonly Timer _timer;
    private bool _timerEnabled;

    public TimeoutBatchBlock(int batchSize, TimeSpan timeout,
        GroupingDataflowBlockOptions dataflowBlockOptions)
    {
        // Arguments validation omitted
        _source = new BatchBlock<T>(batchSize, dataflowBlockOptions);
        _timeout = timeout;
        _timer = new Timer(_ => _source.TriggerBatch());
        _timerEnabled = false;
    }

    public TimeoutBatchBlock(int batchSize, TimeSpan timeout) : this(batchSize,
        timeout, new GroupingDataflowBlockOptions())
    { }

    public int BatchSize => _source.BatchSize;
    public TimeSpan Timeout => _timeout;
    public Task Completion => _source.Completion;
    public int OutputCount => _source.OutputCount;

    public void Complete() => _source.Complete();

    void IDataflowBlock.Fault(Exception exception)
        => ((IDataflowBlock)_source).Fault(exception);

    public IDisposable LinkTo(ITargetBlock<T[]> target,
        DataflowLinkOptions linkOptions)
    {
        return _source.LinkTo(new TargetWrapper(target, this), linkOptions);
    }

    private class TargetWrapper : ITargetBlock<T[]>
    {
        private readonly ITargetBlock<T[]> _realTarget;
        private readonly TimeoutBatchBlock<T> _parent;

        public TargetWrapper(ITargetBlock<T[]> realTarget, TimeoutBatchBlock<T> parent)
        {
            _realTarget = realTarget;
            _parent = parent;
        }

        public Task Completion => _realTarget.Completion;
        public void Complete() => _realTarget.Complete();
        public void Fault(Exception exception) => _realTarget.Fault(exception);

        public DataflowMessageStatus OfferMessage(DataflowMessageHeader messageHeader,
            T[] messageValue, ISourceBlock<T[]> source, bool consumeToAccept)
        {
            var offerResult = _realTarget.OfferMessage(messageHeader,
                messageValue, source, consumeToAccept);
            if (offerResult == DataflowMessageStatus.Accepted)
                _parent.DeactivateTimerIfActive(); // The block emitted a new batch
            return offerResult;
        }
    }

    public void TriggerBatch() => _source.TriggerBatch();

    public bool TryReceive(Predicate<T[]> filter, out T[] item)
        => _source.TryReceive(filter, out item);

    public bool TryReceiveAll(out IList<T[]> items)
        => _source.TryReceiveAll(out items);

    private void SetTimerState(bool enabled)
    {
        lock (_timer)
        {
            if (enabled == _timerEnabled) return;
            _timer.Change(
                enabled ? _timeout : System.Threading.Timeout.InfiniteTimeSpan,
                System.Threading.Timeout.InfiniteTimeSpan);
            _timerEnabled = enabled;
        }
    }
    private void ActivateTimerIfInactive() => SetTimerState(true);
    private void DeactivateTimerIfActive() => SetTimerState(false);

    DataflowMessageStatus ITargetBlock<T>.OfferMessage(
        DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
        bool consumeToAccept)
    {
        var offerResult = ((ITargetBlock<T>)_source).OfferMessage(messageHeader,
            messageValue, source, consumeToAccept);
        if (offerResult == DataflowMessageStatus.Accepted)
            ActivateTimerIfInactive(); // The block received a new message
        return offerResult;
    }

    T[] ISourceBlock<T[]>.ConsumeMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target, out bool messageConsumed)
            => ((ISourceBlock<T[]>)_source).ConsumeMessage(messageHeader,
                target, out messageConsumed);

    bool ISourceBlock<T[]>.ReserveMessage(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
            => ((ISourceBlock<T[]>)_source).ReserveMessage(messageHeader, target);

    void ISourceBlock<T[]>.ReleaseReservation(DataflowMessageHeader messageHeader,
        ITargetBlock<T[]> target)
            => ((ISourceBlock<T[]>)_source).ReleaseReservation(messageHeader, target);
}
Theodor Zoulias
  • A custom `BatchBlock` with a timer, that has a slightly different behavior (`BatchUntilInactiveBlock`), can be found [here](https://stackoverflow.com/questions/9419442/how-to-call-triggerbatch-automagically-after-a-timeout-if-the-number-of-queued-i/59172289#59172289). – Theodor Zoulias Sep 12 '21 at 03:20