在 Concurrent Ruby 中链接一组任务
Chain an array of tasks in Concurrent Ruby
我有一组任务,我想在某个后台线程中按顺序执行,每个任务的结果都传递给下一个,如果链中的任何 link 失败,链就会失败.
为了论证,假设每个任务都是一个具有 exec
方法的对象,returns 一个值,尽管它们同样可以是 procs 或 lambdas。
我现在拥有的是:
promise = array_of_tasks.inject(nil) do |promise, task|
if promise
promise.then { |prev_result| task.exec(prev_result) }
else
Concurrent::Promise.new { task.exec }
end
end
promise.on_success { |last_result| log.info("Success: #{last_result} ")}
promise.rescue { |reason| log.error("Failure: #{reason}")}
在 Promise
API 或 Concurrent Ruby 的其他地方,是否有更简洁的方法来做到这一点?这似乎是一个相当基本的操作,但我没有看到执行此操作的现有方法。
(旁注:如果没有这样的方法,那么在 futures-and-promises 世界中是否有这种模式的众所周知的名称?即,如果我自己编写该方法,是否有一些现有的显而易见的名字?)
它并不短,但这种结构可能更容易添加新功能:
require 'concurrent'
class Task
def exec(x = 0)
sleep 0.1
p x + 1
end
alias call exec
def to_promise(*params)
Concurrent::Promise.new { exec(*params) }
end
end
module PromiseChains
refine Concurrent::Promise do
def chained_thens(callables)
callables.inject(self) do |promise, callable|
promise.then do |prev_result|
callable.call(prev_result)
end
end
end
end
end
可以这样使用:
using PromiseChains
array_of_tasks = Array.new(10) { Task.new }
array_of_tasks << ->(x) { p x * 2 }
array_of_tasks << proc { |x| p x * 3 }
first_task, *other_tasks = array_of_tasks
chain = first_task.to_promise.chained_thens(other_tasks)
chain.on_success { |last_result| puts "Success: #{last_result} " }
chain.rescue { |reason| puts "Failure: #{reason}" }
chain.execute
sleep(2)
它输出:
1
2
3
4
5
6
7
8
9
10
20
60
Success: 60
我有一组任务,我想在某个后台线程中按顺序执行,每个任务的结果都传递给下一个,如果链中的任何 link 失败,链就会失败.
为了论证,假设每个任务都是一个具有 exec
方法的对象,returns 一个值,尽管它们同样可以是 procs 或 lambdas。
我现在拥有的是:
promise = array_of_tasks.inject(nil) do |promise, task|
if promise
promise.then { |prev_result| task.exec(prev_result) }
else
Concurrent::Promise.new { task.exec }
end
end
promise.on_success { |last_result| log.info("Success: #{last_result} ")}
promise.rescue { |reason| log.error("Failure: #{reason}")}
在 Promise
API 或 Concurrent Ruby 的其他地方,是否有更简洁的方法来做到这一点?这似乎是一个相当基本的操作,但我没有看到执行此操作的现有方法。
(旁注:如果没有这样的方法,那么在 futures-and-promises 世界中是否有这种模式的众所周知的名称?即,如果我自己编写该方法,是否有一些现有的显而易见的名字?)
它并不短,但这种结构可能更容易添加新功能:
require 'concurrent'
class Task
def exec(x = 0)
sleep 0.1
p x + 1
end
alias call exec
def to_promise(*params)
Concurrent::Promise.new { exec(*params) }
end
end
module PromiseChains
refine Concurrent::Promise do
def chained_thens(callables)
callables.inject(self) do |promise, callable|
promise.then do |prev_result|
callable.call(prev_result)
end
end
end
end
end
可以这样使用:
using PromiseChains
array_of_tasks = Array.new(10) { Task.new }
array_of_tasks << ->(x) { p x * 2 }
array_of_tasks << proc { |x| p x * 3 }
first_task, *other_tasks = array_of_tasks
chain = first_task.to_promise.chained_thens(other_tasks)
chain.on_success { |last_result| puts "Success: #{last_result} " }
chain.rescue { |reason| puts "Failure: #{reason}" }
chain.execute
sleep(2)
它输出:
1
2
3
4
5
6
7
8
9
10
20
60
Success: 60