33

I want to use something like GetOrAdd on a ConcurrentDictionary as a cache for a web service. Is there an async version of this dictionary? GetOrAdd will be making a web request using HttpClient, so it would be nice if there were a version of this dictionary where GetOrAdd was async.

To clear up some confusion, the contents of the dictionary will be the response from a call to a webservice.

ConcurrentDictionary<string, Response> _cache
    = new ConcurrentDictionary<string, Response>();

var response = _cache.GetOrAdd("id",
    x => _httpClient.GetAsync(x).GetAwaiter().GetResult());
Zeus82
  • To me it sounds like an async `GetOrAdd` doesn't make much sense. This method can only be executed synchronously. – Yeldar Kurmangaliyev Jan 09 '19 at 20:17
  • Adding to a dictionary is not an IO-bound operation, so it doesn't make sense to have an async version of it. – JohanP Jan 09 '19 at 20:18
  • If you need to await something, I'd suggest checking if the key is in the dictionary, and if not, await the HTTP call and then call `GetOrAdd` with the result. Ultimately you'll have to check again in case something else inserted the key while you were waiting on the IO. – juharr Jan 09 '19 at 20:25
  • @juharr: That's exactly what `ConcurrentDictionary` does. It starts by checking, then generates a new value, and then checks AGAIN before it's added. – Poul Bak Jun 08 '21 at 20:04

5 Answers

31

GetOrAdd won't become an asynchronous operation, because accessing the value of a dictionary isn't a long-running operation.

What you can do, however, is store tasks in the dictionary rather than the materialized results. Anyone needing a result can then await that task.

However, you also need to ensure that the operation is only ever started once, and not multiple times. To guarantee that, you also need to add in Lazy:

ConcurrentDictionary<string, Lazy<Task<Response>>> _cache =
    new ConcurrentDictionary<string, Lazy<Task<Response>>>();

var response = await _cache.GetOrAdd("id",
    url => new Lazy<Task<Response>>(() => _httpClient.GetAsync(url))).Value;
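
One caveat, discussed in the comments below: with this approach a faulted task stays in the cache. Here is a minimal sketch (not part of the original answer) of one way to evict failed entries; `FetchResponseAsync` is a hypothetical method that calls the web service and produces a `Response`:

async Task<Response> GetCachedAsync(string url)
{
    // FetchResponseAsync is a placeholder for the actual web service call.
    var lazy = _cache.GetOrAdd(url,
        u => new Lazy<Task<Response>>(() => FetchResponseAsync(u)));
    try
    {
        return await lazy.Value;
    }
    catch
    {
        // Evict only the entry we observed, so we don't remove a newer entry
        // that another caller may have added in the meantime.
        ((ICollection<KeyValuePair<string, Lazy<Task<Response>>>>)_cache)
            .Remove(new KeyValuePair<string, Lazy<Task<Response>>>(url, lazy));
        throw;
    }
}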
Servy
  • This puts an incomplete `Task` in the cache. What happens if the `Task` faults or is cancelled? The task represents an HTTP request to a remote resource, so the chance of it failing is not negligible. – odyss-jii Jan 09 '19 at 21:11
  • @odyss-jii Yes, they would need to handle the error case, and that would most likely involve removing it from the cache. – Servy Jan 09 '19 at 21:38
  • That is an absolutely horrible design for a cache. It breaks the abstraction completely. If I fetch a value from a subsystem, it is not my responsibility to clean up its internal cache because it has a broken implementation. – odyss-jii Jan 10 '19 at 07:01
  • It doesn't need to be the end consumer of the cache that handles it; it can be the wrapper around this code that the OP writes. The code in this answer is not a complete, production-ready, fully featured cache. It shows how to solve the question asked, which the OP will need to finish in their own wrapping cache to make it production-worthy code, just as your answer has problems that keep it from being complete, production-ready code rather than just a solution to the question asked. – Servy Jan 10 '19 at 14:18
  • I feel like the Lazy is redundant. Executing something like `_httpClient.GetAsync(url)` will return the Task immediately. – Darragh Jun 28 '20 at 21:16
  • @Darragh But then you're performing the operation more than once. That's very often not acceptable. The Lazy doesn't ensure the operation returns any faster; it ensures it never runs more than once. – Servy Jun 29 '20 at 00:33
12

The GetOrAdd method is not that great for this purpose. Since it does not guarantee that the factory runs only once, its only benefit is a minor optimization (minor since additions are rare anyway): it doesn't need to hash the key and find the correct bucket twice, which would happen if you did the get and the set as two separate calls.

I would suggest that you check the cache first; if you do not find the value there, enter some form of critical section (lock, semaphore, etc.), re-check the cache, and if the value is still missing, fetch it and insert it into the cache.

This ensures that your backing store is only hit once; even if multiple requests get a cache miss at the same time, only the first one will actually fetch the value. The other requests will await the semaphore and then return early, since they re-check the cache inside the critical section.

Pseudocode (using a SemaphoreSlim with a count of 1, since it can be awaited asynchronously):

async Task<TResult> GetAsync(TKey key)
{
    // Try to fetch from the cache first
    if (cache.TryGetValue(key, out var result)) return result;

    // Get some resource lock here, for example a SemaphoreSlim,
    // which has an async wait method:
    await semaphore.WaitAsync();    
    try 
    {
        // Try to fetch from cache again now that we have entered 
        // the critical section
        if (cache.TryGetValue(key, out result)) return result;

        // Fetch data from source (using your HttpClient or whatever), 
        // update your cache and return.
        return cache[key] = await FetchFromSourceAsync(...);
    }
    finally
    {
        semaphore.Release();
    }
}
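
For reference, here is a hedged, self-contained sketch of the same pattern as a small wrapper class. The class name, the field names and the injected fetch delegate are illustrative, not part of the original pseudocode. Note that a single semaphore serializes all cache misses; a per-key semaphore would allow more concurrency at the cost of extra bookkeeping.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public sealed class AsyncCache<TKey, TResult>
{
    private readonly ConcurrentDictionary<TKey, TResult> cache =
        new ConcurrentDictionary<TKey, TResult>();
    private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);
    private readonly Func<TKey, Task<TResult>> fetchFromSource;

    public AsyncCache(Func<TKey, Task<TResult>> fetchFromSource)
    {
        this.fetchFromSource = fetchFromSource;
    }

    public async Task<TResult> GetAsync(TKey key)
    {
        // Fast path: the value is already cached.
        if (cache.TryGetValue(key, out var result)) return result;

        await semaphore.WaitAsync();
        try
        {
            // Re-check now that we are inside the critical section.
            if (cache.TryGetValue(key, out result)) return result;

            // Only one caller per miss reaches the backing store.
            return cache[key] = await fetchFromSource(key);
        }
        finally
        {
            semaphore.Release();
        }
    }
}

Usage would then be something like var cache = new AsyncCache<string, Response>(url => FetchFromSourceAsync(url)); followed by await cache.GetAsync("id").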
odyss-jii
  • If you're going to explicitly lock, then you need to explicitly lock everywhere else that uses this collection as well, in order to ensure the operation is logically atomic. – Servy Jan 09 '19 at 20:45
  • The collection is ConcurrentDictionary; the collection itself is thread-safe. You are locking for a different reason here. – odyss-jii Jan 09 '19 at 20:46
  • The collection won't throw some sort of index-out-of-bounds exception or return garbage data, because it's designed to be used from multiple threads, but you're now trying to perform multiple operations on it in sequence and are relying on no changes to the collection during that time, which it won't provide for you. You'll need to explicitly lock not just here, but *everywhere* using the collection, to ensure that someone else doesn't add the value after you found it missing, or anything like that. – Servy Jan 09 '19 at 20:52
  • I am not sure which hypothetical scenario you are thinking of, but it does not apply to this particular case. This is a fetch from a source with an in-memory cache; it does not matter if the collection changes during that time. The purpose of the lock is to protect the source from a surge if there are several concurrent cache misses; the purpose is not to synchronize access to the collection. – odyss-jii Jan 09 '19 at 20:59
  • Yes, it *does* matter if the collection changes. It can result in performing work multiple times that's not supposed to be repeated, for example. – Servy Jan 09 '19 at 21:40
6

Try this extension method:

/// <summary>
/// Adds a key/value pair to the <see cref="ConcurrentDictionary{TKey, TValue}"/> by using the specified function 
/// if the key does not already exist. Returns the new value, or the existing value if the key exists.
/// </summary>
public static async Task<TResult> GetOrAddAsync<TKey,TResult>(
    this ConcurrentDictionary<TKey,TResult> dict,
    TKey key, Func<TKey,Task<TResult>> asyncValueFactory)
{
    if (dict.TryGetValue(key, out TResult resultingValue))
    {
        return resultingValue;
    }
    var newValue = await asyncValueFactory(key);
    return dict.GetOrAdd(key, newValue);
}

Instead of dict.GetOrAdd(key, key => something(key)), you use await dict.GetOrAddAsync(key, async key => await something(key)). Obviously, in this situation you just write it as await dict.GetOrAddAsync(key, something), but I wanted to make it clear.
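
A quick usage sketch of the extension method above, assuming the web service scenario from the question; `FetchResponseAsync` is a placeholder for whatever HttpClient call produces a Response:

var cache = new ConcurrentDictionary<string, Response>();

// Passing an async lambda...
var response = await cache.GetOrAddAsync("id", async key => await FetchResponseAsync(key));

// ...or, since the signatures match, passing the method group directly.
var sameResponse = await cache.GetOrAddAsync("id", FetchResponseAsync);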

In regards to concerns about preserving the order of operations, I have the following observations:

  1. Using the normal GetOrAdd gets you the same effect if you look at the way it is implemented. I literally used the same code and made it work for async. The reference documentation says:

the valueFactory delegate is called outside the locks to avoid the problems that can arise from executing unknown code under a lock. Therefore, GetOrAdd is not atomic with regards to all other operations on the ConcurrentDictionary<TKey,TValue> class

  2. SyncRoot is not supported in ConcurrentDictionary; it uses an internal locking mechanism, so locking on it is not possible. Using your own locking mechanism works only for this extension method, though. If you use another flow (GetOrAdd, for example) you will face the same problem.
Siderite Zackwehdex
  • This implementation of `GetOrAddAsync` does not preserve the order of operations. Scenario: the Workflow-1 invokes `.GetOrAddAsync("Key", GetAsync("A"))`, then the Workflow-2 invokes `.GetOrAddAsync("Key", GetAsync("B"))`, then the Workflow-3 invokes `.TryRemove("Key", out _)`. Finally the dictionary could end up having either the value "A" or "B", or no value at all. This happens because this `GetOrAddAsync` implementation postpones storing anything into the dictionary until the asynchronous delegate completes. – Theodor Zoulias Jan 13 '21 at 04:47
  • You mean if you use it without await? – Siderite Zackwehdex Jan 13 '21 at 06:22
  • Siderite no, I mean if you await the method properly. My scenario involves three independent asynchronous workflows, where each workflow calls the API and `await`s the returned task. In this scenario the task awaited by the Workflow-1 could take longer to complete than the task awaited by the Workflow-2, in which case the Workflow-1 would overwrite the value entered in the dictionary by the Workflow-2. This behavior would be surprising to say the least. – Theodor Zoulias Jan 13 '21 at 06:44
  • I've updated my answer. I appreciate the level of attention in your implementation, but I believe it is overly complicated for the reasons I listed in the answer. The SO question was about using GetOrAdd with an async delegate, which implies accepting the limitations of the original method. – Siderite Zackwehdex Jan 13 '21 at 07:24
  • Siderite I see your point. Your implementation has indeed a similar behavior to the native `GetOrAdd` ([source code](https://referencesource.microsoft.com/mscorlib/system/Collections/Concurrent/ConcurrentDictionary.cs.html#d8b8308343be2763)). I guess that the typical asynchronous work has a longer duration than the typical synchronous work, and this could make the drawbacks of the native behavior more prominent. In any case my downvote was unwarranted and I revoked it. – Theodor Zoulias Jan 13 '21 at 07:40
3

Probably using a dedicated memory cache (like the new or the old MemoryCache classes, or this third-party library) is preferable to using a simple ConcurrentDictionary, unless you don't really need commonly used functionality like time-based expiration, size-based compacting, or automatic eviction of entries that depend on other entries that have expired, or on mutable external resources (files, databases, etc.). It should be noted though that MemoryCache may still need some work in order to handle asynchronous delegates properly, since its out-of-the-box behavior is not ideal.
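
For illustration, here is a minimal sketch (not from the original answer) of the dedicated-cache route using the Microsoft.Extensions.Caching.Memory package; the key and the expiration value are placeholders. As noted above, GetOrCreateAsync does not by itself guarantee that the factory runs only once for concurrent misses on the same key.

using System;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Extensions.Caching.Memory;

var httpClient = new HttpClient();
var memoryCache = new MemoryCache(new MemoryCacheOptions());

// Caches the HttpResponseMessage under the "id" key for five minutes.
var response = await memoryCache.GetOrCreateAsync("id", async entry =>
{
    entry.AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(5);
    return await httpClient.GetAsync("https://stackoverflow.com");
});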

Below is a custom extension method GetOrAddAsync for ConcurrentDictionary instances that have Task<TValue> values. It accepts an asynchronous factory method and ensures that the method will be invoked at most once. It also ensures that failed tasks are removed from the dictionary.

/// <summary>
/// Returns an existing task from the concurrent dictionary, or adds a new task
/// using the specified asynchronous factory method. Concurrent invocations for
/// the same key are prevented, unless the task is removed before the completion
/// of the delegate. Failed tasks are evicted from the concurrent dictionary.
/// </summary>
public static Task<TValue> GetOrAddAsync<TKey, TValue>(
    this ConcurrentDictionary<TKey, Task<TValue>> source, TKey key,
    Func<TKey, Task<TValue>> valueFactory)
{
    if (!source.TryGetValue(key, out var currentTask))
    {
        Task<TValue> newTask = null;
        var newTaskTask = new Task<Task<TValue>>(async () =>
        {
            try { return await valueFactory(key).ConfigureAwait(false); }
            catch
            {
                source.TryRemove(KeyValuePair.Create(key, newTask));
                throw;
            }
        });
        newTask = newTaskTask.Unwrap();
        currentTask = source.GetOrAdd(key, newTask);
        if (currentTask == newTask) newTaskTask.Start(TaskScheduler.Default);
    }
    return currentTask;
}

Usage example:

var cache = new ConcurrentDictionary<string, Task<HttpResponseMessage>>();

var response = await cache.GetOrAddAsync("https://stackoverflow.com", async url =>
{
    return await _httpClient.GetAsync(url);
});

Overload with synchronous valueFactory delegate:

public static Task<TValue> GetOrAddAsync<TKey, TValue>(
    this ConcurrentDictionary<TKey, Task<TValue>> source, TKey key,
    Func<TKey, TValue> valueFactory)
{
    if (!source.TryGetValue(key, out var currentTask))
    {
        Task<TValue> newTask = null;
        newTask = new Task<TValue>(() =>
        {
            try { return valueFactory(key); }
            catch
            {
                source.TryRemove(KeyValuePair.Create(key, newTask));
                throw;
            }
        });
        currentTask = source.GetOrAdd(key, newTask);
        if (currentTask == newTask) newTask.Start(TaskScheduler.Default);
    }
    return currentTask;
}

Both overloads invoke the valueFactory delegate on the ThreadPool, ensuring that the current thread will not be blocked. If you have some reason to prefer invoking the delegate on the current thread, you can just replace the Start with the RunSynchronously.

For a version of the GetOrAddAsync method that compiles on the .NET Framework and the .NET Core, you can look at the 3rd revision of this answer.

Theodor Zoulias
  • I have some questions about your solution. Is `Unwrap` the same as `await newTaskTask`? Why not just use `source.TryRemove(key, out _)` instead of casting the `source` to `ICollection<>`? I'm a little confused about the `newTaskTask.RunSynchronously(TaskScheduler.Default)`. This looks a little strange. Do we need this? The caller will do an `await`. Will this ensure that the task is called? – Sebastian Schumann Feb 16 '21 at 07:10
  • The question is: Is [this implementation](https://dotnetfiddle.net/GD4mwq) more or less equivalent to your example? Yes I know that the execution is "deferred" (not really) until the first caller awaits the returned task. – Sebastian Schumann Feb 16 '21 at 08:06
  • @SebastianSchumann sure, and thanks for asking. The `newTaskTask.Unwrap()` is indeed the same with `await newTaskTask`, provided that task is hot (i.e. it has already started). This is not the case here, and a deadlock will ensue if we try to `await` the task. The task is intentionally cold, because we want to start the task only after it has been successfully inserted into the dictionary. Otherwise, in case the race to update the dictionary has been lost, the cold task will just be discarded. – Theodor Zoulias Feb 16 '21 at 08:14
  • The reason that the `source.TryRemove(key, out _)` is not sufficient is because the `GetOrAddAsync` is just an extension method, and it doesn't control entirely the contents of the dictionary. So it is possible that while the task is running, some other code may replace the task with another task. In case our task fails, we want to remove it from the dictionary only if it's still there, and not remove some other task that is not known to us. – Theodor Zoulias Feb 16 '21 at 08:15
  • The `newTaskTask.RunSynchronously(TaskScheduler.Default)` starts the outer task, which invokes the `valueFactory` delegate. Before reaching this point, the `valueFactory` has not been invoked. It is essential that the `valueFactory` is invoked only once, in case multiple threads are racing to insert this key into the dictionary. The `TaskScheduler.Default` argument ensures that the `valueFactory` will be invoked synchronously by a well-known `TaskScheduler`, and that we are not at the mercy of `TaskScheduler.Current` (the default value of the parameter), whatever it may be. – Theodor Zoulias Feb 16 '21 at 08:16
  • So, no, [that](https://dotnetfiddle.net/GD4mwq) implementation is not equivalent with the implementation of this answer! – Theodor Zoulias Feb 16 '21 at 08:16
  • Oh yes, sorry - I was totally blind. I didn't see the _cold_ task. This is indeed correct and needed. And yes I forgot the possibility to change a value using `dict[key] = value` that changes the value. Sorry for that. – Sebastian Schumann Feb 16 '21 at 08:28
  • @SebastianSchumann no worries. I am happy that someone asked, so that I can explain the nuances of this peculiar-looking method. :-) – Theodor Zoulias Feb 16 '21 at 08:29
  • Yes. I deleted my question after I read the docs about that function. – Sebastian Schumann Feb 16 '21 at 09:05
  • But there is still one point of confusion left: why do we need that call to `RunSynchronously`? There is a race condition: let's assume that one call creates the task, adds it to the dict, and a context switch occurs just before the call to `Run..`. Another caller gets the already added task and awaits it. This should invoke the `valueFactory`. And wouldn't the factory then also be invoked by `Run...`? What am I missing here? – Sebastian Schumann Feb 16 '21 at 09:08
  • @SebastianSchumann awaiting the task is not sufficient to invoke the `valueFactory`. Only the current thread knows about the nested `Task<Task<TValue>>`, and has the means to start it. If it omits to start it, both tasks, the nested and the unwrapped, will remain cold forever. Any workflow that attempts to await the task will just deadlock. – Theodor Zoulias Feb 16 '21 at 09:14
  • Okay - I think I got it. Just to be sure: The `newTaskTask` is only known by the current thread - that's obvious. Only this thread is able to call it - clear. A concurrent call gets the `newTask` - also obvious. An await on this `newTask` will block because the `newTaskTask` has not been started. Puh - okay. If this assumption holds, the implementation is correct. – Sebastian Schumann Feb 16 '21 at 08:28
  • @SebastianSchumann yeap, there is a lot going on in this little method. :-) – Theodor Zoulias Feb 16 '21 at 09:22
  • @SebastianSchumann btw the same idea has been used [here](https://stackoverflow.com/questions/28340177/enforce-an-async-method-to-be-called-once/65714904#65714904) in order to implement an improved `AsyncLazy` class. Creating cold tasks is rarely a good idea, but when it is, it works wonders! – Theodor Zoulias Feb 16 '21 at 11:18
-1

I solved this years ago, before ConcurrentDictionary and the TPL were born. I'm in a café and don't have the original code, but it went something like this.

It's not a rigorous answer, but it may inspire your own solution. The important thing is to return the value that was just added (or that already exists) along with a boolean, so you can fork execution.

The design lets you easily fork the race-winning logic from the losing logic.

public bool TryAddValue(TKey key, TValue value, out TValue contains)
{
    // guards etc.

    while (true)
    {
        if (this.concurrentDic.TryAdd(key, value))
        {
            contains = value;
            return true;
        }
        else if (this.concurrentDic.TryGetValue(key, out var existing))
        {
            contains = existing;
            return false;
        }
        else
        {
            // Slipped down the rare path. The value was removed between the
            // above checks. I think just keep trying because we must have
            // been really unlucky.

            // Note this spinning will cause adds to execute out of
            // order since a very unlucky add on a fast moving collection
            // could in theory be bumped again and again before getting
            // lucky and getting its value added, or locating existing.

            // A tiny random sleep might work. Experiment under load.
        }
    }
}

This could be made into an extension method for ConcurrentDictionary, or be a method on your own cache class, or something using locks.

Perhaps GetOrAdd(key, value) could be used with Object.ReferenceEquals() to check whether the value was added or not, instead of the spin design (a sketch of this appears further below).

To be honest, the above code isn't the point of my answer. The power comes in the simple design of the method signature and how it affords the following:

static readonly ConcurrentDictionary<string, Task<Task<Thing>>> tasks = new();

//

var newTask = new Task<Task<Thing>>(() => GetThingAsync(thingId));

if (tasks.TryAddValue(thingId, newTask, out var task))
{
    task.Start();
}

var thingTask = await task;
var thing = await thingTask;

It's a little quirky how a Task needs to hold a Task (if your work is async), and there are the allocations of unused Tasks to consider.
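
The GetOrAdd-plus-ReferenceEquals variant mentioned above might look roughly like the following sketch, under the same untested, off-the-top-of-the-head caveat; GetThingAsync stands in for the caller's own fetch method:

static readonly ConcurrentDictionary<string, Task<Task<Thing>>> tasks = new();

async Task<Thing> GetThingCachedAsync(string thingId)
{
    // Only the caller whose candidate task actually got stored starts it,
    // so GetThingAsync runs at most once per key (per cache lifetime).
    var candidate = new Task<Task<Thing>>(() => GetThingAsync(thingId));
    var stored = tasks.GetOrAdd(thingId, candidate);

    if (ReferenceEquals(stored, candidate))
    {
        stored.Start();
    }

    // The first await yields the inner task, the second yields the result.
    return await await stored;
}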

I think it's a shame Microsoft didn't ship its thread-safe collection with this method, or extract a "concurrent collection" interface.

My real implementation was a cache with sophisticated expiring inner collections and stuff. I guess you could subclass the .NET Task class and add a CreatedAt property to aid with eviction.

Disclaimer: I've not tried this at all, it's off the top of my head, but I used this sort of design in an ultra-high-throughput app in 2009.

Luke Puplett