35

I'm attempting to figure out an issue that has been raised with my ImageProcessor library here where I am getting intermittent file access errors when adding items to the cache.

System.IO.IOException: The process cannot access the file 'D:\home\site\wwwroot\app_data\cache\0\6\5\f\2\7\065f27fc2c8e843443d210a1e84d1ea28bbab6c4.webp' because it is being used by another process.

I wrote a class designed to perform an asynchronous lock based upon a key generated by a hashed url but it seems I have missed something in the implementation.

My locking class

/// <summary>
/// Provides per-key mutual exclusion, synchronously or asynchronously, using one
/// <see cref="SemaphoreSlim"/> per key stored in a static dictionary.
/// </summary>
/// <remarks>
/// NOTE(review): the release callback removes the key's semaphore from the dictionary
/// BEFORE releasing it. A caller that is already waiting on the removed semaphore will
/// acquire it after the release, while any later caller finds no entry for the key and
/// gets a brand-new semaphore from GetOrAdd. Both callers then hold the "same" key at once.
/// </remarks>
public sealed class AsyncDuplicateLock
{
    /// <summary>
    /// The collection of semaphore slims, keyed by the caller-supplied lock key.
    /// Static, so it is shared by every instance of this class.
    /// </summary>
    private static readonly ConcurrentDictionary<object, SemaphoreSlim> SemaphoreSlims
                            = new ConcurrentDictionary<object, SemaphoreSlim>();

    /// <summary>
    /// Locks against the given key, blocking the calling thread until the key's
    /// semaphore has been acquired.
    /// </summary>
    /// <param name="key">
    /// The key that identifies the current object.
    /// </param>
    /// <returns>
    /// An <see cref="IDisposable"/> that, when disposed, removes the key's semaphore
    /// from the dictionary and then releases and disposes it.
    /// </returns>
    public IDisposable Lock(object key)
    {
        DisposableScope releaser = new DisposableScope(
        key,
        s =>
        {
            SemaphoreSlim locker;
            // The entry is removed first and only then released; see the class remarks
            // for why this ordering lets two callers hold the same key.
            if (SemaphoreSlims.TryRemove(s, out locker))
            {
                locker.Release();
                locker.Dispose();
            }
        });

        // The SemaphoreSlim argument is constructed on every call, even when the key
        // already has an entry and the new instance is discarded.
        SemaphoreSlim semaphore = SemaphoreSlims.GetOrAdd(key, new SemaphoreSlim(1, 1));
        semaphore.Wait();
        return releaser;
    }

    /// <summary>
    /// Asynchronously locks against the given key.
    /// </summary>
    /// <param name="key">
    /// The key that identifies the current object.
    /// </param>
    /// <returns>
    /// A <see cref="Task{TResult}"/> that yields the disposable scope once the key's
    /// semaphore has been acquired. Disposing the scope removes the key's semaphore
    /// from the dictionary and then releases and disposes it.
    /// </returns>
    public Task<IDisposable> LockAsync(object key)
    {
        DisposableScope releaser = new DisposableScope(
        key,
        s =>
        {
            SemaphoreSlim locker;
            // Same remove-then-release ordering as in Lock.
            if (SemaphoreSlims.TryRemove(s, out locker))
            {
                locker.Release();
                locker.Dispose();
            }
        });

        Task<IDisposable> releaserTask = Task.FromResult(releaser as IDisposable);
        // As in Lock, a new SemaphoreSlim is allocated here whether or not it ends up stored.
        SemaphoreSlim semaphore = SemaphoreSlims.GetOrAdd(key, new SemaphoreSlim(1, 1));

        Task waitTask = semaphore.WaitAsync();

        // If the wait already completed, hand back the pre-built completed task;
        // otherwise chain a continuation that yields the releaser once the wait finishes.
        return waitTask.IsCompleted
                   ? releaserTask
                   : waitTask.ContinueWith(
                       (_, r) => (IDisposable)r,
                       releaser,
                       CancellationToken.None,
                       TaskContinuationOptions.ExecuteSynchronously,
                       TaskScheduler.Default);
    }

    /// <summary>
    /// The disposable scope: invokes a caller-supplied action with the stored key
    /// each time it is disposed.
    /// </summary>
    private sealed class DisposableScope : IDisposable
    {
        /// <summary>
        /// The key passed to the close scope action on dispose.
        /// </summary>
        private readonly object key;

        /// <summary>
        /// The close scope action.
        /// </summary>
        private readonly Action<object> closeScopeAction;

