Elixir 中的同步屏障?
Synchronization barrier in Elixir?
在 Elixir 中,实现(循环)屏障最优雅的方式是什么?要实现的算法(顶点着色)有一个循环,循环中有一个等待阶段,用来等待所生成的各个进程("execute ... synchronously in parallel",然后使用所有进程的结果检查终止条件)。该算法是《Principles of Distributed Computing》第 1 章中的算法 5 “6-color”。
大多数参考资料都是针对 .NET、pthreads 和其他 thread-related 计算的,所以我不确定屏障是否是我所追求的正确模式。可能还有更多"Elixirish"方式。
我还没有任何代码(正在搜索模式),但这是实现相同问题的 "slow" 版本的代码:https://codereview.stackexchange.com/questions/111487/coloring-trees-in-elixir
我的想法是让 top-level 进程(每个图节点生成一个进程)发送和接收消息,这将同步节点进程。必须提到的是,节点进程也相互通信:parents 在一个 top-level 循环迭代期间向 children 发送消息。然而,复杂的是,在 top-level 收到节点的消息之后和所有节点进行迭代之前,任何进程都不应该继续(尽管我很可能会使用尾递归)。这就是我想到屏障机制的原因。
我不确定这是否正是您要找的,但这是一个基于 Java 中的 java.util.concurrent.CyclicBarrier 类和 Ruby 中的 Concurrent::CyclicBarrier 类实现的循环屏障:
defmodule CyclicBarrier do
  @moduledoc """
  A reusable ("cyclic") barrier implemented as a hand-rolled server process.

  `start/2` spawns the server for a fixed number of `parties`. Each party calls
  `wait/1` or `wait/2` and blocks; once the number of queued waiters equals
  `parties`, the server runs the optional `action`, replies to every waiter and
  starts counting again from zero.

  A barrier handle is the tuple `{CyclicBarrier, pid}` built by the private
  `barrier` record.
  """

  require Record

  Record.defrecordp :barrier, CyclicBarrier,
    pid: nil

  @doc """
  Spawns a barrier server and returns its handle.

  `parties` must be a positive integer. `action` is either `nil` or a
  zero-arity function that the server runs each time the barrier is fulfilled.
  """
  def start(parties, action \\ nil)
      when (is_integer(parties) and parties > 0) and
             (action === nil or is_function(action, 0)),
      do: barrier(pid: spawn(CyclicBarrier.Server, :init, [parties, action]))

  @doc """
  Asks the server to exit. Always returns `true`, whatever the server replied.
  """
  def stop(barrier(pid: pid)) do
    call(pid, :stop)
    true
  end

  @doc """
  Returns whether the server process behind the handle is alive.
  """
  def alive?(barrier(pid: pid)) do
    Process.alive?(pid)
  end

  @doc """
  Returns `false` only when the server reports the `:waiting` state.

  Any other reply yields `true`, including the exit tuple produced when the
  server is not alive.
  """
  def broken?(barrier(pid: pid)) do
    case call(pid, :status) do
      :waiting ->
        false

      _ ->
        true
    end
  end

  @doc """
  Returns the number of parties currently queued, or `false` when the server
  does not answer with an integer (for example because it is not alive).
  """
  def number_waiting(barrier(pid: pid)) do
    case call(pid, :number_waiting) do
      n when is_integer(n) ->
        n

      _ ->
        false
    end
  end

  @doc """
  Returns the number of parties the barrier was started with, or `false` when
  the server does not answer with an integer.
  """
  def parties(barrier(pid: pid)) do
    case call(pid, :parties) do
      n when is_integer(n) ->
        n

      _ ->
        false
    end
  end

  @doc """
  Resets the barrier.

  With no queued waiters, or when the barrier is already broken, the server
  goes back to `:waiting`. With queued waiters the server replies `:broken` to
  all of them and the barrier becomes broken. Both outcomes return `true`;
  any other reply (for example from a dead server) returns `false`.
  """
  def reset(barrier(pid: pid)) do
    case call(pid, :reset) do
      :reset ->
        true

      :broken ->
        true

      _ ->
        false
    end
  end

  @doc """
  Waits on the barrier without a timeout. See `wait/2`.
  """
  def wait(barrier = barrier()),
    do: wait(nil, barrier)

  @doc """
  Waits on the barrier for at most `timeout` milliseconds (`nil` waits forever).

  Returns `true` when the barrier was fulfilled and `false` otherwise. On a
  timeout the barrier is reset first; because the timed-out caller is still
  queued in the server, that reset normally leaves the barrier broken.
  """
  def wait(timeout, barrier = barrier(pid: pid)) when timeout === nil or is_integer(timeout) do
    case call(pid, :wait, timeout) do
      :fulfilled ->
        true

      :broken ->
        false

      :timeout ->
        reset(barrier)
        false

      _ ->
        false
    end
  end

  # Sends `request` to the server and waits for the matching reply.
  #
  # While waiting, the caller traps exits and links to the server, so a server
  # exit arrives as an `{:EXIT, pid, reason}` message (returned as-is) instead
  # of leaving the caller blocked. The previous `:trap_exit` flag is restored on
  # every path. Returns `:timeout` when `timeout` milliseconds elapse first.
  defp call(pid, request, timeout \\ nil) do
    if Process.alive?(pid) do
      trap_exit = Process.flag(:trap_exit, true)
      Process.link(pid)
      ref = make_ref()
      send(pid, {ref, self(), request})

      # `nil` means "no timeout"; `:infinity` never fires the `after` clause.
      receive_timeout = timeout || :infinity

      receive do
        {^ref, reply} ->
          Process.unlink(pid)
          Process.flag(:trap_exit, trap_exit)
          reply

        exited = {:EXIT, ^pid, _} ->
          Process.flag(:trap_exit, trap_exit)
          exited
      after
        receive_timeout ->
          Process.unlink(pid)
          Process.flag(:trap_exit, trap_exit)
          :timeout
      end
    else
      # Same shape as a trapped exit, so callers handle both cases alike.
      {:EXIT, pid, :normal}
    end
  end

  defmodule Server do
    @moduledoc """
    Receive loop behind a `CyclicBarrier`.

    The loop carries a state name (`:waiting` or `:broken`) and a record with
    the waiter count, the party count, the optional action and the queue of
    `{ref, pid}` callers that are waiting for a reply.
    """

    require Record

    Record.defrecordp :state_data,
      waiting: 0,
      parties: nil,
      action: nil,
      q: :queue.new()

    @doc """
    Process entry point: starts the loop in the `:waiting` state.
    """
    def init(parties, action),
      do: loop(:waiting, state_data(parties: parties, action: action))

    # Barrier tripped: the waiter count equals the party count (`same` is bound
    # twice). Run the action, release everybody and start a fresh cycle.
    defp loop(:waiting, sd = state_data(waiting: same, parties: same, action: action, q: q)),
      do: loop(done(:fulfilled, action, q), state_data(sd, waiting: 0, q: :queue.new()))

    # Messages of any other shape are not matched and stay in the mailbox.
    defp loop(state_name, sd) do
      receive do
        {ref, pid, request} when is_reference(ref) and is_pid(pid) and is_atom(request) ->
          handle(state_name, request, {ref, pid}, sd)
      end
    end

    # Dispatch on (state name, request). There is no catch-all clause, so an
    # unknown request atom raises FunctionClauseError in the server.
    defp handle(:waiting, :wait, from, sd = state_data(waiting: w, q: q)),
      do: loop(:waiting, state_data(sd, waiting: w + 1, q: :queue.in(from, q)))

    defp handle(:waiting, :reset, from, sd = state_data(waiting: 0, q: q)),
      do: loop(done(:reset, nil, :queue.in(from, q)), sd)

    # Reset while parties are queued: waiters and the resetter all get
    # `:broken`, and the server stays in the `:broken` state.
    defp handle(:waiting, :reset, from, sd = state_data(q: q)),
      do:
        loop(
          done(:broken, nil, :queue.in(from, q), false),
          state_data(sd, waiting: 0, q: :queue.new())
        )

    defp handle(:broken, :reset, from, sd = state_data(q: q)),
      do: loop(done(:reset, nil, :queue.in(from, q)), sd)

    defp handle(:broken, :wait, from, sd) do
      cast(from, :broken)
      loop(:broken, sd)
    end

    defp handle(state_name, :number_waiting, from, sd = state_data(waiting: number_waiting)) do
      cast(from, number_waiting)
      loop(state_name, sd)
    end

    defp handle(state_name, :parties, from, sd = state_data(parties: parties)) do
      cast(from, parties)
      loop(state_name, sd)
    end

    defp handle(state_name, :status, from, sd) do
      cast(from, state_name)
      loop(state_name, sd)
    end

    defp handle(_state_name, :stop, _from, _sd) do
      exit(:normal)
    end

    # Sends `message` to every caller in the queue.
    defp broadcast(q, message) do
      q
      |> :queue.to_list()
      |> Enum.each(&cast(&1, message))
    end

    # Replies to one caller, tagging the reply with the caller's reference.
    defp cast({ref, pid}, message),
      do: send(pid, {ref, message})

    # Runs `action`, sends `state` to every queued caller and returns the next
    # state name: `:waiting` when `continue` is true, otherwise `state`.
    defp done(state, action, q, continue \\ true) do
      run(action)
      broadcast(q, state)

      case continue do
        true ->
          :waiting

        false ->
          state
      end
    end

    defp run(nil),
      do: nil

    defp run(action),
      do: action.()
  end
end
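下面是一个在 Elixir 的 IEx shell 中使用 CyclicBarrier 的例子(注意:示例中的 `barrier.wait` 属于元组调用(tuple call)写法,较新的 Erlang/OTP 版本默认不再支持;此时请改用 `CyclicBarrier.wait(barrier)` 这样的显式调用):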
iex> barrier = CyclicBarrier.start(5, fn -> IO.puts("done") end)
{CyclicBarrier, #PID<0.281.0>}
iex> for i <- 1..5, do: spawn(fn -> IO.puts("process #{i}: #{barrier.wait}") end)
done
process 5: true
process 1: true
process 3: true
process 2: true
process 4: true
[#PID<0.283.0>, #PID<0.284.0>, #PID<0.285.0>, #PID<0.286.0>, #PID<0.287.0>]
进程执行的确切顺序是不确定的。
CyclicBarrier 上其他函数的示例如下:
iex> barrier = CyclicBarrier.start(2)
{CyclicBarrier, #PID<0.280.0>}
iex> barrier.alive?
true
iex> barrier.broken?
false
iex> barrier.number_waiting
0
iex> barrier.parties
2
iex> # let's spawn another process which will wait on the barrier
iex> spawn(fn -> IO.puts("barrier returned: #{barrier.wait}") end)
#PID<0.288.0>
iex> barrier.number_waiting
1
iex> # if we reset the barrier while another process is waiting
iex> # on the barrier, it will break
iex> barrier.reset
barrier returned: false
true
iex> barrier.broken?
true
iex> # however, the barrier can be reset again to its initial state
iex> barrier.reset
true
iex> barrier.broken?
false
iex> # if a timeout is exceeded while waiting for a barrier, it
iex> # will also break the barrier
iex> barrier.wait(100)
false
iex> barrier.broken?
true
iex> # let's reset the barrier, spawn another process to wait,
iex> # and wait with a timeout in the current process
iex> barrier.reset
true
iex> spawn(fn -> IO.puts("barrier returned: #{barrier.wait}") end)
#PID<0.289.0>
iex> barrier.wait(100)
barrier returned: true
true
iex> # if stop is called on the barrier, the barrier process will
iex> # exit and all future calls to the barrier will return false
iex> barrier.stop
true
iex> barrier.alive?
false
iex> barrier.reset
false
iex> barrier.wait
false
我已经在 Hex 上发布了一个库 sync_primitives,它正好实现了这个功能!源代码和完善的单元测试(测试覆盖率 100%)都在 GitHub 上。
看一看,让我知道我是否可以改进它!
在 Elixir 中,实现(循环)屏障最优雅的方式是什么?要实现的算法(顶点着色)有一个循环,循环中有一个等待阶段,用来等待所生成的各个进程("execute ... synchronously in parallel",然后使用所有进程的结果检查终止条件)。该算法是《Principles of Distributed Computing》第 1 章中的算法 5 “6-color”。
大多数参考资料都是针对 .NET、pthreads 和其他 thread-related 计算的,所以我不确定屏障是否是我所追求的正确模式。可能还有更多"Elixirish"方式。
我还没有任何代码(正在搜索模式),但这是实现相同问题的 "slow" 版本的代码:https://codereview.stackexchange.com/questions/111487/coloring-trees-in-elixir
我的想法是让 top-level 进程(每个图节点生成一个进程)发送和接收消息,这将同步节点进程。必须提到的是,节点进程也相互通信:parents 在一个 top-level 循环迭代期间向 children 发送消息。然而,复杂的是,在 top-level 收到节点的消息之后和所有节点进行迭代之前,任何进程都不应该继续(尽管我很可能会使用尾递归)。这就是我想到屏障机制的原因。
我不确定这是否正是您要找的,但这是一个基于 Java 中的 java.util.concurrent.CyclicBarrier 类和 Ruby 中的 Concurrent::CyclicBarrier 类实现的循环屏障:
defmodule CyclicBarrier do
require Record

# Private handle record; its tuple form is `{CyclicBarrier, pid}`.
Record.defrecordp(:barrier, CyclicBarrier, pid: nil)
@doc """
Spawns a barrier server and returns its handle.

`parties` must be a positive integer. `action` is either `nil` or a
zero-arity function that is handed to the server together with `parties`.
"""
def start(parties, action \\ nil)
    when (is_integer(parties) and parties > 0) and
           (action === nil or is_function(action, 0)),
    do: barrier(pid: spawn(CyclicBarrier.Server, :init, [parties, action]))
@doc """
Sends the `:stop` request to the server. The reply is ignored and `true` is
always returned.
"""
def stop(barrier(pid: server)) do
  _reply = call(server, :stop)
  true
end
@doc """
Returns whether the server process behind the handle is alive.
"""
def alive?(barrier(pid: server)), do: Process.alive?(server)
@doc """
Returns `false` only when the `:status` request is answered with `:waiting`;
every other reply counts as broken.
"""
def broken?(barrier(pid: server)) do
  call(server, :status) !== :waiting
end
@doc """
Returns the integer reply to the `:number_waiting` request, or `false` when
the reply is not an integer.
"""
def number_waiting(barrier(pid: server)) do
  reply = call(server, :number_waiting)

  if is_integer(reply), do: reply, else: false
end
@doc """
Returns the integer reply to the `:parties` request, or `false` when the
reply is not an integer.
"""
def parties(barrier(pid: server)) do
  reply = call(server, :parties)

  if is_integer(reply), do: reply, else: false
end
@doc """
Sends the `:reset` request and returns `true` when the server answers
`:reset` or `:broken`, `false` for any other reply.
"""
def reset(barrier(pid: server)) do
  call(server, :reset) in [:reset, :broken]
end
@doc """
Waits on the barrier without a timeout by delegating to `wait/2` with `nil`.
"""
def wait(barrier() = barrier), do: wait(nil, barrier)

@doc """
Waits on the barrier for at most `timeout` milliseconds (`nil` means no
timeout). Returns `true` only for a `:fulfilled` reply. On `:timeout` the
barrier is reset before `false` is returned; every other reply also gives
`false`.
"""
def wait(timeout, barrier(pid: server) = barrier)
    when is_nil(timeout) or is_integer(timeout) do
  case call(server, :wait, timeout) do
    :fulfilled ->
      true

    :timeout ->
      reset(barrier)
      false

    _broken_or_exit ->
      false
  end
end
# Sends `request` to the server and waits for the matching reply.
#
# While waiting, the caller traps exits and links to the server, so a server
# exit arrives as an `{:EXIT, pid, reason}` message (returned as-is) instead of
# leaving the caller blocked. The previous `:trap_exit` flag is restored on
# every path. Returns `:timeout` when `timeout` milliseconds elapse first.
defp call(pid, request, timeout \\ nil) do
  if Process.alive?(pid) do
    trap_exit = Process.flag(:trap_exit, true)
    Process.link(pid)
    ref = make_ref()
    send(pid, {ref, self(), request})

    # `nil` means "no timeout"; `:infinity` never fires the `after` clause.
    receive_timeout = timeout || :infinity

    receive do
      {^ref, reply} ->
        Process.unlink(pid)
        Process.flag(:trap_exit, trap_exit)
        reply

      exited = {:EXIT, ^pid, _} ->
        Process.flag(:trap_exit, trap_exit)
        exited
    after
      receive_timeout ->
        Process.unlink(pid)
        Process.flag(:trap_exit, trap_exit)
        :timeout
    end
  else
    # Same shape as a trapped exit, so callers handle both cases alike.
    {:EXIT, pid, :normal}
  end
end
defmodule Server do
  @moduledoc """
  Receive loop behind a cyclic barrier.

  The loop carries a state name (`:waiting` or `:broken`) and a record with
  the waiter count, the party count, the optional action and the queue of
  `{ref, pid}` callers that are waiting for a reply.
  """

  require Record

  Record.defrecordp :state_data,
    waiting: 0,
    parties: nil,
    action: nil,
    q: :queue.new()

  @doc """
  Process entry point: starts the loop in the `:waiting` state.
  """
  def init(parties, action),
    do: loop(:waiting, state_data(parties: parties, action: action))

  # Barrier tripped: the waiter count equals the party count (`same` is bound
  # twice). Run the action, release everybody and start a fresh cycle.
  defp loop(:waiting, sd = state_data(waiting: same, parties: same, action: action, q: q)),
    do: loop(done(:fulfilled, action, q), state_data(sd, waiting: 0, q: :queue.new()))

  # Messages of any other shape are not matched and stay in the mailbox.
  defp loop(state_name, sd) do
    receive do
      {ref, pid, request} when is_reference(ref) and is_pid(pid) and is_atom(request) ->
        handle(state_name, request, {ref, pid}, sd)
    end
  end

  # Dispatch on (state name, request). There is no catch-all clause, so an
  # unknown request atom raises FunctionClauseError in the server.
  defp handle(:waiting, :wait, from, sd = state_data(waiting: w, q: q)),
    do: loop(:waiting, state_data(sd, waiting: w + 1, q: :queue.in(from, q)))

  defp handle(:waiting, :reset, from, sd = state_data(waiting: 0, q: q)),
    do: loop(done(:reset, nil, :queue.in(from, q)), sd)

  # Reset while parties are queued: waiters and the resetter all get `:broken`,
  # and the server stays in the `:broken` state.
  defp handle(:waiting, :reset, from, sd = state_data(q: q)),
    do:
      loop(
        done(:broken, nil, :queue.in(from, q), false),
        state_data(sd, waiting: 0, q: :queue.new())
      )

  defp handle(:broken, :reset, from, sd = state_data(q: q)),
    do: loop(done(:reset, nil, :queue.in(from, q)), sd)

  defp handle(:broken, :wait, from, sd) do
    cast(from, :broken)
    loop(:broken, sd)
  end

  defp handle(state_name, :number_waiting, from, sd = state_data(waiting: number_waiting)) do
    cast(from, number_waiting)
    loop(state_name, sd)
  end

  defp handle(state_name, :parties, from, sd = state_data(parties: parties)) do
    cast(from, parties)
    loop(state_name, sd)
  end

  defp handle(state_name, :status, from, sd) do
    cast(from, state_name)
    loop(state_name, sd)
  end

  defp handle(_state_name, :stop, _from, _sd) do
    exit(:normal)
  end

  # Sends `message` to every caller in the queue.
  defp broadcast(q, message) do
    q
    |> :queue.to_list()
    |> Enum.each(&cast(&1, message))
  end

  # Replies to one caller, tagging the reply with the caller's reference.
  defp cast({ref, pid}, message),
    do: send(pid, {ref, message})

  # Runs `action`, sends `state` to every queued caller and returns the next
  # state name: `:waiting` when `continue` is true, otherwise `state`.
  defp done(state, action, q, continue \\ true) do
    run(action)
    broadcast(q, state)

    case continue do
      true ->
        :waiting

      false ->
        state
    end
  end

  defp run(nil),
    do: nil

  defp run(action),
    do: action.()
end
end
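下面是一个在 Elixir 的 IEx shell 中使用 CyclicBarrier 的例子(注意:示例中的 `barrier.wait` 属于元组调用(tuple call)写法,较新的 Erlang/OTP 版本默认不再支持;此时请改用 `CyclicBarrier.wait(barrier)` 这样的显式调用):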
iex> barrier = CyclicBarrier.start(5, fn -> IO.puts("done") end)
{CyclicBarrier, #PID<0.281.0>}
iex> for i <- 1..5, do: spawn(fn -> IO.puts("process #{i}: #{barrier.wait}") end)
done
process 5: true
process 1: true
process 3: true
process 2: true
process 4: true
[#PID<0.283.0>, #PID<0.284.0>, #PID<0.285.0>, #PID<0.286.0>, #PID<0.287.0>]
进程执行的确切顺序是不确定的。
CyclicBarrier 上其他函数的示例如下:
iex> barrier = CyclicBarrier.start(2)
{CyclicBarrier, #PID<0.280.0>}
iex> barrier.alive?
true
iex> barrier.broken?
false
iex> barrier.number_waiting
0
iex> barrier.parties
2
iex> # let's spawn another process which will wait on the barrier
iex> spawn(fn -> IO.puts("barrier returned: #{barrier.wait}") end)
#PID<0.288.0>
iex> barrier.number_waiting
1
iex> # if we reset the barrier while another process is waiting
iex> # on the barrier, it will break
iex> barrier.reset
barrier returned: false
true
iex> barrier.broken?
true
iex> # however, the barrier can be reset again to its initial state
iex> barrier.reset
true
iex> barrier.broken?
false
iex> # if a timeout is exceeded while waiting for a barrier, it
iex> # will also break the barrier
iex> barrier.wait(100)
false
iex> barrier.broken?
true
iex> # let's reset the barrier, spawn another process to wait,
iex> # and wait with a timeout in the current process
iex> barrier.reset
true
iex> spawn(fn -> IO.puts("barrier returned: #{barrier.wait}") end)
#PID<0.289.0>
iex> barrier.wait(100)
barrier returned: true
true
iex> # if stop is called on the barrier, the barrier process will
iex> # exit and all future calls to the barrier will return false
iex> barrier.stop
true
iex> barrier.alive?
false
iex> barrier.reset
false
iex> barrier.wait
false
我已经在 Hex 上发布了一个库 sync_primitives,它正好实现了这个功能!源代码和完善的单元测试(测试覆盖率 100%)都在 GitHub 上。
看一看,让我知道我是否可以改进它!