I have the following situation:

I have a lot of threads in my project, and each thread processes one "key" at a time.

Two threads cannot process the same "key" at the same time, but my project processes a very large number of keys, so I can't keep all of them in memory. I only need to record in memory that a thread is currently processing a "key"; if another thread tries to process the same "key", that thread should wait in a lock clause.

Now I have the following structure:

    public class Lock
    {
        private static object _lockObj = new object();
        private static List<object> _lockListValues = new List<object>();

        public static void Execute(object value, Action action)
        {
            lock (_lockObj)
            {
                if (!_lockListValues.Contains(value))
                    _lockListValues.Add(value);
            }

            lock (_lockListValues.First(x => x.Equals(value)))
            {
                action.Invoke();
            }
        }
    }

It works fine, but the keys are never removed from memory. The biggest difficulty is the multithreading, because any "key" can be processed at any time.

How could I solve this without a global lock independent of the keys?

Only a Curious Mind
  • Could you not use a concurrent queue? Make a single thread that passes the keys onto the queue and make it so that the other threads pull one key from the queue and process it. By using the concurrent queue you can ensure that only one thread has the key at a time. – MiltoxBeyond Nov 18 '15 at 17:36
  • It is not safe to obtain a global lock to do Contains and Add but then iterate the list outside that lock. – Mike Zboray Nov 18 '15 at 17:37
  • @MiltoxBeyond If I do that, I will lose a lot of throughput, because all the processing will bottleneck on the same queue. I could create one queue per key at execution time, but that would change my whole structure (which is very big). – Only a Curious Mind Nov 18 '15 at 17:41
  • @mikez I can't see what is not safe with my current structure, could you tell me? – Only a Curious Mind Nov 18 '15 at 17:44
  • @OnlyaCuriousMind You are calling First outside any locks. The call to First must occur within the global lock. – Mike Zboray Nov 18 '15 at 17:46
  • @mikez Really, so you have some idea to help me about how do what I need? – Only a Curious Mind Nov 18 '15 at 17:47
  • You should be accessing `_lockListValues` within your `lock(_lockObj)`, and you aren't removing the keys from the list after your invoke. You probably should be passing the value into the action, and there is also likely a better way of handling your multithreading as well, but you haven't given enough information to help you out there. – Robert McKee Nov 18 '15 at 18:17
  • @RobertMcKee If I access `_lockListValues` within `lock(_lockObj)`, the dynamic lock no longer makes sense, because every thread will be locked on the same object `_lockObj`, right? – Only a Curious Mind Nov 18 '15 at 18:20
  • `object key; lock (_lockObj) { if (!_lockListValues.Contains(value)) _lockListValues.Add(value); key = _lockListValues.First(x => x.Equals(value)); } lock (key) {...}` Feel free to optimize it for the case where the value is found. – Robert McKee Nov 18 '15 at 18:22
  • That fixes that problem, but without seeing your key removal code, you will likely remove the key before you should if another thread gets blocked. – Robert McKee Nov 18 '15 at 18:27
  • @RobertMcKee Oh, now I understood. There is a problem in my current code. But there isn't any solution to remove the key safely? – Only a Curious Mind Nov 18 '15 at 18:47
  • Related: [Asynchronous locking based on a key](https://stackoverflow.com/questions/31138179/asynchronous-locking-based-on-a-key) – Theodor Zoulias Dec 11 '20 at 17:55

2 Answers

Sorry, but no, this is not the way it should be done.

First, you talk about keys, but you store them as type object in a List and then search that list with LINQ.

That is what a dictionary is for.

Second, the object model: it is usually best to wrap the locking of an object inside a class, which keeps things nice and clean.

For example:

using System;
using System.Collections.Concurrent;


public class LockedObject<T>
{
    public readonly T data;
    public readonly int id;
    private readonly object obj = new object();

    //The constructor must be public so that callers can create instances
    public LockedObject(int id, T data)
    {
        this.id = id;
        this.data = data;
    }

    //Usually, if you have Action related to some data,
    //it is better to receive
    //that data as parameter

    public void InvokeAction(Action<T> action)
    {
        lock(obj)
        {
            action(data);
        }
    }

}

//Now it is an object that safely applies an action
//to the given data under concurrency, no matter how it is stored.
//But still, this is the best idea:


//(T stands for your concrete data type here)
ConcurrentDictionary<int, LockedObject<T>> dict =
new ConcurrentDictionary<int, LockedObject<T>>();

//You can insert, read, and remove objects concurrently.

But, the best thing is yet to come! :) You can make it lock free and very easily!

EDIT1:

ConcurrentInvoke is a dictionary-like collection for safely invoking actions over data under concurrency. Only one action at a time can run on a given key.

using System;
using System.Threading;
using System.Collections.Concurrent;


public class ConcurrentInvoke<TKey, TValue>
{
    //we hate lock() :)

    private class Data<TData>
    {
        public readonly TData data;
        private int flag;
        private Data(TData data)
        {
            this.data = data;
        }
        public static bool Contains<TTKey>(ConcurrentDictionary<TTKey, Data<TData>> dict, TTKey key)
        {
            return dict.ContainsKey(key);
        }
        public static bool TryAdd<TTKey>(ConcurrentDictionary<TTKey, Data<TData>> dict, TTKey key, TData data)
        {
            return dict.TryAdd(key, new Data<TData>(data));
        }
        // Cannot remove if:
        // the key does not exist,
        // a removal of the key is already in progress,
        // an action on the key is in progress
        public static bool TryRemove<TTKey>(ConcurrentDictionary<TTKey, Data<TData>> dict, TTKey key, Action<TTKey, TData> action_removed = null)
        {
            Data<TData> data = null;
            if (!dict.TryGetValue(key, out data)) return false;

            var access = Interlocked.CompareExchange(ref data.flag, 1, 0) == 0;
            if (!access) return false;

            Data<TData> data2 = null;
            var removed = dict.TryRemove(key, out data2);

            Interlocked.Exchange(ref data.flag, 0);

            if (removed && action_removed != null) action_removed(key, data2.data);
            return removed;
        }
        // Cannot invoke if:
        // the key does not exist,
        // a removal of the key is already in progress,
        // an action on the key is already in progress
        public static bool TryInvokeAction<TTKey>(ConcurrentDictionary<TTKey, Data<TData>> dict, TTKey key, Action<TTKey, TData> invoke_action = null)
        {
            Data<TData> data = null;
            if (invoke_action == null || !dict.TryGetValue(key, out data)) return false;

            var access = Interlocked.CompareExchange(ref data.flag, 1, 0) == 0;
            if (!access) return false;

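            try
            {
                invoke_action(key, data.data);
            }
            finally
            {
                // Always release the flag, even if the action throws,
                // otherwise the key would stay blocked forever.
                Interlocked.Exchange(ref data.flag, 0);
            }
            return true;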
        }
    }

    private readonly ConcurrentDictionary<TKey, Data<TValue>> dict =
        new ConcurrentDictionary<TKey, Data<TValue>>();

    public bool Contains(TKey key)
    {
        return Data<TValue>.Contains(dict, key);
    }
    public bool TryAdd(TKey key, TValue value)
    {
        return Data<TValue>.TryAdd(dict, key, value);
    }
    public bool TryRemove(TKey key, Action<TKey, TValue> removed = null)
    {
        return Data<TValue>.TryRemove(dict, key, removed);
    }
    public bool TryInvokeAction(TKey key, Action<TKey, TValue> invoke)
    {
        return Data<TValue>.TryInvokeAction(dict, key, invoke);
    }
}




ConcurrentInvoke<int, string> concurrent_invoke = new ConcurrentInvoke<int, string>();

concurrent_invoke.TryAdd(1, "string 1");
concurrent_invoke.TryAdd(2, "string 2");
concurrent_invoke.TryAdd(3, "string 3");

concurrent_invoke.TryRemove(1);

concurrent_invoke.TryInvokeAction(3, (key, value) =>
{
    Console.WriteLine("InvokingAction[key: {0}, value: {1}]", key, value);
});
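
The per-key flag in the class above relies on Interlocked.CompareExchange to let exactly one caller through. As a minimal sketch of that pattern in isolation (the BusyFlag class and its member names are hypothetical, introduced here only for illustration):

```csharp
using System.Threading;

// Hypothetical helper: isolates the 0/1 flag pattern for illustration.
public sealed class BusyFlag
{
    private int _flag; // 0 = free, 1 = taken

    // Atomically switches the flag from 0 to 1.
    // Returns true only for the caller that performed the switch.
    public bool TryAcquire() =>
        Interlocked.CompareExchange(ref _flag, 1, 0) == 0;

    // Marks the flag as free again.
    public void Release() => Interlocked.Exchange(ref _flag, 0);
}
```

Note that this pattern never waits: a caller that gets false has to retry or give up, which is why TryInvokeAction returns false for a busy key, whereas a lock statement would block until the key becomes free.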
ipavlu
  • Your `ConcurrentAccess` class is very clever. I like it! however `dict.Add(t);` will not compile because `ConcurrentDictionary` does not have a non-try `Add` method. You likely want `AddOrUpdate`, but be aware this does give a small window where someone could call `TryInvokeAction`, another method calls `TryAdd` and gets a `true` result, then the invoked action finishes and overwrites the added value and also returns `true`. You could solve this problem with a 2nd dictionary or a thread safe `HashSet` of keys that you check before calling `TryAdd` – Scott Chamberlain Nov 18 '15 at 20:52
  • @ScottChamberlain Yes, you are right, TryAdd and key uniqueness, I made an assumption about already unique keys! Yes, the second dictionary could work. That second class was a nice idea before sleep. I will improve it as edit:). I think I found today possible use in some other scenario, just not as static implementation... Thanks for thinking about it:). – ipavlu Nov 19 '15 at 17:57

I modified a KeyedLock class that I posted in another question, to use internally the Monitor class instead of SemaphoreSlims. I expected that using a specialized mechanism for synchronous locking would offer better performance, but I can't actually see any difference. I am posting it anyway because it has the added convenience feature of releasing the lock automatically with the using statement. This feature adds no significant overhead in the case of synchronous locking, so there is no reason to omit it.

Another reason that justifies this separate implementation is that the Monitor has different semantics than the SemaphoreSlim. The Monitor is reentrant while the SemaphoreSlim is not. A single thread is allowed to enter the Monitor multiple times, before finally exiting an equal number of times. This is not possible with a SemaphoreSlim. If a thread attempts to Wait a second time on a SemaphoreSlim(1, 1), it will most likely deadlock.
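
The difference in reentrancy can be observed with a small sketch like the following. The ReentrancyDemo class and its method names are hypothetical and not part of the KeyedMonitor; a timeout of 0 is used so that the second Wait fails immediately instead of deadlocking.

```csharp
using System;
using System.Threading;

// Hypothetical demo class, for illustration only.
public static class ReentrancyDemo
{
    // Enters the same Monitor twice on the current thread.
    // Returns true if the second, nested acquisition succeeded.
    public static bool MonitorAllowsNestedEnter()
    {
        var gate = new object();
        bool nested = false;
        lock (gate)
        {
            // Timeout 0 never blocks; it succeeds here because
            // the current thread already owns the monitor.
            if (Monitor.TryEnter(gate, 0))
            {
                nested = true;
                Monitor.Exit(gate); // balance the nested enter
            }
        }
        return nested;
    }

    // Waits twice on a SemaphoreSlim(1, 1) from the current thread.
    // Returns true if the second, nested Wait succeeded.
    public static bool SemaphoreAllowsNestedWait()
    {
        using (var semaphore = new SemaphoreSlim(1, 1))
        {
            semaphore.Wait();
            bool nested = semaphore.Wait(0); // the only slot is already taken
            if (nested) semaphore.Release();
            semaphore.Release();
            return nested;
        }
    }
}
```

Under these assumptions MonitorAllowsNestedEnter returns true and SemaphoreAllowsNestedWait returns false, because the semaphore counts acquisitions without tracking which thread made them.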

The KeyedMonitor class internally stores only the locking objects that are currently in use, plus a small pool of locking objects that have been released and can be reused. This pool significantly reduces memory allocations under heavy usage, at the cost of some added synchronization overhead.

using System;
using System.Collections.Generic;
using System.Threading;

public class KeyedMonitor<TKey>
{
    private readonly Dictionary<TKey, (object, int)> _perKey;
    private readonly Stack<object> _pool;
    private readonly int _poolCapacity;

    public KeyedMonitor(IEqualityComparer<TKey> keyComparer = null,
        int poolCapacity = 10)
    {
        _perKey = new Dictionary<TKey, (object, int)>(keyComparer);
        _pool = new Stack<object>(poolCapacity);
        _poolCapacity = poolCapacity;
    }

    public ExitToken Enter(TKey key)
    {
        var locker = GetLocker(key);
        Monitor.Enter(locker);
        return new ExitToken(this, key);
    }

    // Abort-safe API
    public void Enter(TKey key, ref bool lockTaken)
    {
        try { }
        finally // Abort-safe block
        {
            var locker = GetLocker(key);
            try { Monitor.Enter(locker, ref lockTaken); }
            finally { if (!lockTaken) ReleaseLocker(key, withMonitorExit: false); }
        }
    }

    public bool TryEnter(TKey key, int millisecondsTimeout)
    {
        var locker = GetLocker(key);
        bool acquired = false;
        try { acquired = Monitor.TryEnter(locker, millisecondsTimeout); }
        finally { if (!acquired) ReleaseLocker(key, withMonitorExit: false); }
        return acquired;
    }

    public void Exit(TKey key) => ReleaseLocker(key, withMonitorExit: true);

    private object GetLocker(TKey key)
    {
        object locker;
        lock (_perKey)
        {
            if (_perKey.TryGetValue(key, out var entry))
            {
                int counter;
                (locker, counter) = entry;
                counter++;
                _perKey[key] = (locker, counter);
            }
            else
            {
                lock (_pool) locker = _pool.Count > 0 ? _pool.Pop() : null;
                if (locker == null) locker = new object();
                _perKey[key] = (locker, 1);
            }
        }
        return locker;
    }

    private void ReleaseLocker(TKey key, bool withMonitorExit)
    {
        object locker; int counter;
        lock (_perKey)
        {
            if (_perKey.TryGetValue(key, out var entry))
            {
                (locker, counter) = entry;
                // It is important to allow a possible SynchronizationLockException
                // to be surfaced before modifying the internal state of the class.
                // That's why the Monitor.Exit should be called here.
                // Exiting the Monitor while holding the inner lock should be safe.
                if (withMonitorExit) Monitor.Exit(locker);
                counter--;
                if (counter == 0)
                    _perKey.Remove(key);
                else
                    _perKey[key] = (locker, counter);
            }
            else
            {
                throw new InvalidOperationException("Key not found.");
            }
        }
        if (counter == 0)
            lock (_pool) if (_pool.Count < _poolCapacity) _pool.Push(locker);
    }

    public readonly struct ExitToken : IDisposable
    {
        private readonly KeyedMonitor<TKey> _parent;
        private readonly TKey _key;

        public ExitToken(KeyedMonitor<TKey> parent, TKey key)
        {
            _parent = parent; _key = key;
        }

        public void Dispose() => _parent?.Exit(_key);
    }
}

Usage example:

var locker = new KeyedMonitor<string>();

using (locker.Enter("Hello"))
{
    DoSomething(); // with the "Hello" resource
}

Although the KeyedMonitor class is thread-safe, it is not as robust as using the lock statement directly, because it offers no resilience in case of a ThreadAbortException. An aborted thread could leave the class in a corrupted internal state. I don't consider this to be a big issue, since the Thread.Abort method has become obsolete in the current version of the .NET platform (.NET 5).

For an explanation of why the IDisposable ExitToken struct is not boxed by the using statement, see the question "If my struct implements IDisposable will it be boxed when used in a using statement?" If this were not the case, the ExitToken feature would add significant overhead.

Caution: please don't store the ExitToken value returned by the KeyedMonitor.Enter method anywhere. There is no protection against misuse of this struct (like disposing it multiple times). The intended usage of this method is shown in the example above.


Update: I added an Enter overload that allows taking the lock with thread-abort resilience, albeit with an inconvenient syntax:

bool lockTaken = false;
try
{
    locker.Enter("Hello", ref lockTaken);
    DoSomething();
}
finally
{
    if (lockTaken) locker.Exit("Hello");
}

As with the underlying Monitor class, the lockTaken is always true after a successful invocation of the Enter method. The lockTaken can be false only if the Enter throws an exception.

Theodor Zoulias