        /// <summary>
        /// Initializes a new instance of the <see cref="DisposableScope"/> class.
        /// </summary>
        /// <param name="key">
        /// The key.
        /// </param>
        /// <param name="closeScopeAction">
        /// The close scope action.
        /// </param>
        public DisposableScope(object key, Action<object> closeScopeAction)
        {
            this.key = key;
            this.closeScopeAction = closeScopeAction;
        }

        /// <summary>
        /// Disposes the scope by invoking the close scope action with the key.
        /// There is no guard against repeated calls: every call runs the action again.
        /// </summary>
        public void Dispose()
        {
            this.closeScopeAction(this.key);
        }
    }
}

Usage - within a HttpModule

// The lock object held by the module.
private readonly AsyncDuplicateLock locker = new AsyncDuplicateLock();

// Acquire the lock keyed on cachedPath; leaving the using block disposes the
// returned scope, which releases the lock.
using (await this.locker.LockAsync(cachedPath))
{
    // Process and save a cached image.
}

Can anyone spot where I have gone wrong? I'm worried that I am misunderstanding something fundamental.

The full source for the library is stored on Github here

James South
  • 9,571
  • 4
  • 56
  • 113

5 Answers

63

As the other answerer noted, the original code is removing the SemaphoreSlim from the ConcurrentDictionary before it releases the semaphore. So, you've got too much semaphore churn going on - they're being removed from the dictionary when they could still be in use (not acquired, but already retrieved from the dictionary).

The problem with this kind of "mapping lock" is that it's difficult to know when the semaphore is no longer necessary. One option is to never dispose the semaphores at all; that's the easy solution, but may not be acceptable in your scenario. Another option - if the semaphores are actually related to object instances and not values (like strings) - is to attach them using ephemerons; however, I believe this option would also not be acceptable in your scenario.

So, we do it the hard way. :)

There are a few different approaches that would work. I think it makes sense to approach it from a reference-counting perspective (reference-counting each semaphore in the dictionary). Also, we want to make the decrement-count-and-remove operation atomic, so I just use a single lock (making the concurrent dictionary superfluous):

/// <summary>
/// Keyed mutual exclusion: callers using equal keys are serialized, callers using
/// different keys do not block each other. Each key's semaphore is reference-counted
/// and dropped from the table when its last user releases it.
/// </summary>
public sealed class AsyncDuplicateLock
{
  /// <summary>
  /// Pairs a value with the number of callers currently holding or waiting for it.
  /// </summary>
  private sealed class RefCounted<T>
  {
    public RefCounted(T value)
    {
      Value = value;
      RefCount = 1;
    }

    public int RefCount { get; set; }
    public T Value { get; private set; }
  }

  // Shared table of live semaphores. It also serves as the monitor guarding every
  // read and write of the table and of the reference counts.
  private static readonly Dictionary<object, RefCounted<SemaphoreSlim>> SemaphoreSlims
                        = new Dictionary<object, RefCounted<SemaphoreSlim>>();

  /// <summary>
  /// Returns the semaphore for <paramref name="key"/>, registering the caller as a user:
  /// an existing entry has its count incremented, a missing one is created with count 1.
  /// </summary>
  private SemaphoreSlim GetOrCreate(object key)
  {
    lock (SemaphoreSlims)
    {
      RefCounted<SemaphoreSlim> entry;
      if (!SemaphoreSlims.TryGetValue(key, out entry))
      {
        entry = new RefCounted<SemaphoreSlim>(new SemaphoreSlim(1, 1));
        SemaphoreSlims[key] = entry;
        return entry.Value;
      }

      entry.RefCount++;
      return entry.Value;
    }
  }

  /// <summary>
  /// Blocks until the lock for <paramref name="key"/> is acquired.
  /// </summary>
  /// <returns>A handle whose disposal releases the lock.</returns>
  public IDisposable Lock(object key)
  {
    SemaphoreSlim semaphore = GetOrCreate(key);
    semaphore.Wait();
    return new Releaser { Key = key };
  }

  /// <summary>
  /// Asynchronously waits until the lock for <paramref name="key"/> is acquired.
  /// </summary>
  /// <returns>A task yielding a handle whose disposal releases the lock.</returns>
  public async Task<IDisposable> LockAsync(object key)
  {
    SemaphoreSlim semaphore = GetOrCreate(key);
    await semaphore.WaitAsync().ConfigureAwait(false);
    return new Releaser { Key = key };
  }

  /// <summary>
  /// Handle returned to lock holders; disposing it gives the lock back.
  /// </summary>
  private sealed class Releaser : IDisposable
  {
    public object Key { get; set; }

