搜索 IReliableDictionary 的最佳方法是什么?
What would be the best way to search an IReliableDictionary?
基于这些帖子:
Convert IReliableDictionary to IList
What is the most optimal method of querying a reliable dictionary collection
我应该可以使用 Linq 来查询 IReliableDictionary,但似乎该接口不再实现 IEnumerable,并且 Linq 扩展不可用。至少在 Microsoft.ServiceFabric.Data.Interfaces 程序集的版本 5.0.0.0 中。
如果这是真的,那么搜索 IReliableDictionary 的最佳方法是什么?
据我所知,Service Fabric 可靠字典可能看起来像普通的 .NET 字典,但实际上并不相同。因此,SF 可靠字典或队列可能不支持某些方法。
是的,我们确实在 GA 版本中从 Reliable Collections 中删除了 IEnumerable。正如 Allan T 所提到的,可靠集合实际上与常规 .NET 集合并不相同,尽管它们代表相同的基本数据结构。您可能已经注意到的最大差异之一是所有操作都是异步的,因为锁定语义和 I/O 用于复制和磁盘读取。正是后者促使我们做出删除 IEnumerable 的决定,因为它是严格同步的,而我们不是。相反,我们现在使用 IAsyncEnumerable,它目前还不支持整套 LINQ 扩展方法。
我们正在研究异步 LINQ 扩展方法,但与此同时,有多种方法可以使用 IAsyncEnumerable。
Eli Arbel has async extension methods on Gist that provide a bridge to System.Interactive.Async 以及 Select、SelectMany 和 Where.
的异步实现
或者您可以 wrap IAsyncEnumerable in a regular IEnumerable 简单地用同步方法包装异步调用,这将再次为您提供完整的 LINQ 扩展方法集。然后你可以在常规 LINQ 查询中使用扩展方法:
using (ITransaction tx = this.StateManager.CreateTransaction())
{
var x = from item in (await clusterDictionary.CreateEnumerableAsync(tx)).ToEnumerable()
where item.Value.Status == ClusterStatus.Ready
orderby item.Value.CreatedOn descending
select new ClusterView(
item.Key,
item.Value.AppCount,
item.Value.ServiceCount,
item.Value.Users.Count(),
this.config.MaximumUsersPerCluster,
this.config.MaximumClusterUptime - (DateTimeOffset.UtcNow - item.Value.CreatedOn.ToUniversalTime()));
}
不知道是不是"best"的方式,我一直在用下面的
public static async Task<IList<KeyValuePair<Guid, T>>> QueryReliableDictionary<T>(IReliableStateManager stateManager, string collectionName, Func<T, bool> filter)
{
var result = new List<KeyValuePair<Guid, T>>();
IReliableDictionary<Guid, T> reliableDictionary =
await stateManager.GetOrAddAsync<IReliableDictionary<Guid, T>>(collectionName);
using (ITransaction tx = stateManager.CreateTransaction())
{
IAsyncEnumerable<KeyValuePair<Guid, T>> asyncEnumerable = await reliableDictionary.CreateEnumerableAsync(tx);
using (IAsyncEnumerator<KeyValuePair<Guid, T>> asyncEnumerator = asyncEnumerable.GetAsyncEnumerator())
{
while (await asyncEnumerator.MoveNextAsync(CancellationToken.None))
{
if (filter(asyncEnumerator.Current.Value))
result.Add(asyncEnumerator.Current);
}
}
}
return result;
}
您可以通过传入 StateManager、您要查询的集合的名称以及带有查询逻辑的 lambda 函数来使用该方法。例如:
var queryResult = await QueryReliableDictionary<string>(this.StateManager, "CustomerCollection", name => !string.IsNullOrWhiteSpace(name) && (name.IndexOf("fred", StringComparison.OrdinalIgnoreCase) >= 0));
在撰写此答案时,IAsyncEnumerable
是 dotnet 库的一部分,C# 8.0 添加语法糖来支持它。
问题是 ServiceFabric 使用其自己的 IAsyncEnumerable
定义,因此您不能对其应用 dotnet 扩展方法和 C# 助手。
我想到的一个解决方案是使用简单的包装器将 ServiceFabric 的 IAsyncEnumerable
转换为本机的:
using System.Threading;
using System.Threading.Tasks;
using Generic = System.Collections.Generic;
using Fabric = Microsoft.ServiceFabric.Data;
public static class FabricAsyncEnumerableExtensions
{
/// <summary>
/// Converts ServiceFabric <see cref="Microsoft.ServiceFabric.Data.IAsyncEnumerable"/> to dotnet native <see cref="System.Collections.Generic.IAsyncEnumerable"/>.
/// </summary>
public static Generic.IAsyncEnumerable<T> ToGeneric<T>(this Fabric.IAsyncEnumerable<T> source) =>
new AsyncEnumerableWrapper<T>(source);
private class AsyncEnumerableWrapper<T> : Generic.IAsyncEnumerable<T>
{
private readonly Fabric.IAsyncEnumerable<T> _source;
public AsyncEnumerableWrapper(Fabric.IAsyncEnumerable<T> source) => _source = source;
public Generic.IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken _ = default)
{
Fabric.IAsyncEnumerator<T> enumerator = _source.GetAsyncEnumerator();
return new AsyncEnumeratorWrapper<T>(enumerator);
}
}
private class AsyncEnumeratorWrapper<T> : Generic.IAsyncEnumerator<T>
{
private readonly Fabric.IAsyncEnumerator<T> _source;
public AsyncEnumeratorWrapper(Fabric.IAsyncEnumerator<T> source) => _source = source;
public async ValueTask DisposeAsync() =>
await Task.Run(_source.Dispose).ConfigureAwait(false);
public async ValueTask<bool> MoveNextAsync() =>
await _source.MoveNextAsync(default).ConfigureAwait(false);
public T Current => _source.Current;
}
}
使用它变得像调用扩展方法一样简单,然后像常规一样使用它 IAsyncEnumerable
:
Fabric.IAsyncEnumerable<KeyValuePair<Guid, Product>> asyncProducts = GetAsyncProducts();
return await asyncProducts.ToGeneric()
.Select(p => p.Value)
.ToListAsync()
.ConfigureAwait(false);
基于这些帖子:
Convert IReliableDictionary to IList
What is the most optimal method of querying a reliable dictionary collection
我应该可以使用 Linq 来查询 IReliableDictionary,但似乎该接口不再实现 IEnumerable,并且 Linq 扩展不可用。至少在 Microsoft.ServiceFabric.Data.Interfaces 程序集的版本 5.0.0.0 中。
如果这是真的,那么搜索 IReliableDictionary 的最佳方法是什么?
据我所知,Service Fabric 可靠字典可能看起来像普通的 .NET 字典,但实际上并不相同。因此,SF 可靠字典或队列可能不支持某些方法。
是的,我们确实在 GA 版本中从 Reliable Collections 中删除了 IEnumerable。正如 Allan T 所提到的,可靠集合实际上与常规 .NET 集合并不相同,尽管它们代表相同的基本数据结构。您可能已经注意到的最大差异之一是所有操作都是异步的,因为锁定语义和 I/O 用于复制和磁盘读取。正是后者促使我们做出删除 IEnumerable 的决定,因为它是严格同步的,而我们不是。相反,我们现在使用 IAsyncEnumerable,它目前还不支持整套 LINQ 扩展方法。
我们正在研究异步 LINQ 扩展方法,但与此同时,有多种方法可以使用 IAsyncEnumerable。
Eli Arbel has async extension methods on Gist that provide a bridge to System.Interactive.Async 以及 Select、SelectMany 和 Where.
的异步实现或者您可以 wrap IAsyncEnumerable in a regular IEnumerable 简单地用同步方法包装异步调用,这将再次为您提供完整的 LINQ 扩展方法集。然后你可以在常规 LINQ 查询中使用扩展方法:
using (ITransaction tx = this.StateManager.CreateTransaction())
{
var x = from item in (await clusterDictionary.CreateEnumerableAsync(tx)).ToEnumerable()
where item.Value.Status == ClusterStatus.Ready
orderby item.Value.CreatedOn descending
select new ClusterView(
item.Key,
item.Value.AppCount,
item.Value.ServiceCount,
item.Value.Users.Count(),
this.config.MaximumUsersPerCluster,
this.config.MaximumClusterUptime - (DateTimeOffset.UtcNow - item.Value.CreatedOn.ToUniversalTime()));
}
不知道是不是"best"的方式,我一直在用下面的
public static async Task<IList<KeyValuePair<Guid, T>>> QueryReliableDictionary<T>(IReliableStateManager stateManager, string collectionName, Func<T, bool> filter)
{
var result = new List<KeyValuePair<Guid, T>>();
IReliableDictionary<Guid, T> reliableDictionary =
await stateManager.GetOrAddAsync<IReliableDictionary<Guid, T>>(collectionName);
using (ITransaction tx = stateManager.CreateTransaction())
{
IAsyncEnumerable<KeyValuePair<Guid, T>> asyncEnumerable = await reliableDictionary.CreateEnumerableAsync(tx);
using (IAsyncEnumerator<KeyValuePair<Guid, T>> asyncEnumerator = asyncEnumerable.GetAsyncEnumerator())
{
while (await asyncEnumerator.MoveNextAsync(CancellationToken.None))
{
if (filter(asyncEnumerator.Current.Value))
result.Add(asyncEnumerator.Current);
}
}
}
return result;
}
您可以通过传入 StateManager、您要查询的集合的名称以及带有查询逻辑的 lambda 函数来使用该方法。例如:
var queryResult = await QueryReliableDictionary<string>(this.StateManager, "CustomerCollection", name => !string.IsNullOrWhiteSpace(name) && (name.IndexOf("fred", StringComparison.OrdinalIgnoreCase) >= 0));
在撰写此答案时,IAsyncEnumerable
是 dotnet 库的一部分,C# 8.0 添加语法糖来支持它。
问题是 ServiceFabric 使用其自己的 IAsyncEnumerable
定义,因此您不能对其应用 dotnet 扩展方法和 C# 助手。
我想到的一个解决方案是使用简单的包装器将 ServiceFabric 的 IAsyncEnumerable
转换为本机的:
using System.Threading;
using System.Threading.Tasks;
using Generic = System.Collections.Generic;
using Fabric = Microsoft.ServiceFabric.Data;
public static class FabricAsyncEnumerableExtensions
{
/// <summary>
/// Converts ServiceFabric <see cref="Microsoft.ServiceFabric.Data.IAsyncEnumerable"/> to dotnet native <see cref="System.Collections.Generic.IAsyncEnumerable"/>.
/// </summary>
public static Generic.IAsyncEnumerable<T> ToGeneric<T>(this Fabric.IAsyncEnumerable<T> source) =>
new AsyncEnumerableWrapper<T>(source);
private class AsyncEnumerableWrapper<T> : Generic.IAsyncEnumerable<T>
{
private readonly Fabric.IAsyncEnumerable<T> _source;
public AsyncEnumerableWrapper(Fabric.IAsyncEnumerable<T> source) => _source = source;
public Generic.IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken _ = default)
{
Fabric.IAsyncEnumerator<T> enumerator = _source.GetAsyncEnumerator();
return new AsyncEnumeratorWrapper<T>(enumerator);
}
}
private class AsyncEnumeratorWrapper<T> : Generic.IAsyncEnumerator<T>
{
private readonly Fabric.IAsyncEnumerator<T> _source;
public AsyncEnumeratorWrapper(Fabric.IAsyncEnumerator<T> source) => _source = source;
public async ValueTask DisposeAsync() =>
await Task.Run(_source.Dispose).ConfigureAwait(false);
public async ValueTask<bool> MoveNextAsync() =>
await _source.MoveNextAsync(default).ConfigureAwait(false);
public T Current => _source.Current;
}
}
使用它变得像调用扩展方法一样简单,然后像常规一样使用它 IAsyncEnumerable
:
Fabric.IAsyncEnumerable<KeyValuePair<Guid, Product>> asyncProducts = GetAsyncProducts();
return await asyncProducts.ToGeneric()
.Select(p => p.Value)
.ToListAsync()
.ConfigureAwait(false);