Here is a class PrioritySemaphore<TPriority> that can be acquired with priority. Internally it is based on the SortedSet collection.
/// <summary>
/// An asynchronous semaphore whose pending waiters are released in priority order.
/// The waiter whose priority compares smallest is released first; waiters with equal
/// priority are released in the order in which they were queued (FIFO).
/// </summary>
/// <typeparam name="TPriority">The type of the priority values.</typeparam>
public class PrioritySemaphore<TPriority>
{
    private readonly PriorityQueue _priorityQueue;
    private readonly object _locker = new object();
    private readonly int _maxCount;
    private int _currentCount;
    private long _indexSeed = 0;

    /// <summary>Initializes the semaphore with an initial and a maximum permit count.</summary>
    /// <param name="initialCount">Number of permits that are immediately available.</param>
    /// <param name="maxCount">Maximum number of permits that can be available at once.</param>
    /// <param name="comparer">
    /// Comparer used to order priorities, or <c>null</c> to use the default comparer.
    /// </param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="initialCount"/> is negative or greater than
    /// <paramref name="maxCount"/>, or <paramref name="maxCount"/> is not positive.
    /// </exception>
    public PrioritySemaphore(int initialCount, int maxCount,
        IComparer<TPriority> comparer = null)
    {
        if (initialCount < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(initialCount));
        }
        if (maxCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxCount));
        }
        // Starting above the maximum would let the semaphore hand out more permits
        // than Release() is ever allowed to restore.
        if (initialCount > maxCount)
        {
            throw new ArgumentOutOfRangeException(nameof(initialCount));
        }
        _priorityQueue = new PriorityQueue(comparer);
        _currentCount = initialCount;
        _maxCount = maxCount;
    }

    /// <summary>Initializes the semaphore with an initial count and no practical maximum.</summary>
    public PrioritySemaphore(int initialCount, IComparer<TPriority> comparer = null)
        : this(initialCount, Int32.MaxValue, comparer) { }

    /// <summary>Initializes the semaphore with zero permits and no practical maximum.</summary>
    public PrioritySemaphore(IComparer<TPriority> comparer = null)
        : this(0, Int32.MaxValue, comparer) { }

    /// <summary>Gets the number of permits that are currently available.</summary>
    public int CurrentCount { get { lock (_locker) return _currentCount; } }

    /// <summary>
    /// Asynchronously waits to acquire the semaphore with the given priority.
    /// </summary>
    /// <param name="priority">Priority of this waiter relative to other pending waiters.</param>
    /// <param name="millisecondsTimeout">
    /// Maximum time to wait in milliseconds; <c>-1</c> (<see cref="Timeout.Infinite"/>)
    /// waits indefinitely and <c>0</c> only tests for an available permit.
    /// </param>
    /// <param name="cancellationToken">Token that cancels the wait.</param>
    /// <returns>
    /// A task that yields <c>true</c> if the semaphore was acquired, or <c>false</c>
    /// if the timeout elapsed first. The task is canceled if the token is canceled
    /// before the semaphore is acquired.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="millisecondsTimeout"/> is less than -1.
    /// </exception>
    public async Task<bool> WaitAsync(TPriority priority, int millisecondsTimeout,
        CancellationToken cancellationToken = default)
    {
        if (millisecondsTimeout < -1)
        {
            throw new ArgumentOutOfRangeException(nameof(millisecondsTimeout));
        }
        cancellationToken.ThrowIfCancellationRequested();

        // Fast path: take a free permit without allocating anything.
        lock (_locker)
        {
            if (_currentCount > 0)
            {
                _currentCount--;
                return true;
            }
        }
        if (millisecondsTimeout == 0)
        {
            return false;
        }

        var tcs = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        long entryIndex = -1;       // -1 means "not enqueued (yet)".
        bool taskCompleted = false; // Set once the outcome of this wait is decided.

        // Decides, on behalf of a timeout/cancellation callback, whether this wait can
        // still be abandoned. Must be called while holding _locker.
        bool TryAbandonWait()
        {
            if (taskCompleted)
            {
                return false;
            }
            // Once enqueued, the wait can be abandoned only while its entry is still in
            // the queue; otherwise Release() has already handed it the semaphore.
            if (entryIndex != -1 && !_priorityQueue.Remove(priority, entryIndex))
            {
                return false;
            }
            taskCompleted = true;
            return true;
        }

        // The callbacks are wired up outside the lock and before enqueuing, so either
        // of them may run before the entry exists; TryAbandonWait handles that case.
        Timer timer = null;
        if (millisecondsTimeout > 0)
        {
            timer = new Timer(_ =>
            {
                bool doComplete;
                lock (_locker)
                {
                    doComplete = TryAbandonWait();
                }
                if (doComplete)
                {
                    tcs.TrySetResult(false);
                }
            }, null, millisecondsTimeout, Timeout.Infinite);
        }

        CancellationTokenRegistration registration = default;
        if (cancellationToken.CanBeCanceled)
        {
            registration = cancellationToken.Register(() =>
            {
                bool doComplete;
                lock (_locker)
                {
                    doComplete = TryAbandonWait();
                }
                if (doComplete)
                {
                    tcs.TrySetCanceled(cancellationToken);
                }
            });
        }

        bool acquiredWithoutQueuing = false;
        lock (_locker)
        {
            if (!taskCompleted)
            {
                if (_currentCount > 0)
                {
                    // A Release() slipped in after the fast-path check and found the
                    // queue empty, so it incremented the count. Take that permit now
                    // instead of queuing behind a permit that is already available.
                    _currentCount--;
                    taskCompleted = true;
                    acquiredWithoutQueuing = true;
                }
                else
                {
                    entryIndex = _indexSeed++;
                    _priorityQueue.Enqueue(priority, entryIndex, tcs, timer, registration);
                }
            }
        }

        try
        {
            if (acquiredWithoutQueuing)
            {
                return true;
            }
            return await tcs.Task.ConfigureAwait(false);
        }
        finally
        {
            // Always tear down the subscriptions, whichever way the wait ended, so that
            // a timed-out wait does not stay registered on a long-lived token and a
            // canceled wait does not keep its timer alive. Disposing twice is harmless
            // (Release() may have disposed them already).
            timer?.Dispose();
            registration.Dispose();
        }
    }

    /// <summary>
    /// Asynchronously waits, without a timeout, to acquire the semaphore with the
    /// given priority.
    /// </summary>
    /// <param name="priority">Priority of this waiter relative to other pending waiters.</param>
    /// <param name="cancellationToken">Token that cancels the wait.</param>
    /// <returns>A task that completes when the semaphore has been acquired.</returns>
    public Task WaitAsync(TPriority priority,
        CancellationToken cancellationToken = default)
    {
        return WaitAsync(priority, Timeout.Infinite, cancellationToken);
    }

    /// <summary>
    /// Releases the semaphore once. If waiters are pending, the one that sorts first
    /// acquires the semaphore; otherwise the available count is incremented.
    /// </summary>
    /// <exception cref="SemaphoreFullException">
    /// No waiter is pending and the count is already at its maximum.
    /// </exception>
    public void Release()
    {
        TaskCompletionSource<bool> tcs;
        Timer timer;
        CancellationTokenRegistration registration;
        lock (_locker)
        {
            if (_priorityQueue.IsEmpty)
            {
                if (_currentCount >= _maxCount)
                {
                    throw new SemaphoreFullException();
                }
                _currentCount++;
                return;
            }
            (tcs, timer, registration) = _priorityQueue.Dequeue();
        }
        // Complete the waiter and dispose its subscriptions outside the lock, so that
        // a concurrently running callback that needs the lock cannot block us.
        tcs.TrySetResult(true);
        timer?.Dispose();
        registration.Dispose();
    }

    /// <summary>
    /// Queue of pending waiters, ordered by priority and then by arrival index.
    /// Backed by a <see cref="SortedSet{T}"/> so that a specific entry can be removed
    /// when its wait times out or is canceled. Not synchronized; callers hold the lock.
    /// </summary>
    private class PriorityQueue : IComparer<(TPriority Priority, long Index,
        TaskCompletionSource<bool>, Timer, CancellationTokenRegistration)>
    {
        private readonly SortedSet<(TPriority Priority, long Index,
            TaskCompletionSource<bool> TCS, Timer Timer,
            CancellationTokenRegistration Registration)> _sortedSet;
        private readonly IComparer<TPriority> _priorityComparer;
        private readonly Comparer<long> _indexComparer = Comparer<long>.Default;

        public PriorityQueue(IComparer<TPriority> comparer)
        {
            _priorityComparer = comparer ?? Comparer<TPriority>.Default;
            _sortedSet = new SortedSet<(TPriority Priority, long Index,
                TaskCompletionSource<bool> TCS, Timer Timer,
                CancellationTokenRegistration Registration)>(this);
        }

        /// <summary>Gets whether the queue has no pending entries.</summary>
        public bool IsEmpty => _sortedSet.Count == 0;

        /// <summary>Adds a waiter identified by its priority and unique index.</summary>
        public void Enqueue(TPriority priority, long index,
            TaskCompletionSource<bool> tcs, Timer timer,
            CancellationTokenRegistration registration)
        {
            _sortedSet.Add((priority, index, tcs, timer, registration));
        }

        /// <summary>Removes and returns the entry that sorts first. The queue must not be empty.</summary>
        public (TaskCompletionSource<bool>, Timer, CancellationTokenRegistration)
            Dequeue()
        {
            Debug.Assert(_sortedSet.Count > 0);
            var entry = _sortedSet.Min;
            _sortedSet.Remove(entry);
            return (entry.TCS, entry.Timer, entry.Registration);
        }

        /// <summary>
        /// Removes the entry with the given priority and index, if it is still queued.
        /// </summary>
        /// <returns><c>true</c> if an entry was removed; otherwise <c>false</c>.</returns>
        public bool Remove(TPriority priority, long index)
        {
            // Only Priority and Index take part in the comparison, so the remaining
            // tuple fields can be left at their defaults for the lookup.
            return _sortedSet.Remove((priority, index, default, default, default));
        }

        /// <summary>Orders entries by priority first and by arrival index second.</summary>
        public int Compare((TPriority Priority, long Index,
            TaskCompletionSource<bool>, Timer, CancellationTokenRegistration) x,
            (TPriority Priority, long Index, TaskCompletionSource<bool>, Timer,
            CancellationTokenRegistration) y)
        {
            int result = _priorityComparer.Compare(x.Priority, y.Priority);
            if (result == 0)
            {
                result = _indexComparer.Compare(x.Index, y.Index);
            }
            return result;
        }
    }
}
Usage example:
// The parameterless constructor starts with zero available permits, so every
// WaitAsync call below stays pending until a matching Release() is made.
var semaphore = new PrioritySemaphore<int>();
//...
// One caller asks for the semaphore with priority 1.
await semaphore.WaitAsync(priority: 1);
//...
// Another caller asks for the semaphore with priority 2.
await semaphore.WaitAsync(priority: 2);
//...
// Hands the semaphore to the pending waiter that sorts first by priority.
semaphore.Release();
After the call to Release, the semaphore will be acquired by the awaiter with the highest priority. In the above example it will be the awaiter with priority 1. Smaller values denote higher priority. If there is more than one awaiter with the same highest priority, the semaphore will be acquired by the one that requested it first (FIFO order is maintained).
The class PrioritySemaphore<TPriority> has only an asynchronous API. It supports awaiting with a timeout and with a CancellationToken, but these features have not been tested extensively.
Note: .NET 6 introduced the PriorityQueue<TElement, TPriority> class, which in theory could be used to simplify the above implementation. Unfortunately, the new class does not support removing specific elements from the queue; only dequeuing is supported. Implementing the cancellation and timeout functionality of the PrioritySemaphore<TPriority> class requires removing specific elements from the queue, so the new class cannot be used in the above implementation.