    public void Dispose()
    {
      SemaphoreSlim toRelease;
      lock (SemaphoreSlims)
      {
        // Decrementing and removing happen under the same monitor as GetOrCreate,
        // so an entry is only dropped when nobody holds or awaits it.
        RefCounted<SemaphoreSlim> entry = SemaphoreSlims[Key];
        entry.RefCount -= 1;
        if (entry.RefCount == 0)
        {
          SemaphoreSlims.Remove(Key);
        }

        toRelease = entry.Value;
      }

      // Released outside the monitor, as in the table bookkeeping above.
      toRelease.Release();
    }
  }
}
William
  • 7,808
  • 5
  • 37
  • 42
Stephen Cleary
  • 406,130
  • 70
  • 637
  • 767
  • 1
    "is to attach them using ephemerons", You kind of lost me there, can you explain what you meant by that? – Scott Chamberlain Jul 02 '15 at 21:22
  • 2
    Ephemerons are a dynamic language concept that ties one object to the lifetime of another. Like the properties you can add to `ExpandoObject`, but ephemerons can be attached to any object (more like JavaScript properties in that regard). The only .NET ephemeron is `ConditionalWeakTable`, a difficult to use object. I wrote a simple wrapper library called [ConnectedProperties](https://www.nuget.org/packages/Nito.ConnectedProperties/4.0.0-alpha-1). – Stephen Cleary Jul 02 '15 at 22:18
  • 3
    This is brilliant! An elegant approach that I would have forever over-engineered in attempting. I'd gotten a never disposing implementation working but really wasn't happy with the ever increasing memory usage. Very much appreciated! – James South Jul 03 '15 at 12:45
  • How would you adapt the solution to prevent locking for mostly read use cases? – too May 18 '20 at 19:30
  • 2
    @too: I wouldn't. The time spent under lock is extremely short, so even if most uses are reads, it probably would not be worth changing. – Stephen Cleary May 19 '20 at 00:56
  • Thanks! BTW, why didn't you make `AsyncDuplicateLock` class static? – Olegas Gončarovas Aug 06 '20 at 06:39
  • @OlegasGončarovas: I prefer static instances over static classes, when reasonable. – Stephen Cleary Aug 06 '20 at 17:52
  • Anyone has implemented this with a more unit testable implementation? – Mathieu G Oct 27 '20 at 17:44
  • `SemaphoreSlim` has a built-in `CurrentCount` property so you don't need to `refcount`. Also, we can use `ConcurrentDictionary` to get rid of the `locks`. Otherwise great solution. – Alex from Jitbit Feb 08 '21 at 08:19
  • @StephenCleary, I'm trying to update your solution so that I can allow multiple threads access to a resource, but at the same time the threads that are blocked (till Release is called so that the next thread can enter) to just skip or exit instead of waiting. Do you have any suggestions that I can use? –  Mar 02 '21 at 23:18
  • 1
    @boris: I almost never use "try locks"; to me, this indicates there may be a mismatch somewhere. So the first recommendation I have is to step back and look at the bigger picture, and see if a completely different approach would make better sense. That said, you can do "try locks" with `SemaphoreSlim` - call `WaitAsync(0)`. The trickier part is what API you're going to expose. A nullable `IDisposable` is one option (`null` meaning the lock wasn't taken). – Stephen Cleary Mar 02 '21 at 23:51
  • I've been reading about always locking using a dedicated object, but in this example you seem to lock on the `SemaphoreSlims` dictionary instance. Is it because it's readonly? But it's also static? Would dedicated lock object instance do any good here? – kor_ Mar 05 '21 at 08:40
  • 2
    @kor_: Locking using a dedicated object is never a bad idea. In this specific case, I can lock on the dictionary instance instead because it's private and never exposed. Since this code here is the only code that can access the dictionary instance, I know that nothing else can ever lock on it. – Stephen Cleary Mar 05 '21 at 12:53
  • @AlexfromJitbit No the CurrentCount() does not work, because it does NOT say how many are waiting to get into the Semaphore but how many are currently processed by the Semaphore. – serious Oct 07 '21 at 14:04
  • @StephenCleary I don't understand why there's a "LockAsync" method? "Lock" seems to provide the same functionality, and I can't see what advantage or where you'd use "LockAsync" instead. Am I missing something? – Daniel James Bryars Oct 28 '21 at 05:30
  • @DanielJamesBryars: `LockAsync` is asynchronous, so it doesn't block the thread while acquiring the semaphore. – Stephen Cleary Oct 28 '21 at 11:59
  • Based on this solution, I created a .NET Standard 2.0 library available on NuGet https://www.nuget.org/packages/AsyncKeyedLock and GitHub https://github.com/MarkCiliaVincenti/AsyncKeyedLock – Mark Cilia Vincenti Nov 22 '21 at 09:06
  • I am not sure if keeping SemaphoreSlims dictionary field as static is good idea. In an application when someone might keep multiple instances of AsyncDuplicateLock completely independent keys can cause collisions. Why not keep it a regular instance variable and let client handle AsyncDuplicateLock lifetime (for example keeping multiple separate instances of AsyncDuplicateLock static)? – LadislavBohm Nov 23 '21 at 10:25
  • 1
    @LadislavBohm: The op's question used a `static readonly` dictionary, so my answer mirrored that. – Stephen Cleary Nov 23 '21 at 22:24
  • I stumbled upon this question, after implementing a *very* similar solution myself: https://github.com/amoerie/keyed-semaphores The main index of semaphores is based on ConcurrentDictionary, but the ref counting happens under a lock, where the lock is the semaphore itself. This avoids lock contention until a lot of requests for the same key occur. I'd be interested to hear feedback! – Moeri Dec 02 '21 at 15:41
  • @LadislavBohm I updated the library at https://www.nuget.org/packages/AsyncKeyedLock / https://github.com/MarkCiliaVincenti/AsyncKeyedLock to not use static. An instance can also be injected. – Mark Cilia Vincenti Mar 12 '22 at 10:15
  • Would adding a cancellation token be okay here? – sommmen Mar 28 '22 at 14:42
