How to handle timeouts in poolboy?
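I have a single long-running migration that I want to run in parallel (the work can safely be parallelized). The migration fetches every record in the database and performs a time- and resource-consuming operation on each one.
Sometimes the migration of an individual record hangs, so I give each one 10 minutes to finish. If it hasn't finished by then, I want it to shut down gracefully, without any exception (see below).
I'm also using the poolboy Erlang package to limit the parallelism, since the migration consumes resources as well as time. The problem is that I don't know how to handle the error when a timeout happens and the code is about to break. My supervision tree is: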
defmodule MyReelty.Repo.Migrations.MoveVideosFromVimeoToB2 do
  use Ecto.Migration

  alias MyReelty.Repo
  # Review is queried in up/0, so it has to be aliased here as well
  alias MyReelty.Review
  alias MyReelty.Repo.Migrations.MoveVideosFromVimeoToB2.Migrator

  # the parallel nature of the migration forces us to disable the transaction
  @disable_ddl_transaction true

  @migrator_waiting_time 10 * 60 * 1000 # timeout
  @poolboy_waiting_time @migrator_waiting_time + 10 * 1000 # give some time for graceful shutdown

  @pool_name :migrator
  @pool_size 3
  @pool_config [
    { :name, { :local, @pool_name }},
    { :worker_module, Migrator },
    { :size, @pool_size },
    { :max_overflow, 0 },
    { :strategy, :fifo }
  ]

  def up do
    children = [
      :poolboy.child_spec(@pool_name, @pool_config)
    ]
    opts = [strategy: :one_for_one, name: MyReelty.Supervisor]
    Supervisor.start_link(children, opts)

    rows = Review |> Repo.all
    IO.puts "Total amount of reviews is: #{length(rows)}"
    parallel_migrations(rows)
  end

  def parallel_migrations(rows) do
    Enum.map(rows, fn(row) ->
      pooled_migration(@pool_name, row)
    end)
  end

  def pooled_migration(pool, x) do
    :poolboy.transaction(
      pool,
      (fn(pid) -> Migrator.move(pid, { x, @migrator_waiting_time }) end),
      @poolboy_waiting_time
    )
  end

  defmodule Migrator do
    alias MyReelty.Repo
    alias MyReelty.Review
    use GenServer

    def start_link(_) do
      GenServer.start_link(__MODULE__, nil, [])
    end

    def move(server, { params, waiting_time }) do
      GenServer.call(server, { :move, params }, waiting_time)
    end

    def handle_call({ :move, _params }, _from, state) do
      # placeholder for the actual time- and resource-consuming work
      big_time_and_resource_consuming_task_here
      {:reply, %{}, state}
    end
  end
end
The problem is that when the migration of some record takes longer than 10 minutes, I get this kind of exception:
20:18:16.917 [error] Task #PID<0.282.0> started from #PID<0.70.0> terminating
** (stop) exited in: GenServer.call(#PID<0.278.0>, {:move, [2, "/videos/164064419", "w 35th st Springfield United States Illinois 60020"]}, 60000)
** (EXIT) time out
(elixir) lib/gen_server.ex:604: GenServer.call/3
(poolboy) src/poolboy.erl:76: :poolboy.transaction/3
(elixir) lib/task/supervised.ex:94: Task.Supervised.do_apply/2
(elixir) lib/task/supervised.ex:45: Task.Supervised.reply/5
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Function: #Function<5.53617785/0 in MyReelty.Repo.Migrations.MoveVideosFromVimeoToB2.parallel_migrations/1>
Args: []
20:18:16.918 [error] GenServer MyReelty.Repo terminating
** (stop) exited in: GenServer.call(#PID<0.278.0>, {:move, [2, "/videos/164064419", "w 35th st Springfield United States Illinois 60020"]}, 60000)
** (EXIT) time out
Last message: {:EXIT, #PID<0.70.0>, {:timeout, {GenServer, :call, [#PID<0.278.0>, {:move, [2, "/videos/164064419", "w 35th st Springfield United States Illinois 60020"]}, 60000]}}}
State: {:state, {:local, MyReelty.Repo}, :one_for_one, [{:child, #PID<0.231.0>, DBConnection.Poolboy, {:poolboy, :start_link, [[name: {:local, MyReelty.Repo.Pool}, strategy: :fifo, size: 1, max_overflow: 0, worker_module: DBConnection.Poolboy.Worker], {Postgrex.Protocol, [types: true, username: "adik", types: true, name: MyReelty.Repo.Pool, otp_app: :my_reelty, repo: MyReelty.Repo, adapter: Ecto.Adapters.Postgres, database: "my_reelty_dev", hostname: "localhost", extensions: [{Geo.PostGIS.Extension, [library: Geo]}, {Ecto.Adapters.Postgres.DateTime, []}, {Postgrex.Extensions.JSON, [library: Poison]}], pool_size: 1, pool_timeout: 5000, timeout: 15000, adapter: Ecto.Adapters.Postgres, database: "my_dev", hostname: "localhost", pool_size: 10, pool: DBConnection.Poolboy, port: 5432]}]}, :permanent, 5000, :worker, [:poolboy]}], :undefined, 3, 5, [], 0, Ecto.Repo.Supervisor, {MyReelty.Repo, :my_reelty, Ecto.Adapters.Postgres, [otp_app: :my_reelty, repo: MyReelty.Repo, adapter: Ecto.Adapters.Postgres, database: "my_reelty_dev", hostname: "localhost", extensions: [{Geo.PostGIS.Extension, [library: Geo]}], pool_size: 1]}}
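I tried adding terminate/2 or handle_info/2 to Migrator and playing with them, but those functions are never even called. How can I handle timeouts and prevent them from breaking my migration?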
UPDATED
I used @johlo's hint, but I still get the timeout. My function is:
def init(_) do
  Process.flag(:trap_exit, true)
  {:ok, %{}}
end
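When the Migrator.move/2 function (i.e. the GenServer.call) times out, it crashes the whole MoveVideosFromVimeoToB2 process, because that is the process that actually makes the GenServer call.
The solution here is to catch the timeout inside the anonymous function in pooled_migration, something like this (I'm not very familiar with Elixir syntax, so it may not compile, but you should get the idea):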
def pooled_migration(pool, x) do
  :poolboy.transaction(
    pool,
    (fn(pid) ->
      try do
        Migrator.move(pid, { x, @migrator_waiting_time })
      catch
        :exit, _reason ->
          # Ignore error, log it or something else
          :ok
      end
    end),
    @poolboy_waiting_time
  )
end
It is not the Migrator process that times out but the GenServer call to the Migrator, and that call is what we need to wrap in try-catch.
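To make that distinction concrete, here is a minimal sketch that only needs the Elixir standard library. SlowServer and SafeCall are hypothetical names I made up to stand in for Migrator and for the anonymous function above, and the sleep stands in for the real work. Instead of swallowing every exit, it matches on the {:timeout, _} exit reason (the same shape that appears in the log in the question), so a slow worker can be told apart from a dead one.

```elixir
defmodule SlowServer do
  use GenServer

  def start_link(_args), do: GenServer.start_link(__MODULE__, nil, [])

  @impl true
  def init(_), do: {:ok, nil}

  # Sleeping stands in for the expensive per-record work.
  @impl true
  def handle_call({:work, ms}, _from, state) do
    Process.sleep(ms)
    {:reply, :done, state}
  end
end

defmodule SafeCall do
  # Hypothetical helper: turns the caller-side exit into a return value.
  def call(server, request, timeout) do
    try do
      {:ok, GenServer.call(server, request, timeout)}
    catch
      # The call did not get a reply within `timeout` milliseconds.
      :exit, {:timeout, _} -> {:error, :timeout}
      # Any other exit, for example the server is not running.
      :exit, reason -> {:error, {:exit, reason}}
    end
  end
end

{:ok, pid} = SlowServer.start_link([])

fast = SafeCall.call(pid, {:work, 10}, 1_000)
slow = SafeCall.call(pid, {:work, 300}, 50)

IO.inspect({fast, slow})
```

In the migration you would log the record in the timeout branch and move on, rather than returning :ok for every kind of exit.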
Also note that the Migrator process is not killed; it keeps running. See the timeouts section of the GenServer call documentation.
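A small sketch of what "still running" means in practice, again with the standard library only. BusyWorker is a hypothetical stand-in for Migrator. The caller gives up after 50 ms, but the server keeps working on the job, and the next request simply waits in its mailbox until that job is done.

```elixir
defmodule BusyWorker do
  use GenServer

  def start_link(_args), do: GenServer.start_link(__MODULE__, 0, [])

  @impl true
  def init(count), do: {:ok, count}

  # The state counts how many jobs the server has actually completed.
  @impl true
  def handle_call({:job, ms}, _from, count) do
    Process.sleep(ms)
    {:reply, count + 1, count + 1}
  end

  def handle_call(:completed, _from, count), do: {:reply, count, count}
end

{:ok, pid} = BusyWorker.start_link([])

# The caller stops waiting after 50 ms; the job itself takes 200 ms.
timed_out? =
  try do
    GenServer.call(pid, {:job, 200}, 50)
    false
  catch
    :exit, {:timeout, _} -> true
  end

# The server was not killed by the caller's timeout.
alive? = Process.alive?(pid)

# This request queues behind the unfinished job and is answered afterwards.
completed = GenServer.call(pid, :completed, 1_000)

IO.inspect({timed_out?, alive?, completed})
```

The job that "timed out" is still counted as completed. As far as I can tell, this also means that a worker whose call timed out goes back into the pool while it is still busy, so the next record handed to it waits behind the old one. If that matters, the worker has to enforce its own deadline.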
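UPDATE:
As @asiniy mentions in the comments, @poolboy_waiting_time should be set to :infinity, so that the poolboy.transaction function does not throw a timeout error while waiting for a free Migrator worker process. Since the Migrator will exit eventually, this is safe.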