C# LanguageExt - 将多个异步调用组合成一个分组调用
C# LanguageExt - combine multiple async calls into one grouped call
我有一个从数据存储中异步查找项目的方法;
class MyThing {}
Task<Try<MyThing>> GetThing(int thingId) {...}
我想从数据存储中查找多个项目,并编写了一个新方法来执行此操作。我还编写了一个辅助方法,它将采用多个 Try<T>
并将它们的结果组合成一个 Try<IEnumerable<T>>
.
public static class TryExtensions
{
Try<IEnumerable<T>> Collapse<T>(this IEnumerable<Try<T>> items)
{
var failures = items.Fails().ToArray();
return failures.Any() ?
Try<IEnumerable<T>>(new AggregateException(failures)) :
Try(items.Select(i => i.Succ(a => a).Fail(Enumerable.Empty<T>())));
}
}
async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
var results = new List<Try<Things>>();
foreach (var id in ids)
{
var thing = await GetThing(id);
results.Add(thing);
}
return results.Collapse().Map(p => p.ToArray());
}
另一种方法是这样的;
async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
var tasks = ids.Select(async id => await GetThing(id)).ToArray();
await Task.WhenAll(tasks);
return tasks.Select(t => t.Result).Collapse().Map(p => p.ToArray());
}
问题是所有任务都将 运行 并行进行,我不想用大量并行请求来破坏我的数据存储。我真正想要的是使用 monadic 原则和 LanguageExt
的特性使我的代码具有功能性。有谁知道如何做到这一点?
更新
感谢@MatthewWatson 的建议,这就是 SemaphoreSlim
;
的样子
async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
var mutex = new SemaphoreSlim(1);
var results = ids.Select(async id =>
{
await mutex.WaitAsync();
try { return await GetThing(id); }
finally { mutex.Release(); }
}).ToArray();
await Task.WhenAll(tasks);
return tasks.Select(t => t.Result).Collapse().Map(Enumerable.ToArray);
return results.Collapse().Map(p => p.ToArray());
}
问题是,这仍然不是一元化/函数式的,最终代码行数比我原来的代码多了一个 foreach
块。
在“另一种方式”中,您几乎达到了目标,当您调用:
var tasks = ids.Select(async id => await GetThing(id)).ToArray();
除了 Tasks 不会 运行 顺序 所以你最终会遇到许多查询访问你的数据存储,这是由 .ToArray()
和 Task.WhenAll
。一旦你调用 .ToArray()
它已经分配并启动了任务,所以如果你可以“容忍”一个 foreach
来实现顺序任务 运行ning,就像这样:
public static class TaskExtensions
{
public static async Task RunSequentially<T>(this IEnumerable<Task<T>> tasks)
{
foreach (var task in tasks) await task;
}
}
Despite that running a "loop" of queries is not a quite good practice
in general, unless you have in some background service and some
special scenario, leveraging this to the Database engine through
WHERE thingId IN (...)
in general is a better option. Even you
have big amount of thingIds we can slice it into small 10s, 100s.. to
narrow the WHERE IN
footprint.
回到我们的 RunSequentially
,我 会 想让它更实用,例如:
tasks.ToList().ForEach(async task => await task);
但遗憾的是,这仍然 运行 有点“并行”任务。
所以最终的用法应该是:
async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
var tasks = ids.Select(id => GetThing(id));// remember don't use .ToArray or ToList...
await tasks.RunSequentially();
return tasks.Select(t => t.Result).Collapse().Map(p => p.ToArray());
}
另一个矫枉过正的功能解决方案是在 Queue 递归 中获得 Lazy !!
而不是 GetThing
,只需将 GetThing
:
包裹起来,就可以得到 returns Lazy<Task<Try<MyThing>>>
的 Lazy GetLazyThing
new Lazy<Task<Try<MyThing>>>(() => GetThing(id))
现在使用一对extensions/functions:
public static async Task RecRunSequentially<T>(this IEnumerable<Lazy<Task<T>>> tasks)
{
var queue = tasks.EnqueueAll();
await RunQueue(queue);
}
public static Queue<T> EnqueueAll<T>(this IEnumerable<T> list)
{
var queue = new Queue<T>();
list.ToList().ForEach(m => queue.Enqueue(m));
return queue;
}
public static async Task RunQueue<T>(Queue<Lazy<Task<T>>> queue)
{
if (queue.Count > 0)
{
var task = queue.Dequeue();
await task.Value; // this unwraps the Lazy object content
await RunQueue(queue);
}
}
最后:
var lazyTasks = ids.Select(id => GetLazyThing(id));
await lazyTasks.RecRunSequentially();
// Now collapse and map as you like
更新
然而,如果您不喜欢 EnqueueAll
和 RunQueue
不是“纯粹”的事实,我们可以采用相同的 Lazy
技巧[=34] =]
public static async Task AwaitSequentially<T>(this Lazy<Task<T>>[] array, int index = 0)
{
if (array == null || index < 0 || index >= array.Length - 1) return;
await array[index].Value;
await AwaitSequentially(array, index + 1); // ++index is not pure :)
}
现在:
var lazyTasks = ids.Select(id => GetLazyThing(id));
await tasks.ToArray().AwaitSequentially();
// Now collapse and map as you like
我有一个从数据存储中异步查找项目的方法;
class MyThing {}
Task<Try<MyThing>> GetThing(int thingId) {...}
我想从数据存储中查找多个项目,并编写了一个新方法来执行此操作。我还编写了一个辅助方法,它将采用多个 Try<T>
并将它们的结果组合成一个 Try<IEnumerable<T>>
.
public static class TryExtensions
{
Try<IEnumerable<T>> Collapse<T>(this IEnumerable<Try<T>> items)
{
var failures = items.Fails().ToArray();
return failures.Any() ?
Try<IEnumerable<T>>(new AggregateException(failures)) :
Try(items.Select(i => i.Succ(a => a).Fail(Enumerable.Empty<T>())));
}
}
async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
var results = new List<Try<Things>>();
foreach (var id in ids)
{
var thing = await GetThing(id);
results.Add(thing);
}
return results.Collapse().Map(p => p.ToArray());
}
另一种方法是这样的;
async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
var tasks = ids.Select(async id => await GetThing(id)).ToArray();
await Task.WhenAll(tasks);
return tasks.Select(t => t.Result).Collapse().Map(p => p.ToArray());
}
问题是所有任务都将 运行 并行进行,我不想用大量并行请求来破坏我的数据存储。我真正想要的是使用 monadic 原则和 LanguageExt
的特性使我的代码具有功能性。有谁知道如何做到这一点?
更新
感谢@MatthewWatson 的建议,这就是 SemaphoreSlim
;
async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
var mutex = new SemaphoreSlim(1);
var results = ids.Select(async id =>
{
await mutex.WaitAsync();
try { return await GetThing(id); }
finally { mutex.Release(); }
}).ToArray();
await Task.WhenAll(tasks);
return tasks.Select(t => t.Result).Collapse().Map(Enumerable.ToArray);
return results.Collapse().Map(p => p.ToArray());
}
问题是,这仍然不是一元化/函数式的,最终代码行数比我原来的代码多了一个 foreach
块。
在“另一种方式”中,您几乎达到了目标,当您调用:
var tasks = ids.Select(async id => await GetThing(id)).ToArray();
除了 Tasks 不会 运行 顺序 所以你最终会遇到许多查询访问你的数据存储,这是由 .ToArray()
和 Task.WhenAll
。一旦你调用 .ToArray()
它已经分配并启动了任务,所以如果你可以“容忍”一个 foreach
来实现顺序任务 运行ning,就像这样:
public static class TaskExtensions
{
public static async Task RunSequentially<T>(this IEnumerable<Task<T>> tasks)
{
foreach (var task in tasks) await task;
}
}
Despite that running a "loop" of queries is not a quite good practice in general, unless you have in some background service and some special scenario, leveraging this to the Database engine through
WHERE thingId IN (...)
in general is a better option. Even you have big amount of thingIds we can slice it into small 10s, 100s.. to narrow theWHERE IN
footprint.
回到我们的 RunSequentially
,我 会 想让它更实用,例如:
tasks.ToList().ForEach(async task => await task);
但遗憾的是,这仍然 运行 有点“并行”任务。
所以最终的用法应该是:
async Task<Try<MyThing[]>> GetThings(IEnumerable<string> ids)
{
var tasks = ids.Select(id => GetThing(id));// remember don't use .ToArray or ToList...
await tasks.RunSequentially();
return tasks.Select(t => t.Result).Collapse().Map(p => p.ToArray());
}
另一个矫枉过正的功能解决方案是在 Queue 递归 中获得 Lazy !!
而不是 GetThing
,只需将 GetThing
:
Lazy<Task<Try<MyThing>>>
的 Lazy GetLazyThing
new Lazy<Task<Try<MyThing>>>(() => GetThing(id))
现在使用一对extensions/functions:
public static async Task RecRunSequentially<T>(this IEnumerable<Lazy<Task<T>>> tasks)
{
var queue = tasks.EnqueueAll();
await RunQueue(queue);
}
public static Queue<T> EnqueueAll<T>(this IEnumerable<T> list)
{
var queue = new Queue<T>();
list.ToList().ForEach(m => queue.Enqueue(m));
return queue;
}
public static async Task RunQueue<T>(Queue<Lazy<Task<T>>> queue)
{
if (queue.Count > 0)
{
var task = queue.Dequeue();
await task.Value; // this unwraps the Lazy object content
await RunQueue(queue);
}
}
最后:
var lazyTasks = ids.Select(id => GetLazyThing(id));
await lazyTasks.RecRunSequentially();
// Now collapse and map as you like
更新
然而,如果您不喜欢 EnqueueAll
和 RunQueue
不是“纯粹”的事实,我们可以采用相同的 Lazy
技巧[=34] =]
public static async Task AwaitSequentially<T>(this Lazy<Task<T>>[] array, int index = 0)
{
if (array == null || index < 0 || index >= array.Length - 1) return;
await array[index].Value;
await AwaitSequentially(array, index + 1); // ++index is not pure :)
}
现在:
var lazyTasks = ids.Select(id => GetLazyThing(id));
await tasks.ToArray().AwaitSequentially();
// Now collapse and map as you like