1

For a given key,

  1. Thread 1 calls GetOrAdd and adds a new semaphore and acquires it via Wait
  2. Thread 2 calls GetOrAdd and gets the existing semaphore and blocks on Wait
  3. Thread 1 releases the semaphore, only after having called TryRemove, which removed the semaphore from the dictionary
  4. Thread 2 now acquires the semaphore.
  5. Thread 3 calls GetOrAdd for the same key as thread 1 and 2. Thread 2 is still holding the semaphore, but the semaphore is not in the dictionary, so thread 3 creates a new semaphore and both threads 2 and 3 access the same protected resource.

You need to adjust your logic. The semaphore should only be removed from the dictionary when it has no waiters.

Here is one potential solution, minus the async part:

/// <summary>
/// Synchronous keyed lock: callers passing equal keys to <see cref="Lock"/> are
/// serialized; callers passing different keys do not block each other.
/// </summary>
public sealed class AsyncDuplicateLock
{
    /// <summary>
    /// Per-key state: a count of callers holding or waiting for the key, plus a lazily
    /// created semaphore used only to hand the lock from one holder to the next waiter.
    /// </summary>
    private class LockInfo
    {
        private SemaphoreSlim sem;
        private int waiterCount;

        /// <summary>
        /// Creates the state already owned by its creator (count 1, no semaphore yet).
        /// </summary>
        public LockInfo()
        {
            sem = null;
            waiterCount = 1;
        }

        // Lazily create the semaphore. It starts at 0 so that a waiter blocks until
        // the current holder signals it from Exit.
        private SemaphoreSlim Semaphore
        {
            get
            {
                var s = sem;
                if (s == null)
                {
                    s = new SemaphoreSlim(0, 1);

                    // CompareExchange(ref location, value, comparand): publish s only if the
                    // field is still null. (With value and comparand swapped, the field was
                    // never assigned and every access produced a different semaphore, so a
                    // waiter could never be woken.)
                    var original = Interlocked.CompareExchange(ref sem, s, null);

                    // If someone else already created a semaphore, return that one
                    if (original != null)
                    {
                        s.Dispose();
                        return original;
                    }
                }
                return s;
            }
        }

        /// <summary>
        /// Registers the caller as a waiter and blocks until the lock is handed over.
        /// </summary>
        /// <returns>
        /// <c>true</c> once the caller owns the lock; <c>false</c> (without blocking) if this
        /// info has already been retired by its last holder and must not be reused.
        /// </returns>
        public bool Enter()
        {
            // Increment only while the count is positive. A plain Interlocked.Increment
            // would move a retired info from 0 to 1, and the next caller would then see 2,
            // take the info for live and wait on a semaphore nobody will ever release.
            int observed;
            do
            {
                observed = Volatile.Read(ref waiterCount);
                if (observed <= 0)
                    return false;
            }
            while (Interlocked.CompareExchange(ref waiterCount, observed + 1, observed) != observed);

            Semaphore.Wait();
            return true;
        }

