
I have an API that accepts 20 requests per minute; after that, I need to wait for 1 minute before querying it again. I have a list of items (usually 1000+) whose details I need to query from the API. My first thought was to use `Partitioner` to split the list into batches of 20 items/requests, but I soon realized that `Partitioner` does not work like that. My second thought was to add a delay inside the partition, but that is also a bad idea: as I understand it, it adds a delay after every request, which is not needed. Instead, I need a delay after every partition. Below is my code:

```csharp
public static async Task<IEnumerable<V>> ForEachAsync<T, V>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task<V>> body, CancellationToken token,
    [Optional] int delay)
{
    var whenAll = await Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParallelism)
        select Task.Run(async delegate {
            var allResponses = new List<V>();
            using (partition)
                while (partition.MoveNext())
                {
                    allResponses.Add(await body(partition.Current));
                    await Task.Delay(TimeSpan.FromSeconds(delay));
                }
            return allResponses;
        }, token));
    return whenAll.SelectMany(x => x);
}
```

Does anyone know how I can accomplish this?

Theodor Zoulias
Nemo
  • Related: [How to execute tasks in parallel but not more than N tasks per T seconds?](https://stackoverflow.com/questions/60208044/how-to-execute-tasks-in-parallel-but-not-more-than-n-tasks-per-t-seconds) – Theodor Zoulias Jan 21 '21 at 11:02
  • Does the duration of the requests count towards the requests-per-minute policy? In other words, are you allowed to *start* 20 requests per minute (independently of their duration), or must you wait for a minute after the *completion* of the 20 previous requests? – Theodor Zoulias Jan 21 '21 at 11:30
  • @TheodorZoulias No, it does not. What matters is 20 calls per minute. – Nemo Jan 21 '21 at 11:32
  • Related: [Add delay to parallel API call](https://stackoverflow.com/questions/64519475/add-delay-to-parallel-api-call) (Polly) – Theodor Zoulias Jan 21 '21 at 12:20
  • As a side note, be aware that your current implementation of `ForEachAsync` (which is probably a modified version of the last `ForEachAsync` in [this](https://devblogs.microsoft.com/pfxteam/implementing-a-simple-foreachasync-part-2/) article) handles exceptions in a non-ideal way. The reasons are explained in the comments of [this](https://stackoverflow.com/questions/64265723/tasks-combine-result-and-continue/64267212#64267212) answer. – Theodor Zoulias Jan 21 '21 at 13:10
  • @TheodorZoulias I have eliminated the `ForEachAsync`, the `RateLimiter` in your previous comment seems to work, I am currently testing it and will get back. – Nemo Jan 21 '21 at 13:50
  • You mean the `RateLimiter` class from this [answer](https://stackoverflow.com/questions/64519475/add-delay-to-parallel-api-call/64520626#64520626)? That's a fairly complicated piece of code. It's beyond my capabilities to review it and confirm its correctness. – Theodor Zoulias Jan 21 '21 at 14:07
  • Another related question: [Simple way to rate limit HttpClient requests](https://stackoverflow.com/questions/35493925/simple-way-to-rate-limit-httpclient-requests) – Theodor Zoulias Jan 08 '22 at 12:28

1 Answer


Here is a `RateLimiter` class that you could use to limit the frequency of asynchronous operations. It is a simpler implementation of the `RateLimiter` class found in this answer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary>
/// Limits the number of workflows that can access a resource during the
/// specified time span.
/// </summary>
public class RateLimiter : IDisposable
{
    private readonly SemaphoreSlim _semaphore;
    private readonly TimeSpan _timeUnit;
    private readonly CancellationTokenSource _disposeCts;
    private readonly CancellationToken _disposeToken;
    private bool _disposed;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        if (maxActionsPerTimeUnit < 1)
            throw new ArgumentOutOfRangeException(nameof(maxActionsPerTimeUnit));
        if (timeUnit < TimeSpan.Zero || timeUnit.TotalMilliseconds > Int32.MaxValue)
            throw new ArgumentOutOfRangeException(nameof(timeUnit));
        _semaphore = new SemaphoreSlim(maxActionsPerTimeUnit, maxActionsPerTimeUnit);
        _timeUnit = timeUnit;
        _disposeCts = new CancellationTokenSource();
        _disposeToken = _disposeCts.Token;
    }

    public async Task WaitAsync(CancellationToken cancellationToken = default)
    {
        // Take a permit, then give it back one time unit later.
        await _semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
        ScheduleSemaphoreRelease();
    }

    private async void ScheduleSemaphoreRelease()
    {
        try { await Task.Delay(_timeUnit, _disposeToken).ConfigureAwait(false); }
        catch (OperationCanceledException) { } // Ignore
        lock (_semaphore) { if (!_disposed) _semaphore.Release(); }
    }

    /// <summary>Call Dispose when you are finished using the RateLimiter.</summary>
    public void Dispose()
    {
        lock (_semaphore)
        {
            if (_disposed) return;
            _semaphore.Dispose();
            _disposed = true;
            _disposeCts.Cancel();
            _disposeCts.Dispose();
        }
    }
}
```

Usage example:

```csharp
List<string> urls = GetUrls();

using var rateLimiter = new RateLimiter(20, TimeSpan.FromMinutes(1.0));

string[] documents = await Task.WhenAll(urls.Select(async url =>
{
    await rateLimiter.WaitAsync();
    return await _httpClient.GetStringAsync(url);
}));
```

Note: I added a `Dispose` method so that the asynchronous operations initiated internally by the `RateLimiter` class can be canceled. Call this method when you are finished using the `RateLimiter`; otherwise the pending asynchronous operations will prevent the `RateLimiter` from being garbage collected in a timely manner, on top of consuming the resources associated with the active `Task.Delay` tasks. The original, very simple but leaky implementation can be found in the 2nd revision of this answer.
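To see the timing behavior without calling a real API, here is a stripped-down sketch of the same semaphore idea: take a permit, then release it one time unit later. It is an illustration under assumptions, not the class above: it uses C# 9 top-level statements, hypothetical demo values (3 permits per 300 ms, 7 callers starting at once), and it deliberately omits the disposal logic, so it leaks pending `Task.Delay` timers exactly as the note above warns.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo values: at most 3 acquisitions per 300 ms window.
const int MaxPerWindow = 3;
TimeSpan window = TimeSpan.FromMilliseconds(300);
var semaphore = new SemaphoreSlim(MaxPerWindow, MaxPerWindow);

// Same idea as RateLimiter.WaitAsync: take a permit, then schedule its
// release one window later. No disposal logic, so illustration only.
async Task AcquireAsync()
{
    await semaphore.WaitAsync().ConfigureAwait(false);
    _ = Task.Delay(window).ContinueWith(t => { semaphore.Release(); }, TaskScheduler.Default);
}

// Start 7 "requests" at once and record when each one is let through.
var stopwatch = Stopwatch.StartNew();
var admitted = new TimeSpan[7];
await Task.WhenAll(Enumerable.Range(0, 7).Select(async i =>
{
    await AcquireAsync();
    admitted[i] = stopwatch.Elapsed;
}));

Console.WriteLine(string.Join(", ",
    admitted.OrderBy(t => t).Select(t => $"{t.TotalMilliseconds:F0} ms")));
```

You should see roughly three admission times near 0 ms, three near 300 ms and one near 600 ms; the exact values depend on timer resolution and thread-pool scheduling.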


Below is an alternative, more complex implementation of the `RateLimiter` class, based on a `Stopwatch` instead of a `SemaphoreSlim`. Its advantage is that it doesn't need to be disposable, since it doesn't launch hidden asynchronous operations in the background. Its disadvantages are that the `WaitAsync` method does not accept a `CancellationToken` argument, and that bugs are more likely because of the added complexity.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public class RateLimiter
{
    private readonly Stopwatch _stopwatch;
    // Each entry is the timestamp at which a previously granted slot expires.
    private readonly Queue<TimeSpan> _queue;
    private readonly int _maxActionsPerTimeUnit;
    private readonly TimeSpan _timeUnit;

    public RateLimiter(int maxActionsPerTimeUnit, TimeSpan timeUnit)
    {
        // Arguments validation omitted
        _stopwatch = Stopwatch.StartNew();
        _queue = new Queue<TimeSpan>();
        _maxActionsPerTimeUnit = maxActionsPerTimeUnit;
        _timeUnit = timeUnit;
    }

    public Task WaitAsync()
    {
        var delay = TimeSpan.Zero;
        lock (_stopwatch)
        {
            var currentTimestamp = _stopwatch.Elapsed;
            // Forget slots that have already expired.
            while (_queue.Count > 0 && _queue.Peek() < currentTimestamp)
            {
                _queue.Dequeue();
            }
            if (_queue.Count >= _maxActionsPerTimeUnit)
            {
                // Wait until enough older slots expire that fewer than
                // _maxActionsPerTimeUnit remain active.
                var refTimestamp = _queue
                    .Skip(_queue.Count - _maxActionsPerTimeUnit).First();
                delay = refTimestamp - currentTimestamp;
                Debug.Assert(delay >= TimeSpan.Zero);
                if (delay < TimeSpan.Zero) delay = TimeSpan.Zero; // Just in case
            }
            _queue.Enqueue(currentTimestamp + delay + _timeUnit);
        }
        if (delay == TimeSpan.Zero) return Task.CompletedTask;
        return Task.Delay(delay);
    }
}
```
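The queue logic in `WaitAsync` is easier to follow when it is replayed without real waiting. The sketch below is a hypothetical helper, `ScheduleStarts`, that copies that logic but takes request times (in seconds) as input and returns the time each request would be allowed to start. It assumes the request times are non-decreasing, as successive `Stopwatch.Elapsed` readings are, and uses C# 9 top-level statements.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper: replays the Stopwatch-based WaitAsync logic against
// non-decreasing request times (seconds) and returns each request's start time.
static List<double> ScheduleStarts(IEnumerable<double> requestTimes,
    int maxActionsPerTimeUnit, double timeUnit)
{
    var releaseTimes = new Queue<double>(); // when each granted slot expires
    var starts = new List<double>();
    foreach (double now in requestTimes)
    {
        // Forget slots that have already expired.
        while (releaseTimes.Count > 0 && releaseTimes.Peek() < now)
            releaseTimes.Dequeue();

        double delay = 0;
        if (releaseTimes.Count >= maxActionsPerTimeUnit)
        {
            // Start when enough older slots have expired that fewer than
            // maxActionsPerTimeUnit remain active.
            double reference = releaseTimes.ElementAt(releaseTimes.Count - maxActionsPerTimeUnit);
            delay = Math.Max(0, reference - now);
        }
        starts.Add(now + delay);
        releaseTimes.Enqueue(now + delay + timeUnit);
    }
    return starts;
}

// 45 requests issued at t = 0 against a 20-per-60-seconds limit.
var starts = ScheduleStarts(Enumerable.Repeat(0.0, 45), 20, 60);
foreach (var group in starts.GroupBy(s => s))
    Console.WriteLine($"t = {group.Key,3} s: {group.Count()} requests");
```

With 45 requests arriving at once against the 20-per-minute limit from the question, the first 20 start immediately, the next 20 at 60 seconds and the last 5 at 120 seconds, with no partitioning involved.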
Theodor Zoulias
  • This works amazingly well and simplifies my work; no need for partitioning. Thank you. – Nemo Jan 22 '21 at 05:45