异步线程安全从 MemoryCache 获取

Async threadsafe Get from MemoryCache

我创建了一个使用 .NET MemoryCache 的异步缓存。 这是代码:

public async Task<T> GetAsync(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if(parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    if(!_cache.Contains(key))
    {
        var data = await populator();
        lock(_cache)
        {
            if(!_cache.Contains(key)) //Check again but locked this time
                _cache.Add(key, data, DateTimeOffset.Now.Add(expire));
        }
    }

    return (T)_cache.Get(key);
}

我认为唯一的缺点是我需要在锁外进行等待,因此填充器不是线程安全的,但由于等待不能驻留在锁内,我想这是最好的方法。有没有我错过的陷阱?

更新:当另一个线程使缓存无效时,Esers 答案的一个版本也是线程安全的:

public async Task<T> GetAsync(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if(parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    var lazy = new Lazy<Task<T>>(populator, true);
    _cache.AddOrGetExisting(key, lazy, DateTimeOffset.Now.Add(expire));
    return ((Lazy<Task<T>>) _cache.Get(key)).Value;
}

但是它可能会更慢,因为它创建了永远不会执行的 Lazy 实例,并且它在全线程安全模式下使用 Lazy LazyThreadSafetyMode.ExecutionAndPublication

更新新基准(越高越好)

Lazy with lock      42535929
Lazy with GetOrAdd  41070320 (Only solution that is completely thread safe)
Semaphore           64573360

一个简单的解决方案是使用 SemaphoreSlim.WaitAsync() 而不是锁,然后您就可以解决在锁内等待的问题。虽然,MemoryCache 的所有其他方法都是线程安全的。

private SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
public async Task<T> GetAsync(
            string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if (parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    if (!_cache.Contains(key))
    {
        await semaphoreSlim.WaitAsync();
        try
        {
            if (!_cache.Contains(key))
            {
                var data = await populator();
                _cache.Add(key, data, DateTimeOffset.Now.Add(expire));
            }
        }
        finally
        {
            semaphoreSlim.Release();
        }
    }

    return (T)_cache.Get(key);
}

虽然已经有一个 接受的 答案,但我将 post 一个采用 Lazy<T> 方法的新答案。想法是:尽量减少lock的持续时间,如果缓存中不存在该键,则将Lazy<T>放入缓存。这样所有同时使用同一个键的线程都将等待同一个Lazy<T>的值

public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if (parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    lock (_cache)
    {
        if (!_cache.Contains(key))
        {
            var lazy = new Lazy<Task<T>>(populator, true);
            _cache.Add(key, lazy, DateTimeOffset.Now.Add(expire));
        }
    }

    return ((Lazy<Task<T>>)_cache.Get(key)).Value;
}

版本 2

public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if (parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    var lazy = ((Lazy<Task<T>>)_cache.Get(key));
    if (lazy != null) return lazy.Value;

    lock (_cache)
    {
        if (!_cache.Contains(key))
        {
            lazy = new Lazy<Task<T>>(populator, true);
            _cache.Add(key, lazy, DateTimeOffset.Now.Add(expire));
            return lazy.Value;
        }
        return ((Lazy<Task<T>>)_cache.Get(key)).Value;
    }
}

版本 3

public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
    if (parameters != null)
        key += JsonConvert.SerializeObject(parameters);

    var task = (Task<T>)_cache.Get(key);
    if (task != null) return task;

    var value = populator();
    return 
     (Task<T>)_cache.AddOrGetExisting(key, value, DateTimeOffset.Now.Add(expire)) ?? value;
}

当前答案使用有些过时的System.Runtime.Caching.MemoryCache。它们还包含微妙的竞争条件(见评论)。最后,并非所有这些都允许超时取决于要缓存的值。

这是我使用新 Microsoft.Extensions.Caching.Memory 的尝试(由 ASP.NET 核心使用):

//Add NuGet package: Microsoft.Extensions.Caching.Memory    

using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Primitives;

MemoryCache _cache = new MemoryCache(new MemoryCacheOptions());

public Task<T> GetOrAddAsync<T>(
        string key, Func<Task<T>> factory, Func<T, TimeSpan> expirationCalculator)
{    
    return _cache.GetOrCreateAsync(key, async cacheEntry => 
    {
        var cts = new CancellationTokenSource();
        cacheEntry.AddExpirationToken(new CancellationChangeToken(cts.Token));
        var value = await factory().ConfigureAwait(false);
        cts.CancelAfter(expirationCalculator(value));
        return value;
    });
}

示例用法:

await GetOrAddAsync("foo", () => Task.Run(() => 42), i  => TimeSpan.FromMilliseconds(i)));

请注意,不能保证工厂方法只被调用一次(参见https://github.com/aspnet/Caching/issues/240)。

这是对 Eser 的尝试改进 (Version2). The Lazy class 默认情况下是线程安全的,因此可以删除 lock。有可能会为给定的键创建多个 Lazy 对象,但只有一个对象会被 Value 属性 查询,从而导致繁重的 Task 启动。其他 Lazy 将保持未使用状态,并且将超出范围并很快成为垃圾收集器。

第一个重载是灵活的通用重载,它接受一个 Func<CacheItemPolicy> 参数。我为最常见的绝对和滑动过期情况添加了两个重载。为了方便起见,可以添加更多重载。

using System.Runtime.Caching;

static partial class MemoryCacheExtensions
{
    public static Task<T> GetOrCreateLazyAsync<T>(this MemoryCache cache, string key,
        Func<Task<T>> valueFactory, Func<CacheItemPolicy> cacheItemPolicyFactory = null)
    {
        var lazyTask = (Lazy<Task<T>>)cache.Get(key);
        if (lazyTask == null)
        {
            var newLazyTask = new Lazy<Task<T>>(valueFactory);
            var cacheItem = new CacheItem(key, newLazyTask);
            var cacheItemPolicy = cacheItemPolicyFactory?.Invoke();
            var existingCacheItem = cache.AddOrGetExisting(cacheItem, cacheItemPolicy);
            lazyTask = (Lazy<Task<T>>)existingCacheItem?.Value ?? newLazyTask;
        }
        return ToAsyncConditional(lazyTask.Value);
    }

    private static Task<TResult> ToAsyncConditional<TResult>(Task<TResult> task)
    {
        if (task.IsCompleted) return task;
        return task.ContinueWith(t => t,
            default, TaskContinuationOptions.RunContinuationsAsynchronously,
            TaskScheduler.Default).Unwrap();
    }

    public static Task<T> GetOrCreateLazyAsync<T>(this MemoryCache cache, string key,
        Func<Task<T>> valueFactory, DateTimeOffset absoluteExpiration)
    {
        return cache.GetOrCreateLazyAsync(key, valueFactory, () => new CacheItemPolicy()
        {
            AbsoluteExpiration = absoluteExpiration,
        });
    }

    public static Task<T> GetOrCreateLazyAsync<T>(this MemoryCache cache, string key,
        Func<Task<T>> valueFactory, TimeSpan slidingExpiration)
    {
        return cache.GetOrCreateLazyAsync(key, valueFactory, () => new CacheItemPolicy()
        {
            SlidingExpiration = slidingExpiration,
        });
    }
}

用法示例:

string html = await MemoryCache.Default.GetOrCreateLazyAsync("MyKey", async () =>
{
    return await new WebClient().DownloadStringTaskAsync("https://whosebug.com");
}, DateTimeOffset.Now.AddMinutes(10));

本站HTML下载并缓存10分钟。多个并发请求将await同一个任务完成。

关于缓存优先级的 System.Runtime.Caching.MemoryCache class is easy to use, but has limited support for prioritizing the cache entries. Basically there are only two options, Default and NotRemovable, meaning it's hardly adequate for advanced scenarios. The newer Microsoft.Extensions.Caching.Memory.MemoryCache class (from this package) offers more optionsLowNormalHighNeverRemove),但在其他方面不太直观且使用起来更麻烦.它提供异步功能,但不是惰性的。所以这里是这个 class:

的 LazyAsync 等效扩展
using Microsoft.Extensions.Caching.Memory;

static partial class MemoryCacheExtensions
{
    public static Task<T> GetOrCreateLazyAsync<T>(this IMemoryCache cache, object key,
        Func<Task<T>> valueFactory, MemoryCacheEntryOptions options = null)
    {
        if (!cache.TryGetValue(key, out Lazy<Task<T>> lazy))
        {
            var entry = cache.CreateEntry(key);
            if (options != null) entry.SetOptions(options);
            var newLazy = new Lazy<Task<T>>(valueFactory);
            entry.Value = newLazy;
            entry.Dispose(); // Dispose actually inserts the entry in the cache
            if (!cache.TryGetValue(key, out lazy)) lazy = newLazy;
        }
        return ToAsyncConditional(lazy.Value);
    }

    private static Task<TResult> ToAsyncConditional<TResult>(Task<TResult> task)
    {
        if (task.IsCompleted) return task;
        return task.ContinueWith(t => t,
            default, TaskContinuationOptions.RunContinuationsAsynchronously,
            TaskScheduler.Default).Unwrap();
    }

    public static Task<T> GetOrCreateLazyAsync<T>(this IMemoryCache cache, object key,
        Func<Task<T>> valueFactory, DateTimeOffset absoluteExpiration)
    {
        return cache.GetOrCreateLazyAsync(key, valueFactory,
            new MemoryCacheEntryOptions() { AbsoluteExpiration = absoluteExpiration });
    }

    public static Task<T> GetOrCreateLazyAsync<T>(this IMemoryCache cache, object key,
        Func<Task<T>> valueFactory, TimeSpan slidingExpiration)
    {
        return cache.GetOrCreateLazyAsync(key, valueFactory,
            new MemoryCacheEntryOptions() { SlidingExpiration = slidingExpiration });
    }
}

用法示例:

var cache = new MemoryCache(new MemoryCacheOptions());
string html = await cache.GetOrCreateLazyAsync("MyKey", async () =>
{
    return await new WebClient().DownloadStringTaskAsync("https://whosebug.com");
}, DateTimeOffset.Now.AddMinutes(10));

更新:我刚刚了解到 async-await 机制的 。当一个不完整的 Task 被并发等待多次时,continuation 将 运行 一个接一个地同步(在同一个线程中)(假设没有同步上下文)。对于 GetOrCreateLazyAsync 的上述实现来说,这可能是一个问题,因为阻塞代码可能会在对 GetOrCreateLazyAsync 的等待调用之后立即存在,在这种情况下,其他等待者将受到影响(延迟,甚至陷入僵局)。此问题的可能解决方案是 return 延迟创建的 Task 的异步延续,而不是任务本身,但前提是任务未完成。这就是上面引入ToAsyncConditional方法的原因。


注意:此实现缓存异步 lambda 调用期间可能发生的任何错误。一般来说,这可能不是理想的行为。 我可能的解决方案是用 AsyncLazy<T> type from Stephen Cleary's Nito.AsyncEx.Coordination 包替换 Lazy<Task<T>>,用 RetryOnFailure 选项实例化。

这个帖子有点旧,但我最近才看到这个,我想我会留下这个答案希望它能有所帮助。

对于异步,有几点需要牢记:

  1. “超级锁”方法并不快速,因为它会在对一个键执行操作时阻止对 其他 键的工厂操作。
  2. “按键锁定”(SemaphoreSlim) 有两件事:a。它是一次性的,因此可以在赛后处理。 b.忍受不处理它。

我选择使用锁池来解决。不需要每个键都有一个锁,但只要有足够的锁就可以达到最大活动线程数。然后我通过散列将相同的锁分配给密钥。池大小是 ProcessorCount 的函数。 valueFactory 只执行一次。由于多个键映射到一个锁(一个键总是映射到同一个锁),具有相同散列的键的操作将得到同步。所以这失去了一些并行性,并且这种折衷可能不适用于所有情况。我同意这种妥协。这是 LazyCacheFusionCache(它的许多方法之一)使用的方法,等等。所以我会使用其中的一个,但知道这个技巧是件好事,因为它非常漂亮。

private readonly SemaphoreSlimPool _lockPool = new SemaphoreSlimPool(1, 1);

private async Task<TValue> GetAsync(object key, Func<ICacheEntry, Task<TValue>> valueFactory)
{
    if (_cache.TryGetValue(key, out var value))
    {
        return value;
    }

    // key-specific lock so as to not block operations on other keys
    var lockForKey = _lockPool[key];
    await lockForKey.WaitAsync().ConfigureAwait(false);
    try
    {
        if (_cache.TryGetValue(key, out value))
        {
            return value;
        }

        value = await _cache.GetOrCreateAsync(key, valueFactory).ConfigureAwait(false);
        return value;
    }
    finally
    {
        lockForKey.Release();
    }
}

// Dispose SemaphoreSlimPool

这是 SemaphoreSlimPool 实现 (source, nuget)。

/// <summary>
/// Provides a pool of SemaphoreSlim objects for keyed usage.
/// </summary>
public class SemaphoreSlimPool : IDisposable
{
    /// <summary>
    /// Pool of SemaphoreSlim objects.
    /// </summary>
    private readonly SemaphoreSlim[] pool;

    /// <summary>
    /// Size of the pool.
    /// <para></para>
    /// Environment.ProcessorCount is not always correct so use more slots as buffer,
    /// with a minimum of 32 slots.
    /// </summary>
    private readonly int poolSize = Math.Max(Environment.ProcessorCount << 3, 32);

    private const int NoMaximum = int.MaxValue;

    /// <summary>
    /// Ctor.
    /// </summary>
    public SemaphoreSlimPool(int initialCount)
        : this(initialCount, NoMaximum)
    { }

    /// <summary>
    /// Ctor.
    /// </summary>
    public SemaphoreSlimPool(int initialCount, int maxCount)
    {
        pool = new SemaphoreSlim[poolSize];
        for (int i = 0; i < poolSize; i++)
        {
            pool[i] = new SemaphoreSlim(initialCount, maxCount);
        }
    }

    /// <inheritdoc cref="Get(object)" />
    public SemaphoreSlim this[object key] => Get(key);

    /// <summary>
    /// Returns a <see cref="SemaphoreSlim"/> from the pool that the <paramref name="key"/> maps to.
    /// </summary>
    /// <exception cref="ArgumentNullException"></exception>
    public SemaphoreSlim Get(object key)
    {
        _ = key ?? throw new ArgumentNullException(nameof(key));
        return pool[GetIndex(key)];
    }

    private uint GetIndex(object key)
    {
        return unchecked((uint)key.GetHashCode()) % (uint)poolSize;
    }

    private bool disposed = false;

    public void Dispose()
    {
        Dispose(true);
    }

    public void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                if (pool != null)
                {
                    for (int i = 0; i < poolSize; i++)
                    {
                        pool[i].Dispose();
                    }
                }
            }

            disposed = true;
        }
    }
}

由于 ttl 低,我已经在这上面投入了很多线程,并且有很多流失,而且它没有被炸毁。到目前为止,我觉得还不错,但我想看看是否有人能找到错误。