        /// <summary>
        /// Gives up the lock.
        /// </summary>
        /// <returns>
        /// <c>true</c> if this lock info is now ready for removal (no one else is counted);
        /// <c>false</c> if the lock was handed to another waiter.
        /// </returns>
        public bool Exit()
        {
            if (Interlocked.Decrement(ref waiterCount) <= 0)
                return true;

            // There was another waiter
            Semaphore.Release();
            return false;
        }
    }

    private static readonly ConcurrentDictionary<object, LockInfo> activeLocks = new ConcurrentDictionary<object, LockInfo>();

    /// <summary>
    /// Blocks until the lock for <paramref name="key"/> is acquired.
    /// </summary>
    /// <param name="key">The key identifying the protected resource.</param>
    /// <returns>A scope whose disposal releases the lock.</returns>
    public static IDisposable Lock(object key)
    {
        // Get the current info or create a new one. When an info already exists, the
        // blocking wait happens inside the update delegate (v.Enter()); a retired info
        // is replaced by a fresh one owned by this caller.
        var info = activeLocks.AddOrUpdate(key,
          (k) => new LockInfo(),
          (k, v) => v.Enter() ? v : new LockInfo());

        DisposableScope releaser = new DisposableScope(() =>
        {
            if (info.Exit())
            {
                // Only remove this exact info, in case another thread has
                // already put its own info into the dictionary
                ((ICollection<KeyValuePair<object, LockInfo>>)activeLocks)
                  .Remove(new KeyValuePair<object, LockInfo>(key, info));
            }
        });

        return releaser;
    }

    /// <summary>
    /// Runs the supplied action when disposed.
    /// </summary>
    private sealed class DisposableScope : IDisposable
    {
        private readonly Action closeScopeAction;

        public DisposableScope(Action closeScopeAction)
        {
            this.closeScopeAction = closeScopeAction;
        }

        public void Dispose()
        {
            this.closeScopeAction();
        }
    }
}
Dark Falcon
  • 42,395
  • 5
  • 80
  • 94
  • Thanks. I get the explanation but now I'm struggling to figure out what property to test against to ensure it has no waiters. https://msdn.microsoft.com/en-us/library/system.threading.semaphoreslim_properties(v=vs.110).aspx – James South Jun 30 '15 at 12:53
  • Thanks... Isn't that synchronous only though? – James South Jun 30 '15 at 19:14
  • Yes, I even said that. The asynchronous is up to you to design. It shouldn't be that hard, as it is based on a `SemaphoreSlim` just like your original code. – Dark Falcon Jun 30 '15 at 21:18
  • Ok so on closer inspection of the code I'm not convinced I can get this to work. The lock method can't be static and I can't see how I would expose `semaphore.WaitAsync()` in order to return the result asynchronously. – James South Jul 01 '15 at 09:42
1

Here is a KeyedLock class that is less convenient and more error prone, but also less allocatey than Stephen Cleary's AsyncDuplicateLock. It maintains internally a pool of SemaphoreSlims, that can be reused by any key after they are released by the previous key. The capacity of the pool is configurable, and by default is 10.

This class is not allocation-free, because the SemaphoreSlim class allocates memory (quite a lot actually) every time the semaphore cannot be acquired synchronously because of contention.

The lock can be requested both synchronously and asynchronously, and can also be requested with cancellation and timeout. These features are provided by exploiting the existing functionality of the SemaphoreSlim class.

/// <summary>
/// Keyed lock that draws its semaphores from a small internal pool. Each key in use maps
/// to a semaphore plus a count of callers holding or waiting on it; when the count drops
/// to zero the key is forgotten and the semaphore goes back to the pool (if there is room).
/// </summary>
public class KeyedLock<TKey>
{
    // Key -> (semaphore, number of callers holding or waiting). Also used as the
    // monitor guarding itself.
    private readonly Dictionary<TKey, (SemaphoreSlim, int)> _perKey;

    // Idle semaphores available for reuse; guarded by locking on the stack itself.
    private readonly Stack<SemaphoreSlim> _pool;
    private readonly int _poolCapacity;

    /// <summary>
    /// Creates the lock.
    /// </summary>
    /// <param name="keyComparer">Comparer used to match keys; <c>null</c> selects the default.</param>
    /// <param name="poolCapacity">Maximum number of idle semaphores kept for reuse.</param>
    public KeyedLock(IEqualityComparer<TKey> keyComparer = null, int poolCapacity = 10)
    {
        _poolCapacity = poolCapacity;
        _pool = new Stack<SemaphoreSlim>(poolCapacity);
        _perKey = new Dictionary<TKey, (SemaphoreSlim, int)>(keyComparer);
    }

