.Net 中的钥匙锁

Keyed Lock in .Net

我有一个 Azure 服务总线队列,我在其中接收 运行ge 从 1 到 10 条具有相同 "key" 的消息。其中一条消息需要使用长 运行 操作进行处理。完成后,数据库将被更新,其他消息将对其进行检查。但是,与此同时,其他消息将重新排队,以免进程丢失。

但要点是这个长运行操作不能同时对同一个键运行,不应该运行 多次。

这是我目前得到的:

void Main()
{
    Enumerable.Range(1, 1000)
              .AsParallel()
              .ForAll(async i => await ManageConcurrency(i % 2, async () => await Task.Delay(TimeSpan.FromSeconds(10)))); 
}

private readonly ConcurrentDictionary<int, SemaphoreSlim> _taskLocks = new ConcurrentDictionary<int, SemaphoreSlim>();

private async Task<bool> ManageConcurrency(int taskId, Func<Task> task)
{
    SemaphoreSlim taskLock = null;

    try
    {
        if (_taskLocks.TryGetValue(taskId, out taskLock))
        {
            if (taskLock.CurrentCount == 0)
            {
                Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, I found. No available.. Thread Id: {Thread.CurrentThread.ManagedThreadId}");
                return false;
            }

            taskLock.Wait();

            Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, I found and took. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
        }
        else
        {
            taskLock = new SemaphoreSlim(1, 1);
            taskLock = _taskLocks.GetOrAdd(taskId, taskLock);
            if (taskLock.CurrentCount == 0)
            {
                Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, I didn't find, and then found/created. None available.. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
                return false;
            }
            else
            {
                taskLock.Wait(TimeSpan.FromSeconds(1));

                Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, I didn't find, then found/created, and took. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
            }
        }

        Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, Lock pulled for TaskId {taskId}, Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");

        await task.Invoke();

        return true;
    }
    catch (Exception e)
    {
        ;
        return false;
    }
    finally
    {
        //taskLock?.Release();

        _taskLocks.TryRemove(taskId, out taskLock);

        //Console.WriteLine($"I removed. Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");
    }
}

它没有按预期工作,因为它会创建多个信号量,突然我的长 运行 操作使用相同的密钥 运行 两次。我认为问题是因为整个操作不是原子的。

解决此问题的最佳方法是什么?

快完成了...您需要保留收到的订单吗?如果不是:

public static void Main(string[] args)
{
    Enumerable.Range(1, 1000)
                .AsParallel()
                .ForAll( i => ManageConcurrency(i % 2,  () => Task.Delay(TimeSpan.FromSeconds(10))).Wait());


}

private static readonly ConcurrentDictionary<int, SemaphoreSlim> _lockDict = new ConcurrentDictionary<int, SemaphoreSlim>();

private static async Task<bool> ManageConcurrency(int taskId, Func<Task> task)
{

    var gate = _lockDict.GetOrAdd(taskId, _ => new SemaphoreSlim(1, 1));
    await gate.WaitAsync();

    try
    {

        Console.WriteLine($"{DateTime.Now.ToString("hh:mm:ss.ffffff")},  {taskId}, Lock pulled for TaskId {taskId}, Thread Id: {System.Threading.Thread.CurrentThread.ManagedThreadId}");

        await task();

        return true;
    }
    catch (Exception e)
    {
        return false;
    }
    finally
    {
        gate.Release();
    }

}

您正确地认识到您需要确保每个键只创建一个信号量。标准的成语是:

var dict = new ConcurrentDictionary<TKey, Lazy<SemaphoreSlim>>();
...
var sem = dict.GetOrAdd( , _ => new new Lazy<SemaphoreSlim>(() => SemaphoreSlim(1, 1))).Value;

可能会创建多个懒惰,但只有其中一个会被揭示和具体化。

此外,依赖 in-memory 状态是一种有问题的做法。如果您的队列处理应用程序回收并且所有信号量都丢失了怎么办?您最好使用持久存储来跟踪此锁定信息。

在我看来,您担心信号量等问题会让您的生活更加艰难。有更容易使用的抽象。

在这种情况下使用 Lazy<T> 是理想的,但由于您想等待结果,因此 Lazy<T> 需要升级到 AsyncLazy<T>

public class AsyncLazy<T> : Lazy<Task<T>>
{
    public AsyncLazy(Func<T> valueFactory) :
        base(() => Task.Factory.StartNew(valueFactory))
    { }

    public AsyncLazy(Func<T> valueFactory, LazyThreadSafetyMode mode) :
        base(() => Task.Factory.StartNew(valueFactory), mode)
    { }

    public AsyncLazy(Func<Task<T>> taskFactory) :
        base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap())
    { }

    public AsyncLazy(Func<Task<T>> taskFactory, LazyThreadSafetyMode mode) :
        base(() => Task.Factory.StartNew(() => taskFactory()).Unwrap(), mode)
    { }

    public TaskAwaiter<T> GetAwaiter() { return Value.GetAwaiter(); }
}

我创建了一个 class 来模拟长 运行 任务的结果:

public class LongRunningResult
{
    public int Index;
}

需要获取运行的方法进行计算:

private LongRunningResult ComputeLongRunningResult(int index)
{
    Console.WriteLine($"Running Index {index}");
    Thread.Sleep(1000);
    return new LongRunningResult() { Index = index };
}

现在我们需要字典来保存惰性异步:

private readonly ConcurrentDictionary<int, AsyncLazy<LongRunningResult>> _results
    = new ConcurrentDictionary<int, AsyncLazy<LongRunningResult>>();

现在变得超级简单:

Enumerable
    .Range(1, 10)
    .AsParallel()
    .ForAll(async i =>
    {
        var index = i % 2;
        Console.WriteLine($"Trying Index {index}");
        _results.TryAdd(index,
            new AsyncLazy<LongRunningResult>(
                () => ComputeLongRunningResult(index),
                LazyThreadSafetyMode.ExecutionAndPublication));
        AsyncLazy<LongRunningResult> ayncLazy;
        if (_results.TryGetValue(index, out ayncLazy))
        {
            await ayncLazy;
        }
    });

我从中得到的输出是这样的:

Trying Index 1
Trying Index 0
Trying Index 1
Trying Index 1
Trying Index 0
Trying Index 1
Running Index 1
Trying Index 0
Trying Index 1
Running Index 0
Trying Index 0
Trying Index 0