4

I am wondering if SemaphoreSlim has anything like a priority when calling Await.

I have not been able to find anything, but maybe someone has done something like this before.

The idea is, that if I need to, an await can be called on the semaphore later on with a higher priority, and it will allow the await to return first.

Theodor Zoulias
  • 24,585
  • 5
  • 40
  • 69
JonathanPeel
  • 703
  • 1
  • 7
  • 17
  • 1
    There is no priority. Whatever horse gets to the finishing line first wins the race. In the very unlikely case it is a tie (unlikely because you don't use WaitAll) the operating system will intentionally make the winner random, a counter-measure against lock convoys. http://joeduffyblog.com/2006/12/14/anticonvoy-locks-in-windows-server-2003-sp1-and-windows-vista/ – Hans Passant Sep 13 '16 at 16:05
  • Thank you. I thought it might be something like that. I could probably try and write something to handle what I want, but I don't think that would be a very good idea. Maybe by chance someone else will have already done something, but I am thinking of redoing some of the code. – JonathanPeel Sep 13 '16 at 16:07

2 Answers2

4

No, there are no priorities in SemaphoreSlim, whether you're using synchronous or asynchronous locking.

There is very rarely ever a need for priorities with asynchronous locks. Usually these kinds of problems have more elegant solutions if you take a step back and look at the bigger picture.

Stephen Cleary
  • 406,130
  • 70
  • 637
  • 767
3

Here is a class PrioritySemaphore<TPriority> that can be acquired with priority. Internally it is based on the SortedSet collection.

public class PrioritySemaphore<TPriority>
{
    private readonly PriorityQueue _priorityQueue;
    private readonly object _locker = new object();
    private readonly int _maxCount;
    private int _currentCount;
    private long _indexSeed = 0;

    public PrioritySemaphore(int initialCount, int maxCount,
        IComparer<TPriority> comparer = null)
    {
        if (initialCount < 0)
            throw new ArgumentOutOfRangeException(nameof(initialCount));
        if (maxCount <= 0) throw new ArgumentOutOfRangeException(nameof(maxCount));

        _priorityQueue = new PriorityQueue(comparer);
        _currentCount = initialCount;
        _maxCount = maxCount;
    }
    public PrioritySemaphore(int initialCount, IComparer<TPriority> comparer = null)
        : this(initialCount, Int32.MaxValue, comparer) { }
    public PrioritySemaphore(IComparer<TPriority> comparer = null)
        : this(0, Int32.MaxValue, comparer) { }

    public int CurrentCount { get { lock (_locker) return _currentCount; } }

    public async Task<bool> WaitAsync(TPriority priority, int millisecondsTimeout,
        CancellationToken cancellationToken = default)
    {
        if (millisecondsTimeout < -1)
            throw new ArgumentOutOfRangeException(nameof(millisecondsTimeout));

        cancellationToken.ThrowIfCancellationRequested();
        lock (_locker)
        {
            if (_currentCount > 0)
            {
                _currentCount--;
                return true;
            }
        }
        if (millisecondsTimeout == 0) return false;
        var tcs = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        long entryIndex = -1;
        bool taskCompleted = false;

        Timer timer = null;
        if (millisecondsTimeout > 0)
        {
            timer = new Timer(_ =>
            {
                bool doComplete;
                lock (_locker)
                {
                    doComplete = entryIndex == -1
                        || _priorityQueue.Remove(priority, entryIndex);
                    if (doComplete) taskCompleted = true;
                }
                if (doComplete) tcs.TrySetResult(false);
            }, null, millisecondsTimeout, Timeout.Infinite);
        }

        CancellationTokenRegistration registration = default;
        if (cancellationToken.CanBeCanceled)
        {
            registration = cancellationToken.Register(() =>
            {
                bool doComplete;
                lock (_locker)
                {
                    doComplete = entryIndex == -1
                        || _priorityQueue.Remove(priority, entryIndex);
                    if (doComplete) taskCompleted = true;
                }
                if (doComplete) tcs.TrySetCanceled(cancellationToken);
            });
        }

        bool disposeSubscriptions = false;
        lock (_locker)
        {
            if (!taskCompleted)
            {
                entryIndex = _indexSeed++;
                _priorityQueue.Enqueue(priority, entryIndex, tcs, timer, registration);
            }
            else
            {
                disposeSubscriptions = true;
            }
        }
        if (disposeSubscriptions)
        {
            timer?.Dispose();
            registration.Dispose();
        }
        return await tcs.Task.ConfigureAwait(false);
    }

    public Task WaitAsync(TPriority priority,
        CancellationToken cancellationToken = default)
    {
        return WaitAsync(priority, Timeout.Infinite, cancellationToken);
    }

    public void Release()
    {
        TaskCompletionSource<bool> tcs;
        Timer timer;
        CancellationTokenRegistration registration;
        lock (_locker)
        {
            if (_priorityQueue.IsEmpty)
            {
                if (_currentCount >= _maxCount) throw new SemaphoreFullException();
                _currentCount++;
                return;
            }
            (tcs, timer, registration) = _priorityQueue.Dequeue();
        }
        tcs.TrySetResult(true);
        timer?.Dispose();
        registration.Dispose();
    }

    private class PriorityQueue : IComparer<(TPriority Priority, long Index,
        TaskCompletionSource<bool>, Timer, CancellationTokenRegistration)>
    {
        private readonly SortedSet<(TPriority Priority, long Index,
            TaskCompletionSource<bool> TCS, Timer Timer,
            CancellationTokenRegistration Registration)> _sortedSet;

        private readonly IComparer<TPriority> _priorityComparer;
        private readonly Comparer<long> _indexComparer = Comparer<long>.Default;

        public PriorityQueue(IComparer<TPriority> comparer)
        {
            _priorityComparer = comparer ?? Comparer<TPriority>.Default;
            _sortedSet = new SortedSet<(TPriority Priority, long Index,
            TaskCompletionSource<bool> TCS, Timer Timer,
            CancellationTokenRegistration Registration)>(this);
        }

        public bool IsEmpty => _sortedSet.Count == 0;

        public void Enqueue(TPriority priority, long index,
            TaskCompletionSource<bool> tcs, Timer timer,
            CancellationTokenRegistration registration)
        {
            _sortedSet.Add((priority, index, tcs, timer, registration));
        }

        public (TaskCompletionSource<bool>, Timer, CancellationTokenRegistration)
            Dequeue()
        {
            Debug.Assert(_sortedSet.Count > 0);
            var entry = _sortedSet.Min;
            _sortedSet.Remove(entry);
            return (entry.TCS, entry.Timer, entry.Registration);
        }

        public bool Remove(TPriority priority, long index)
        {
            return _sortedSet.Remove((priority, index, default, default, default));
        }

        public int Compare((TPriority Priority, long Index,
            TaskCompletionSource<bool>, Timer, CancellationTokenRegistration) x,
            (TPriority Priority, long Index, TaskCompletionSource<bool>, Timer,
            CancellationTokenRegistration) y)
        {
            int result = _priorityComparer.Compare(x.Priority, y.Priority);
            if (result == 0) result = _indexComparer.Compare(x.Index, y.Index);
            return result;
        }
    }
}

Usage example:

var semaphore = new PrioritySemaphore<int>();
//...
await semaphore.WaitAsync(priority: 1);
//...
await semaphore.WaitAsync(priority: 2);
//...
semaphore.Release();

After the Release, the semaphore will be acquired by the awaiter with the highest priority. In the above example it will be the awaiter with priority 1. Smaller values denote higher priority. If there are more than one awaiters with the same highest priority, the semaphore will be acquired by the one that requested it first (FIFO order is maintained).

The class PrioritySemaphore<TPriority> has only asynchronous API. It supports awaiting with timeout and with CancellationToken, but these features have not been tested extensively.


Note: The .NET 6 introduced the PriorityQueue<TElement, TPriority> class, which theoretically could be used to simplify the above implementation. Unfortunately the new class does not support removing specific elements from the queue. Only dequeuing is supported. And in order to implement the cancellation and timeout functionality of the PrioritySemaphore<TPriority> class, removing specific elements from the queue is required. So the new class cannot be used in the above implementation.

Theodor Zoulias
  • 24,585
  • 5
  • 40
  • 69