    /// <summary>
    /// Asynchronously waits for the key's lock, giving up after the timeout or on cancellation.
    /// </summary>
    /// <returns><c>true</c> if the lock was acquired; <c>false</c> if the timeout elapsed.</returns>
    public async Task<bool> WaitAsync(TKey key, int millisecondsTimeout,
        CancellationToken cancellationToken = default)
    {
        SemaphoreSlim gate = GetSemaphore(key);
        bool acquired = false;
        try
        {
            acquired = await gate.WaitAsync(millisecondsTimeout, cancellationToken)
                .ConfigureAwait(false);
            return acquired;
        }
        finally
        {
            // Timeout, cancellation or failure: undo the registration made by GetSemaphore.
            if (!acquired)
            {
                ReleaseSemaphore(key, entered: false);
            }
        }
    }

    /// <summary>
    /// Asynchronously waits for the key's lock with no timeout.
    /// </summary>
    public Task WaitAsync(TKey key, CancellationToken cancellationToken = default)
    {
        return WaitAsync(key, Timeout.Infinite, cancellationToken);
    }

    /// <summary>
    /// Blocks until the key's lock is acquired, the timeout elapses, or cancellation is requested.
    /// </summary>
    /// <returns><c>true</c> if the lock was acquired; <c>false</c> if the timeout elapsed.</returns>
    public bool Wait(TKey key, int millisecondsTimeout,
        CancellationToken cancellationToken = default)
    {
        SemaphoreSlim gate = GetSemaphore(key);
        bool acquired = false;
        try
        {
            acquired = gate.Wait(millisecondsTimeout, cancellationToken);
            return acquired;
        }
        finally
        {
            // Timeout, cancellation or failure: undo the registration made by GetSemaphore.
            if (!acquired)
            {
                ReleaseSemaphore(key, entered: false);
            }
        }
    }

    /// <summary>
    /// Blocks until the key's lock is acquired, with no timeout.
    /// </summary>
    public void Wait(TKey key, CancellationToken cancellationToken = default)
    {
        Wait(key, Timeout.Infinite, cancellationToken);
    }

    /// <summary>
    /// Releases a lock previously acquired for <paramref name="key"/>.
    /// </summary>
    /// <exception cref="InvalidOperationException">The key has no registered users.</exception>
    public void Release(TKey key)
    {
        ReleaseSemaphore(key, entered: true);
    }

    // Registers the caller against the key and returns the key's semaphore, taking one
    // from the pool (or allocating) when the key is not currently in use.
    private SemaphoreSlim GetSemaphore(TKey key)
    {
        lock (_perKey)
        {
            if (_perKey.TryGetValue(key, out var existing))
            {
                var (shared, users) = existing;
                _perKey[key] = (shared, users + 1);
                return shared;
            }

            SemaphoreSlim fresh = null;
            lock (_pool)
            {
                if (_pool.Count > 0)
                {
                    fresh = _pool.Pop();
                }
            }

            if (fresh == null)
            {
                fresh = new SemaphoreSlim(1, 1);
            }

            _perKey[key] = (fresh, 1);
            return fresh;
        }
    }

    // Unregisters one caller from the key. 'entered' says whether that caller actually
    // holds the semaphore (and so must release it) or merely gave up waiting.
    private void ReleaseSemaphore(TKey key, bool entered)
    {
        SemaphoreSlim gate;
        int remaining;
        lock (_perKey)
        {
            if (!_perKey.TryGetValue(key, out var existing))
            {
                throw new InvalidOperationException("Key not found.");
            }

            gate = existing.Item1;
            remaining = existing.Item2 - 1;
            if (remaining != 0)
            {
                _perKey[key] = (gate, remaining);
            }
            else
            {
                _perKey.Remove(key);
            }
        }

        if (entered)
        {
            gate.Release();
        }

        if (remaining != 0)
        {
            return;
        }

        // Nobody references the semaphore any more, so it must be fully available
        // before it is offered for reuse.
        Debug.Assert(gate.CurrentCount == 1);
        lock (_pool)
        {
            if (_pool.Count < _poolCapacity)
            {
                _pool.Push(gate);
            }
        }
    }
}

Usage example:

var locker = new KeyedLock<string>();

// Acquire the lock for the key "Hello"; no disposable handle is returned, so the
// release is paired with the acquire through try/finally.
await locker.WaitAsync("Hello");
try
{
    await DoSomethingAsync();
}
finally
{
    // Release must be called with the same key that was acquired.
    locker.Release("Hello");
}

The implementation uses tuple deconstruction, that requires at least C# 7.

