异步线程安全从 MemoryCache 获取
Async threadsafe Get from MemoryCache
我创建了一个使用 .NET MemoryCache
的异步缓存。
这是代码:
public async Task<T> GetAsync(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
if(parameters != null)
key += JsonConvert.SerializeObject(parameters);
if(!_cache.Contains(key))
{
var data = await populator();
lock(_cache)
{
if(!_cache.Contains(key)) //Check again but locked this time
_cache.Add(key, data, DateTimeOffset.Now.Add(expire));
}
}
return (T)_cache.Get(key);
}
我认为唯一的缺点是我需要在锁外进行等待,因此填充器不是线程安全的,但由于等待不能驻留在锁内,我想这是最好的方法。有没有我错过的陷阱?
更新:当另一个线程使缓存无效时,Esers 答案的一个版本也是线程安全的:
public async Task<T> GetAsync(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
if(parameters != null)
key += JsonConvert.SerializeObject(parameters);
var lazy = new Lazy<Task<T>>(populator, true);
_cache.AddOrGetExisting(key, lazy, DateTimeOffset.Now.Add(expire));
return ((Lazy<Task<T>>) _cache.Get(key)).Value;
}
但是它可能会更慢,因为它创建了永远不会执行的 Lazy 实例,并且它在全线程安全模式下使用 Lazy LazyThreadSafetyMode.ExecutionAndPublication
更新新基准(越高越好)
Lazy with lock 42535929
Lazy with GetOrAdd 41070320 (Only solution that is completely thread safe)
Semaphore 64573360
一个简单的解决方案是使用 SemaphoreSlim.WaitAsync()
而不是锁,然后您就可以解决在锁内等待的问题。虽然,MemoryCache
的所有其他方法都是线程安全的。
private SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
public async Task<T> GetAsync(
string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
if (parameters != null)
key += JsonConvert.SerializeObject(parameters);
if (!_cache.Contains(key))
{
await semaphoreSlim.WaitAsync();
try
{
if (!_cache.Contains(key))
{
var data = await populator();
_cache.Add(key, data, DateTimeOffset.Now.Add(expire));
}
}
finally
{
semaphoreSlim.Release();
}
}
return (T)_cache.Get(key);
}
虽然已经有一个 接受的 答案,但我将 post 一个采用 Lazy<T>
方法的新答案。想法是:尽量减少lock
块的持续时间,如果缓存中不存在该键,则将Lazy<T>
放入缓存。这样所有同时使用同一个键的线程都将等待同一个Lazy<T>
的值
public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
if (parameters != null)
key += JsonConvert.SerializeObject(parameters);
lock (_cache)
{
if (!_cache.Contains(key))
{
var lazy = new Lazy<Task<T>>(populator, true);
_cache.Add(key, lazy, DateTimeOffset.Now.Add(expire));
}
}
return ((Lazy<Task<T>>)_cache.Get(key)).Value;
}
版本 2
public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
if (parameters != null)
key += JsonConvert.SerializeObject(parameters);
var lazy = ((Lazy<Task<T>>)_cache.Get(key));
if (lazy != null) return lazy.Value;
lock (_cache)
{
if (!_cache.Contains(key))
{
lazy = new Lazy<Task<T>>(populator, true);
_cache.Add(key, lazy, DateTimeOffset.Now.Add(expire));
return lazy.Value;
}
return ((Lazy<Task<T>>)_cache.Get(key)).Value;
}
}
版本 3
public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
if (parameters != null)
key += JsonConvert.SerializeObject(parameters);
var task = (Task<T>)_cache.Get(key);
if (task != null) return task;
var value = populator();
return
(Task<T>)_cache.AddOrGetExisting(key, value, DateTimeOffset.Now.Add(expire)) ?? value;
}
当前答案使用有些过时的System.Runtime.Caching.MemoryCache
。它们还包含微妙的竞争条件(见评论)。最后,并非所有这些都允许超时取决于要缓存的值。
这是我使用新 Microsoft.Extensions.Caching.Memory 的尝试(由 ASP.NET 核心使用):
//Add NuGet package: Microsoft.Extensions.Caching.Memory
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Primitives;
MemoryCache _cache = new MemoryCache(new MemoryCacheOptions());
public Task<T> GetOrAddAsync<T>(
string key, Func<Task<T>> factory, Func<T, TimeSpan> expirationCalculator)
{
return _cache.GetOrCreateAsync(key, async cacheEntry =>
{
var cts = new CancellationTokenSource();
cacheEntry.AddExpirationToken(new CancellationChangeToken(cts.Token));
var value = await factory().ConfigureAwait(false);
cts.CancelAfter(expirationCalculator(value));
return value;
});
}
示例用法:
await GetOrAddAsync("foo", () => Task.Run(() => 42), i => TimeSpan.FromMilliseconds(i)));
请注意,不能保证工厂方法只被调用一次(参见https://github.com/aspnet/Caching/issues/240)。
这是对 Eser 的尝试改进 (Version2). The Lazy
class 默认情况下是线程安全的,因此可以删除 lock
。有可能会为给定的键创建多个 Lazy
对象,但只有一个对象会被 Value
属性 查询,从而导致繁重的 Task
启动。其他 Lazy
将保持未使用状态,并且将超出范围并很快成为垃圾收集器。
第一个重载是灵活的通用重载,它接受一个 Func<CacheItemPolicy>
参数。我为最常见的绝对和滑动过期情况添加了两个重载。为了方便起见,可以添加更多重载。
using System.Runtime.Caching;
static partial class MemoryCacheExtensions
{
public static Task<T> GetOrCreateLazyAsync<T>(this MemoryCache cache, string key,
Func<Task<T>> valueFactory, Func<CacheItemPolicy> cacheItemPolicyFactory = null)
{
var lazyTask = (Lazy<Task<T>>)cache.Get(key);
if (lazyTask == null)
{
var newLazyTask = new Lazy<Task<T>>(valueFactory);
var cacheItem = new CacheItem(key, newLazyTask);
var cacheItemPolicy = cacheItemPolicyFactory?.Invoke();
var existingCacheItem = cache.AddOrGetExisting(cacheItem, cacheItemPolicy);
lazyTask = (Lazy<Task<T>>)existingCacheItem?.Value ?? newLazyTask;
}
return ToAsyncConditional(lazyTask.Value);
}
private static Task<TResult> ToAsyncConditional<TResult>(Task<TResult> task)
{
if (task.IsCompleted) return task;
return task.ContinueWith(t => t,
default, TaskContinuationOptions.RunContinuationsAsynchronously,
TaskScheduler.Default).Unwrap();
}
public static Task<T> GetOrCreateLazyAsync<T>(this MemoryCache cache, string key,
Func<Task<T>> valueFactory, DateTimeOffset absoluteExpiration)
{
return cache.GetOrCreateLazyAsync(key, valueFactory, () => new CacheItemPolicy()
{
AbsoluteExpiration = absoluteExpiration,
});
}
public static Task<T> GetOrCreateLazyAsync<T>(this MemoryCache cache, string key,
Func<Task<T>> valueFactory, TimeSpan slidingExpiration)
{
return cache.GetOrCreateLazyAsync(key, valueFactory, () => new CacheItemPolicy()
{
SlidingExpiration = slidingExpiration,
});
}
}
用法示例:
string html = await MemoryCache.Default.GetOrCreateLazyAsync("MyKey", async () =>
{
return await new WebClient().DownloadStringTaskAsync("https://whosebug.com");
}, DateTimeOffset.Now.AddMinutes(10));
本站HTML下载并缓存10分钟。多个并发请求将await
同一个任务完成。
关于缓存优先级的 System.Runtime.Caching.MemoryCache
class is easy to use, but has limited support for prioritizing the cache entries. Basically there are only two options, Default
and NotRemovable
, meaning it's hardly adequate for advanced scenarios. The newer Microsoft.Extensions.Caching.Memory.MemoryCache
class (from this package) offers more options(Low
、Normal
、High
和 NeverRemove
),但在其他方面不太直观且使用起来更麻烦.它提供异步功能,但不是惰性的。所以这里是这个 class:
的 LazyAsync 等效扩展
using Microsoft.Extensions.Caching.Memory;
static partial class MemoryCacheExtensions
{
public static Task<T> GetOrCreateLazyAsync<T>(this IMemoryCache cache, object key,
Func<Task<T>> valueFactory, MemoryCacheEntryOptions options = null)
{
if (!cache.TryGetValue(key, out Lazy<Task<T>> lazy))
{
var entry = cache.CreateEntry(key);
if (options != null) entry.SetOptions(options);
var newLazy = new Lazy<Task<T>>(valueFactory);
entry.Value = newLazy;
entry.Dispose(); // Dispose actually inserts the entry in the cache
if (!cache.TryGetValue(key, out lazy)) lazy = newLazy;
}
return ToAsyncConditional(lazy.Value);
}
private static Task<TResult> ToAsyncConditional<TResult>(Task<TResult> task)
{
if (task.IsCompleted) return task;
return task.ContinueWith(t => t,
default, TaskContinuationOptions.RunContinuationsAsynchronously,
TaskScheduler.Default).Unwrap();
}
public static Task<T> GetOrCreateLazyAsync<T>(this IMemoryCache cache, object key,
Func<Task<T>> valueFactory, DateTimeOffset absoluteExpiration)
{
return cache.GetOrCreateLazyAsync(key, valueFactory,
new MemoryCacheEntryOptions() { AbsoluteExpiration = absoluteExpiration });
}
public static Task<T> GetOrCreateLazyAsync<T>(this IMemoryCache cache, object key,
Func<Task<T>> valueFactory, TimeSpan slidingExpiration)
{
return cache.GetOrCreateLazyAsync(key, valueFactory,
new MemoryCacheEntryOptions() { SlidingExpiration = slidingExpiration });
}
}
用法示例:
var cache = new MemoryCache(new MemoryCacheOptions());
string html = await cache.GetOrCreateLazyAsync("MyKey", async () =>
{
return await new WebClient().DownloadStringTaskAsync("https://whosebug.com");
}, DateTimeOffset.Now.AddMinutes(10));
更新:我刚刚了解到 async
-await
机制的 。当一个不完整的 Task
被并发等待多次时,continuation 将 运行 一个接一个地同步(在同一个线程中)(假设没有同步上下文)。对于 GetOrCreateLazyAsync
的上述实现来说,这可能是一个问题,因为阻塞代码可能会在对 GetOrCreateLazyAsync
的等待调用之后立即存在,在这种情况下,其他等待者将受到影响(延迟,甚至陷入僵局)。此问题的可能解决方案是 return 延迟创建的 Task
的异步延续,而不是任务本身,但前提是任务未完成。这就是上面引入ToAsyncConditional
方法的原因。
注意:此实现缓存异步 lambda 调用期间可能发生的任何错误。一般来说,这可能不是理想的行为。
我可能的解决方案是用 AsyncLazy<T>
type from Stephen Cleary's Nito.AsyncEx.Coordination
包替换 Lazy<Task<T>>
,用 RetryOnFailure
选项实例化。
这个帖子有点旧,但我最近才看到这个,我想我会留下这个答案希望它能有所帮助。
对于异步,有几点需要牢记:
- “超级锁”方法并不快速,因为它会在对一个键执行操作时阻止对 其他 键的工厂操作。
- “按键锁定”(
SemaphoreSlim
) 有两件事:a。它是一次性的,因此可以在赛后处理。 b.忍受不处理它。
我选择使用锁池来解决。不需要每个键都有一个锁,但只要有足够的锁就可以达到最大活动线程数。然后我通过散列将相同的锁分配给密钥。池大小是 ProcessorCount
的函数。 valueFactory
只执行一次。由于多个键映射到一个锁(一个键总是映射到同一个锁),具有相同散列的键的操作将得到同步。所以这失去了一些并行性,并且这种折衷可能不适用于所有情况。我同意这种妥协。这是 LazyCache
和 FusionCache
(它的许多方法之一)使用的方法,等等。所以我会使用其中的一个,但知道这个技巧是件好事,因为它非常漂亮。
private readonly SemaphoreSlimPool _lockPool = new SemaphoreSlimPool(1, 1);
private async Task<TValue> GetAsync(object key, Func<ICacheEntry, Task<TValue>> valueFactory)
{
if (_cache.TryGetValue(key, out var value))
{
return value;
}
// key-specific lock so as to not block operations on other keys
var lockForKey = _lockPool[key];
await lockForKey.WaitAsync().ConfigureAwait(false);
try
{
if (_cache.TryGetValue(key, out value))
{
return value;
}
value = await _cache.GetOrCreateAsync(key, valueFactory).ConfigureAwait(false);
return value;
}
finally
{
lockForKey.Release();
}
}
// Dispose SemaphoreSlimPool
这是 SemaphoreSlimPool
实现 (source, nuget)。
/// <summary>
/// Provides a pool of SemaphoreSlim objects for keyed usage.
/// </summary>
public class SemaphoreSlimPool : IDisposable
{
/// <summary>
/// Pool of SemaphoreSlim objects.
/// </summary>
private readonly SemaphoreSlim[] pool;
/// <summary>
/// Size of the pool.
/// <para></para>
/// Environment.ProcessorCount is not always correct so use more slots as buffer,
/// with a minimum of 32 slots.
/// </summary>
private readonly int poolSize = Math.Max(Environment.ProcessorCount << 3, 32);
private const int NoMaximum = int.MaxValue;
/// <summary>
/// Ctor.
/// </summary>
public SemaphoreSlimPool(int initialCount)
: this(initialCount, NoMaximum)
{ }
/// <summary>
/// Ctor.
/// </summary>
public SemaphoreSlimPool(int initialCount, int maxCount)
{
pool = new SemaphoreSlim[poolSize];
for (int i = 0; i < poolSize; i++)
{
pool[i] = new SemaphoreSlim(initialCount, maxCount);
}
}
/// <inheritdoc cref="Get(object)" />
public SemaphoreSlim this[object key] => Get(key);
/// <summary>
/// Returns a <see cref="SemaphoreSlim"/> from the pool that the <paramref name="key"/> maps to.
/// </summary>
/// <exception cref="ArgumentNullException"></exception>
public SemaphoreSlim Get(object key)
{
_ = key ?? throw new ArgumentNullException(nameof(key));
return pool[GetIndex(key)];
}
private uint GetIndex(object key)
{
return unchecked((uint)key.GetHashCode()) % (uint)poolSize;
}
private bool disposed = false;
public void Dispose()
{
Dispose(true);
}
public void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
if (pool != null)
{
for (int i = 0; i < poolSize; i++)
{
pool[i].Dispose();
}
}
}
disposed = true;
}
}
}
由于 ttl 低,我已经在这上面投入了很多线程,并且有很多流失,而且它没有被炸毁。到目前为止,我觉得还不错,但我想看看是否有人能找到错误。
我创建了一个使用 .NET MemoryCache
的异步缓存。
这是代码:
public async Task<T> GetAsync(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
if(parameters != null)
key += JsonConvert.SerializeObject(parameters);
if(!_cache.Contains(key))
{
var data = await populator();
lock(_cache)
{
if(!_cache.Contains(key)) //Check again but locked this time
_cache.Add(key, data, DateTimeOffset.Now.Add(expire));
}
}
return (T)_cache.Get(key);
}
我认为唯一的缺点是我需要在锁外进行等待,因此填充器不是线程安全的,但由于等待不能驻留在锁内,我想这是最好的方法。有没有我错过的陷阱?
更新:当另一个线程使缓存无效时,Esers 答案的一个版本也是线程安全的:
public async Task<T> GetAsync(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
if(parameters != null)
key += JsonConvert.SerializeObject(parameters);
var lazy = new Lazy<Task<T>>(populator, true);
_cache.AddOrGetExisting(key, lazy, DateTimeOffset.Now.Add(expire));
return ((Lazy<Task<T>>) _cache.Get(key)).Value;
}
但是它可能会更慢,因为它创建了永远不会执行的 Lazy 实例,并且它在全线程安全模式下使用 Lazy LazyThreadSafetyMode.ExecutionAndPublication
更新新基准(越高越好)
Lazy with lock 42535929
Lazy with GetOrAdd 41070320 (Only solution that is completely thread safe)
Semaphore 64573360
一个简单的解决方案是使用 SemaphoreSlim.WaitAsync()
而不是锁,然后您就可以解决在锁内等待的问题。虽然,MemoryCache
的所有其他方法都是线程安全的。
private SemaphoreSlim semaphoreSlim = new SemaphoreSlim(1);
public async Task<T> GetAsync(
string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
if (parameters != null)
key += JsonConvert.SerializeObject(parameters);
if (!_cache.Contains(key))
{
await semaphoreSlim.WaitAsync();
try
{
if (!_cache.Contains(key))
{
var data = await populator();
_cache.Add(key, data, DateTimeOffset.Now.Add(expire));
}
}
finally
{
semaphoreSlim.Release();
}
}
return (T)_cache.Get(key);
}
虽然已经有一个 接受的 答案,但我将 post 一个采用 Lazy<T>
方法的新答案。想法是:尽量减少lock
块的持续时间,如果缓存中不存在该键,则将Lazy<T>
放入缓存。这样所有同时使用同一个键的线程都将等待同一个Lazy<T>
的值
public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
if (parameters != null)
key += JsonConvert.SerializeObject(parameters);
lock (_cache)
{
if (!_cache.Contains(key))
{
var lazy = new Lazy<Task<T>>(populator, true);
_cache.Add(key, lazy, DateTimeOffset.Now.Add(expire));
}
}
return ((Lazy<Task<T>>)_cache.Get(key)).Value;
}
版本 2
public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
if (parameters != null)
key += JsonConvert.SerializeObject(parameters);
var lazy = ((Lazy<Task<T>>)_cache.Get(key));
if (lazy != null) return lazy.Value;
lock (_cache)
{
if (!_cache.Contains(key))
{
lazy = new Lazy<Task<T>>(populator, true);
_cache.Add(key, lazy, DateTimeOffset.Now.Add(expire));
return lazy.Value;
}
return ((Lazy<Task<T>>)_cache.Get(key)).Value;
}
}
版本 3
public Task<T> GetAsync<T>(string key, Func<Task<T>> populator, TimeSpan expire, object parameters)
{
if (parameters != null)
key += JsonConvert.SerializeObject(parameters);
var task = (Task<T>)_cache.Get(key);
if (task != null) return task;
var value = populator();
return
(Task<T>)_cache.AddOrGetExisting(key, value, DateTimeOffset.Now.Add(expire)) ?? value;
}
当前答案使用有些过时的System.Runtime.Caching.MemoryCache
。它们还包含微妙的竞争条件(见评论)。最后,并非所有这些都允许超时取决于要缓存的值。
这是我使用新 Microsoft.Extensions.Caching.Memory 的尝试(由 ASP.NET 核心使用):
//Add NuGet package: Microsoft.Extensions.Caching.Memory
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Primitives;
MemoryCache _cache = new MemoryCache(new MemoryCacheOptions());
public Task<T> GetOrAddAsync<T>(
string key, Func<Task<T>> factory, Func<T, TimeSpan> expirationCalculator)
{
return _cache.GetOrCreateAsync(key, async cacheEntry =>
{
var cts = new CancellationTokenSource();
cacheEntry.AddExpirationToken(new CancellationChangeToken(cts.Token));
var value = await factory().ConfigureAwait(false);
cts.CancelAfter(expirationCalculator(value));
return value;
});
}
示例用法:
await GetOrAddAsync("foo", () => Task.Run(() => 42), i => TimeSpan.FromMilliseconds(i)));
请注意,不能保证工厂方法只被调用一次(参见https://github.com/aspnet/Caching/issues/240)。
这是对 Eser 的尝试改进 Lazy
class 默认情况下是线程安全的,因此可以删除 lock
。有可能会为给定的键创建多个 Lazy
对象,但只有一个对象会被 Value
属性 查询,从而导致繁重的 Task
启动。其他 Lazy
将保持未使用状态,并且将超出范围并很快成为垃圾收集器。
第一个重载是灵活的通用重载,它接受一个 Func<CacheItemPolicy>
参数。我为最常见的绝对和滑动过期情况添加了两个重载。为了方便起见,可以添加更多重载。
using System.Runtime.Caching;
static partial class MemoryCacheExtensions
{
public static Task<T> GetOrCreateLazyAsync<T>(this MemoryCache cache, string key,
Func<Task<T>> valueFactory, Func<CacheItemPolicy> cacheItemPolicyFactory = null)
{
var lazyTask = (Lazy<Task<T>>)cache.Get(key);
if (lazyTask == null)
{
var newLazyTask = new Lazy<Task<T>>(valueFactory);
var cacheItem = new CacheItem(key, newLazyTask);
var cacheItemPolicy = cacheItemPolicyFactory?.Invoke();
var existingCacheItem = cache.AddOrGetExisting(cacheItem, cacheItemPolicy);
lazyTask = (Lazy<Task<T>>)existingCacheItem?.Value ?? newLazyTask;
}
return ToAsyncConditional(lazyTask.Value);
}
private static Task<TResult> ToAsyncConditional<TResult>(Task<TResult> task)
{
if (task.IsCompleted) return task;
return task.ContinueWith(t => t,
default, TaskContinuationOptions.RunContinuationsAsynchronously,
TaskScheduler.Default).Unwrap();
}
public static Task<T> GetOrCreateLazyAsync<T>(this MemoryCache cache, string key,
Func<Task<T>> valueFactory, DateTimeOffset absoluteExpiration)
{
return cache.GetOrCreateLazyAsync(key, valueFactory, () => new CacheItemPolicy()
{
AbsoluteExpiration = absoluteExpiration,
});
}
public static Task<T> GetOrCreateLazyAsync<T>(this MemoryCache cache, string key,
Func<Task<T>> valueFactory, TimeSpan slidingExpiration)
{
return cache.GetOrCreateLazyAsync(key, valueFactory, () => new CacheItemPolicy()
{
SlidingExpiration = slidingExpiration,
});
}
}
用法示例:
string html = await MemoryCache.Default.GetOrCreateLazyAsync("MyKey", async () =>
{
return await new WebClient().DownloadStringTaskAsync("https://whosebug.com");
}, DateTimeOffset.Now.AddMinutes(10));
本站HTML下载并缓存10分钟。多个并发请求将await
同一个任务完成。
关于缓存优先级的 System.Runtime.Caching.MemoryCache
class is easy to use, but has limited support for prioritizing the cache entries. Basically there are only two options, Default
and NotRemovable
, meaning it's hardly adequate for advanced scenarios. The newer Microsoft.Extensions.Caching.Memory.MemoryCache
class (from this package) offers more options(Low
、Normal
、High
和 NeverRemove
),但在其他方面不太直观且使用起来更麻烦.它提供异步功能,但不是惰性的。所以这里是这个 class:
using Microsoft.Extensions.Caching.Memory;
static partial class MemoryCacheExtensions
{
public static Task<T> GetOrCreateLazyAsync<T>(this IMemoryCache cache, object key,
Func<Task<T>> valueFactory, MemoryCacheEntryOptions options = null)
{
if (!cache.TryGetValue(key, out Lazy<Task<T>> lazy))
{
var entry = cache.CreateEntry(key);
if (options != null) entry.SetOptions(options);
var newLazy = new Lazy<Task<T>>(valueFactory);
entry.Value = newLazy;
entry.Dispose(); // Dispose actually inserts the entry in the cache
if (!cache.TryGetValue(key, out lazy)) lazy = newLazy;
}
return ToAsyncConditional(lazy.Value);
}
private static Task<TResult> ToAsyncConditional<TResult>(Task<TResult> task)
{
if (task.IsCompleted) return task;
return task.ContinueWith(t => t,
default, TaskContinuationOptions.RunContinuationsAsynchronously,
TaskScheduler.Default).Unwrap();
}
public static Task<T> GetOrCreateLazyAsync<T>(this IMemoryCache cache, object key,
Func<Task<T>> valueFactory, DateTimeOffset absoluteExpiration)
{
return cache.GetOrCreateLazyAsync(key, valueFactory,
new MemoryCacheEntryOptions() { AbsoluteExpiration = absoluteExpiration });
}
public static Task<T> GetOrCreateLazyAsync<T>(this IMemoryCache cache, object key,
Func<Task<T>> valueFactory, TimeSpan slidingExpiration)
{
return cache.GetOrCreateLazyAsync(key, valueFactory,
new MemoryCacheEntryOptions() { SlidingExpiration = slidingExpiration });
}
}
用法示例:
var cache = new MemoryCache(new MemoryCacheOptions());
string html = await cache.GetOrCreateLazyAsync("MyKey", async () =>
{
return await new WebClient().DownloadStringTaskAsync("https://whosebug.com");
}, DateTimeOffset.Now.AddMinutes(10));
更新:我刚刚了解到 async
-await
机制的 Task
被并发等待多次时,continuation 将 运行 一个接一个地同步(在同一个线程中)(假设没有同步上下文)。对于 GetOrCreateLazyAsync
的上述实现来说,这可能是一个问题,因为阻塞代码可能会在对 GetOrCreateLazyAsync
的等待调用之后立即存在,在这种情况下,其他等待者将受到影响(延迟,甚至陷入僵局)。此问题的可能解决方案是 return 延迟创建的 Task
的异步延续,而不是任务本身,但前提是任务未完成。这就是上面引入ToAsyncConditional
方法的原因。
注意:此实现缓存异步 lambda 调用期间可能发生的任何错误。一般来说,这可能不是理想的行为。
我可能的解决方案是用 AsyncLazy<T>
type from Stephen Cleary's Nito.AsyncEx.Coordination
包替换 Lazy<Task<T>>
,用 RetryOnFailure
选项实例化。
这个帖子有点旧,但我最近才看到这个,我想我会留下这个答案希望它能有所帮助。
对于异步,有几点需要牢记:
- “超级锁”方法并不快速,因为它会在对一个键执行操作时阻止对 其他 键的工厂操作。
- “按键锁定”(
SemaphoreSlim
) 有两件事:a。它是一次性的,因此可以在赛后处理。 b.忍受不处理它。
我选择使用锁池来解决。不需要每个键都有一个锁,但只要有足够的锁就可以达到最大活动线程数。然后我通过散列将相同的锁分配给密钥。池大小是 ProcessorCount
的函数。 valueFactory
只执行一次。由于多个键映射到一个锁(一个键总是映射到同一个锁),具有相同散列的键的操作将得到同步。所以这失去了一些并行性,并且这种折衷可能不适用于所有情况。我同意这种妥协。这是 LazyCache
和 FusionCache
(它的许多方法之一)使用的方法,等等。所以我会使用其中的一个,但知道这个技巧是件好事,因为它非常漂亮。
private readonly SemaphoreSlimPool _lockPool = new SemaphoreSlimPool(1, 1);
private async Task<TValue> GetAsync(object key, Func<ICacheEntry, Task<TValue>> valueFactory)
{
if (_cache.TryGetValue(key, out var value))
{
return value;
}
// key-specific lock so as to not block operations on other keys
var lockForKey = _lockPool[key];
await lockForKey.WaitAsync().ConfigureAwait(false);
try
{
if (_cache.TryGetValue(key, out value))
{
return value;
}
value = await _cache.GetOrCreateAsync(key, valueFactory).ConfigureAwait(false);
return value;
}
finally
{
lockForKey.Release();
}
}
// Dispose SemaphoreSlimPool
这是 SemaphoreSlimPool
实现 (source, nuget)。
/// <summary>
/// Provides a pool of SemaphoreSlim objects for keyed usage.
/// </summary>
public class SemaphoreSlimPool : IDisposable
{
/// <summary>
/// Pool of SemaphoreSlim objects.
/// </summary>
private readonly SemaphoreSlim[] pool;
/// <summary>
/// Size of the pool.
/// <para></para>
/// Environment.ProcessorCount is not always correct so use more slots as buffer,
/// with a minimum of 32 slots.
/// </summary>
private readonly int poolSize = Math.Max(Environment.ProcessorCount << 3, 32);
private const int NoMaximum = int.MaxValue;
/// <summary>
/// Ctor.
/// </summary>
public SemaphoreSlimPool(int initialCount)
: this(initialCount, NoMaximum)
{ }
/// <summary>
/// Ctor.
/// </summary>
public SemaphoreSlimPool(int initialCount, int maxCount)
{
pool = new SemaphoreSlim[poolSize];
for (int i = 0; i < poolSize; i++)
{
pool[i] = new SemaphoreSlim(initialCount, maxCount);
}
}
/// <inheritdoc cref="Get(object)" />
public SemaphoreSlim this[object key] => Get(key);
/// <summary>
/// Returns a <see cref="SemaphoreSlim"/> from the pool that the <paramref name="key"/> maps to.
/// </summary>
/// <exception cref="ArgumentNullException"></exception>
public SemaphoreSlim Get(object key)
{
_ = key ?? throw new ArgumentNullException(nameof(key));
return pool[GetIndex(key)];
}
private uint GetIndex(object key)
{
return unchecked((uint)key.GetHashCode()) % (uint)poolSize;
}
private bool disposed = false;
public void Dispose()
{
Dispose(true);
}
public void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
if (pool != null)
{
for (int i = 0; i < poolSize; i++)
{
pool[i].Dispose();
}
}
}
disposed = true;
}
}
}
由于 ttl 低,我已经在这上面投入了很多线程,并且有很多流失,而且它没有被炸毁。到目前为止,我觉得还不错,但我想看看是否有人能找到错误。