可枚举的 foreach 扩展
Enumerable foreach extend
我创建了一个 Enumerable 的扩展来快速执行操作,所以我列出并在这个方法中,我循环并且如果对象在特定时间内执行该方法我 return,
现在我想使输出通用,因为方法输出会有所不同,关于如何做的任何建议
这个IEnumerable的进程,好像是负载均衡,如果第一个没有响应第二个应该,我想return输入Action的输出
public static class EnumerableExtensions
{
public static void ForEach<T>(this IEnumerable<T> source, Action action, int timeOut)
{
foreach (T element in source)
{
lock (source)
{
// Loop for all connections and get the fastest responsive proxy
foreach (var mxAccessProxy in source)
{
try
{
// check for the health
Task executionTask = Task.Run(action);
if (executionTask.Wait(timeOut))
{
return ;
}
}
catch
{
//ignore
}
}
}
}
}
}
此代码 运行 喜欢
_proxies.ForEach(certainaction, timeOut);
this will enhance the performance and code readability
不,它绝对不会 :) 此外,您为这段代码带来了更多问题,例如冗余锁定或异常吞噬,但实际上并没有并行执行代码。
您似乎想使用某种代理对象以最快的速度调用您的 Action
。您需要异步 运行 Tasks
,而不是 .Wait()
。
类似的内容可能对您有所帮助:
public static class TaskExtensions
{
public static TReturn ParallelSelectReturnFastest<TPoolObject, TReturn>(this TPoolObject[] pool,
Func<TPoolObject, CancellationToken, TReturn> func,
int? timeout = null)
{
var ctx = new CancellationTokenSource();
// for every object in pool schedule a task
Task<TReturn>[] tasks = pool
.Select(poolObject =>
{
ctx.Token.ThrowIfCancellationRequested();
return Task.Factory.StartNew(() => func(poolObject, ctx.Token), ctx.Token);
})
.ToArray();
// not sure if Cast is actually needed,
// just to get rid of co-variant array conversion
int firstCompletedIndex = timeout.HasValue
? Task.WaitAny(tasks.Cast<Task>().ToArray(), timeout.Value, ctx.Token)
: Task.WaitAny(tasks.Cast<Task>().ToArray(), ctx.Token);
// we need to cancel token to avoid unnecessary work to be done
ctx.Cancel();
if (firstCompletedIndex == -1) // no objects in pool managed to complete action in time
throw new NotImplementedException(); // custom exception goes here
return tasks[firstCompletedIndex].Result;
}
}
现在,您可以使用此扩展方法对任何对象池调用特定操作并获取第一个执行结果:
var pool = new[] { 1, 2, 3, 4, 5 };
var result = pool.ParallelSelectReturnFastest((x, token) => {
Thread.Sleep(x * 200);
token.ThrowIfCancellationRequested();
Console.WriteLine("calculate");
return x * x;
}, 100);
Console.WriteLine(result);
它输出:
calculate
1
因为第一个任务将在 200 毫秒内完成工作,return 它和所有其他任务将通过取消令牌取消。
在您的情况下,它将类似于:
var actionResponse = proxiesList.ParallelSelectReturnFastest((proxy, token) => {
token.ThrowIfCancellationRequested();
return proxy.SomeAction();
});
有些事情要提一下:
- 确保您的行为是安全的。您不能依赖其中有多少会实际执行您的操作。如果此操作是
CreateItem
,那么您最终可以通过不同的代理创建许多项目
- 它不能保证你会运行并行执行所有这些操作,因为选择运行宁任务
的最佳数量取决于TPL
- 我已经用老式的 TPL 方式实现了,因为你原来的问题包含它。如果可能,您需要切换到 async/await - 在这种情况下,您的
Func
将执行 return 任务,您需要使用 await Task.WhenAny(tasks)
而不是 Task.WaitAny()
我创建了一个 Enumerable 的扩展来快速执行操作,所以我列出并在这个方法中,我循环并且如果对象在特定时间内执行该方法我 return, 现在我想使输出通用,因为方法输出会有所不同,关于如何做的任何建议
这个IEnumerable的进程,好像是负载均衡,如果第一个没有响应第二个应该,我想return输入Action的输出
public static class EnumerableExtensions
{
public static void ForEach<T>(this IEnumerable<T> source, Action action, int timeOut)
{
foreach (T element in source)
{
lock (source)
{
// Loop for all connections and get the fastest responsive proxy
foreach (var mxAccessProxy in source)
{
try
{
// check for the health
Task executionTask = Task.Run(action);
if (executionTask.Wait(timeOut))
{
return ;
}
}
catch
{
//ignore
}
}
}
}
}
}
此代码 运行 喜欢
_proxies.ForEach(certainaction, timeOut);
this will enhance the performance and code readability
不,它绝对不会 :) 此外,您为这段代码带来了更多问题,例如冗余锁定或异常吞噬,但实际上并没有并行执行代码。
您似乎想使用某种代理对象以最快的速度调用您的 Action
。您需要异步 运行 Tasks
,而不是 .Wait()
。
类似的内容可能对您有所帮助:
public static class TaskExtensions
{
public static TReturn ParallelSelectReturnFastest<TPoolObject, TReturn>(this TPoolObject[] pool,
Func<TPoolObject, CancellationToken, TReturn> func,
int? timeout = null)
{
var ctx = new CancellationTokenSource();
// for every object in pool schedule a task
Task<TReturn>[] tasks = pool
.Select(poolObject =>
{
ctx.Token.ThrowIfCancellationRequested();
return Task.Factory.StartNew(() => func(poolObject, ctx.Token), ctx.Token);
})
.ToArray();
// not sure if Cast is actually needed,
// just to get rid of co-variant array conversion
int firstCompletedIndex = timeout.HasValue
? Task.WaitAny(tasks.Cast<Task>().ToArray(), timeout.Value, ctx.Token)
: Task.WaitAny(tasks.Cast<Task>().ToArray(), ctx.Token);
// we need to cancel token to avoid unnecessary work to be done
ctx.Cancel();
if (firstCompletedIndex == -1) // no objects in pool managed to complete action in time
throw new NotImplementedException(); // custom exception goes here
return tasks[firstCompletedIndex].Result;
}
}
现在,您可以使用此扩展方法对任何对象池调用特定操作并获取第一个执行结果:
var pool = new[] { 1, 2, 3, 4, 5 };
var result = pool.ParallelSelectReturnFastest((x, token) => {
Thread.Sleep(x * 200);
token.ThrowIfCancellationRequested();
Console.WriteLine("calculate");
return x * x;
}, 100);
Console.WriteLine(result);
它输出:
calculate 1
因为第一个任务将在 200 毫秒内完成工作,return 它和所有其他任务将通过取消令牌取消。
在您的情况下,它将类似于:
var actionResponse = proxiesList.ParallelSelectReturnFastest((proxy, token) => {
token.ThrowIfCancellationRequested();
return proxy.SomeAction();
});
有些事情要提一下:
- 确保您的行为是安全的。您不能依赖其中有多少会实际执行您的操作。如果此操作是
CreateItem
,那么您最终可以通过不同的代理创建许多项目 - 它不能保证你会运行并行执行所有这些操作,因为选择运行宁任务 的最佳数量取决于TPL
- 我已经用老式的 TPL 方式实现了,因为你原来的问题包含它。如果可能,您需要切换到 async/await - 在这种情况下,您的
Func
将执行 return 任务,您需要使用await Task.WhenAny(tasks)
而不是Task.WaitAny()