The KeyedLock class could easily be modified to become a KeyedSemaphore that allows more than one concurrent operation per key. It would just need a maximumConcurrencyPerKey parameter in the constructor, which would be stored and passed to the constructor of each SemaphoreSlim.


Note: When misused, the SemaphoreSlim class throws a SemaphoreFullException. This happens when the semaphore is released more times than it has been acquired. The KeyedLock implementation of this answer behaves differently in case of misuse: it throws an InvalidOperationException("Key not found."). This happens because when a key is released as many times as it has been acquired, the associated semaphore is removed from the dictionary. If this implementation ever throws a SemaphoreFullException, it would be an indication of a bug.

Theodor Zoulias
  • 24,585
  • 5
  • 40
  • 69
0

I rewrote the @StephenCleary answer with this:

/// <summary>
/// Keyed lock: callers using equal keys on the same instance are serialized. Each key's
/// semaphore is reference-counted and forgotten once its last user has released it.
/// </summary>
public sealed class AsyncLockList {

    // Live semaphores by key. The dictionary doubles as the monitor guarding itself
    // and the reference counts.
    readonly Dictionary<object, SemaphoreReferenceCount> Semaphores = new Dictionary<object, SemaphoreReferenceCount>();

    // Registers the caller against the key and returns the key's semaphore,
    // creating the entry (with a count of 1) when the key is not in use.
    SemaphoreSlim GetOrCreateSemaphore(object key) {
        lock (Semaphores) {
            if (!Semaphores.TryGetValue(key, out var entry)) {
                entry = new SemaphoreReferenceCount();
                Semaphores[key] = entry;
            } else {
                entry.IncrementCount();
            }
            return entry.Semaphore;
        }
    }

    /// <summary>
    /// Blocks until the lock for <paramref name="key"/> is acquired.
    /// </summary>
    /// <returns>A handle whose disposal releases the lock.</returns>
    public IDisposable Lock(object key) {
        SemaphoreSlim semaphore = GetOrCreateSemaphore(key);
        semaphore.Wait();
        return new Releaser(Semaphores, key);
    }

    /// <summary>
    /// Asynchronously waits until the lock for <paramref name="key"/> is acquired.
    /// </summary>
    /// <returns>A task yielding a handle whose disposal releases the lock.</returns>
    public async Task<IDisposable> LockAsync(object key) {
        SemaphoreSlim semaphore = GetOrCreateSemaphore(key);
        await semaphore.WaitAsync().ConfigureAwait(false);
        return new Releaser(Semaphores, key);
    }

    // A semaphore together with the number of callers holding or waiting on it.
    sealed class SemaphoreReferenceCount {
        public readonly SemaphoreSlim Semaphore = new SemaphoreSlim(1, 1);
        public int Count { get; private set; } = 1;

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void IncrementCount() {
            Count++;
        }

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public void DecrementCount() {
            Count--;
        }
    }

    // Handle given to lock holders; disposing it unregisters the holder and releases the semaphore.
    sealed class Releaser : IDisposable {
        readonly Dictionary<object, SemaphoreReferenceCount> Semaphores;
        readonly object Key;

        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        public Releaser(Dictionary<object, SemaphoreReferenceCount> semaphores, object key) {
            Key = key;
            Semaphores = semaphores;
        }

        public void Dispose() {
            lock (Semaphores) {
                var entry = Semaphores[Key];
                entry.DecrementCount();
                if (entry.Count == 0) {
                    Semaphores.Remove(Key);
                }
                // The release is performed while still inside the monitor.
                entry.Semaphore.Release();
            }
        }
    }
}
bboyle1234
  • 4,719
  • 2
  • 23
  • 26
  • 1
    @stephencleary, I'm more comfortable with the code rewritten this way, can you comment on any possible significant inefficiencies I may have introduced? – bboyle1234 Aug 28 '18 at 05:46
  • I copy-pasted your answer into my solution. I liked how you rewrote the code more. However, it doesn't work. I can't tell you why, but no lock happens for the same key. Dark Falcon's answer worked out of the box! – Mathieu G Sep 04 '19 at 03:56
  • I meant @Stephen Cleary – Mathieu G Oct 26 '20 at 20:59
0

Inspired by this previous answer, here is a version that supports async wait:

    /// <summary>
    /// Asynchronous keyed lock: callers awaiting <see cref="WaitAsync"/> with equal keys are
    /// serialized, callers with different keys do not block each other. Per-key state is
    /// reference-counted and removed from the table when its last user leaves.
    /// </summary>
    public class KeyedLock<TKey>
    {
        private readonly ConcurrentDictionary<TKey, LockInfo> _locks = new();

        /// <summary>
        /// Gets the number of keys that currently have at least one holder or waiter.
        /// </summary>
        public int Count => _locks.Count;

        /// <summary>
        /// Asynchronously waits until the lock for <paramref name="key"/> is acquired.
        /// </summary>
        /// <param name="key">The key identifying the protected resource.</param>
        /// <param name="cancellationToken">Token that aborts the wait.</param>
        /// <returns>A task yielding a handle whose disposal releases the lock.</returns>
        /// <exception cref="OperationCanceledException">The wait was cancelled.</exception>
        public async Task<IDisposable> WaitAsync(TKey key, CancellationToken cancellationToken = default)
        {
            // Get the current info or create a new one. An info that has already been
            // retired (Enter returns false) is replaced by a fresh one owned by this caller.
            var info = _locks.AddOrUpdate(key,
                // Add
                k => new LockInfo(),
                // Update
                (k, v) => v.Enter() ? v : new LockInfo());

            try
            {
                await info.Semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
            }
            catch
            {
                // The wait did not complete (cancellation or any other failure): give back
                // the reference taken above so the entry is not kept alive forever.
                Release(key, info, false);
                throw;
            }

            return new Releaser(() => Release(key, info, true));
        }

        // Drops one reference to the info. 'isCurrentlyLocked' says whether the caller
        // actually holds the semaphore and therefore has to pass it on.
        private void Release(TKey key, LockInfo info, bool isCurrentlyLocked)
        {
            if (info.Leave())
            {
                // This was the last reference for the key.

                // Only remove this exact info, in case another thread has
                // already put its own info into the dictionary.
                // Note that this call to Remove(entry) is in fact thread safe.
                var entry = new KeyValuePair<TKey, LockInfo>(key, info);
                ((ICollection<KeyValuePair<TKey, LockInfo>>)_locks).Remove(entry);

                // Enter refuses to revive an info whose count reached zero, so nobody else
                // can be using its semaphore: dispose it whether or not it was still the
                // dictionary's current entry.
                info.Dispose();
            }
            else if (isCurrentlyLocked)
            {
                // There is another waiter.
                info.Semaphore.Release();
            }
        }

        /// <summary>
        /// Per-key state: a lazily created semaphore plus the number of callers
        /// holding or waiting on it. A new instance starts with one reference (its creator).
        /// </summary>
        private class LockInfo : IDisposable
        {
            private SemaphoreSlim _semaphore = null;
            private int _refCount = 1;

            public SemaphoreSlim Semaphore
            {
                get
                {
                    // Lazily create the semaphore.
                    var s = _semaphore;
                    if (s is null)
                    {
                        s = new SemaphoreSlim(1, 1);

                        // Assign _semaphore if its current value is null.
                        var original = Interlocked.CompareExchange(ref _semaphore, s, null);

                        // If someone else already created a semaphore, return that one
                        if (original is not null)
                        {
                            s.Dispose();
                            return original;
                        }
                    }
                    return s;
                }
            }

            /// <summary>
            /// Takes a reference to this info.
            /// </summary>
            /// <returns>
            /// <c>true</c> if the reference was taken; <c>false</c> if the info has already
            /// been retired (its semaphore is or will be disposed) and must not be used.
            /// </returns>
            public bool Enter()
            {
                // Increment only while the count is positive. An unconditional increment
                // would move a retired info from 0 to 1, and the NEXT caller would then
                // see 2 and wrongly treat the retired info as live.
                int observed;
                do
                {
                    observed = Volatile.Read(ref _refCount);
                    if (observed <= 0)
                    {
                        return false;
                    }
                }
                while (Interlocked.CompareExchange(ref _refCount, observed + 1, observed) != observed);

                return true;
            }

            /// <summary>
            /// Drops a reference to this info.
            /// </summary>
            /// <returns><c>true</c> if this was the last reference and the info is ready for removal.</returns>
            public bool Leave() => Interlocked.Decrement(ref _refCount) <= 0;

            public void Dispose() => _semaphore?.Dispose();
        }

        /// <summary>
        /// Handle returned to lock holders. Runs the release action on the first
        /// <see cref="Dispose"/> only; later calls are no-ops.
        /// </summary>
        private sealed class Releaser : IDisposable
        {
            private Action _dispose;

            public Releaser(Action dispose) => _dispose = dispose;

            // Swap the action out atomically so a repeated (or concurrent) Dispose cannot
            // release the lock twice and evict state another caller is still using.
            public void Dispose() => Interlocked.Exchange(ref _dispose, null)?.Invoke();
        }
    }
rmja
  • 1
  • 1