开始获取,在获取过程中对中间请求进行排队,然后为所有的服务提供数据
Start fetch, queue intermediate requests during fetch, then serve data for all
我在使用 Elixir 和 Phoenix 实现以下流程时遇到问题:
- 来自用户 A 的请求,第 3 方 API 缓存为空
- 启动第 3 方API 通过 HTTP 获取
- 获取尚未完成,来自用户 B 的请求进来了
- 用户 B 等待 获取完成
- 获取完成,将获取的数据写入缓存(例如 Redis)
- 使用缓存数据为所有等待的用户提供服务
不同的路由或路由参数应该使用不同的队列。当第 3 方 API 数据仍在获取时出现的请求在任何情况下都不应触发具有相同参数的额外获取。等待部分(2.2.)对我来说很重要。
根据我目前的阅读,这个问题似乎可以使用标准的 Elixir / Erlang / OTP 功能来解决。
是的,与大多数其他语言相比,Elixir/Erlang 可以很容易地做到这一点。这是使用内存缓存执行此操作的一种方法。如果您以前使用过 GenServer 但没有使用过 GenServer.reply/2
,这里要注意的是我们存储传入 handle_call
请求的 from
参数,当请求完成时,我们响应每个他们中的。我在这个 POC 代码中没有很好地处理错误,但它正确地处理了最有趣的部分,即 2.2:
defmodule CachedParallelHTTP do
def start_link do
GenServer.start_link(__MODULE__, :ok)
end
def init(_) do
{:ok, %{}}
end
def handle_call({:fetch, arg}, from, state) do
case state[arg] do
%{status: :fetched, response: response} ->
# We've already made this request; just return the cached response.
{:reply, response, state}
%{status: :fetching} ->
# We're currently running this request. Store the `from` and reply to the caller later.
state = update_in(state, [arg, :froms], fn froms -> [from | froms] end)
{:noreply, state}
nil ->
# This is a brand new request. Let's create the new state and start the request.
pid = self()
state = Map.put(state, arg, %{status: :fetching, froms: [from]})
Task.start(fn ->
IO.inspect {:making_request, arg}
# Simulate a long synchronous piece of code. The actual HTTP call should be made here.
Process.sleep(2000)
# dummy response
response = arg <> arg <> arg
# Let the server know that this request is done so it can reply to all the `froms`,
# including the ones that were added while this request was being executed.
GenServer.call(pid, {:fetched, arg, response})
end)
{:noreply, state}
end
end
def handle_call({:fetched, arg, response}, _from, state) do
# A request was completed.
case state[arg] do
%{status: :fetching, froms: froms} ->
IO.inspect "notifying #{length(froms)} clients waiting for #{arg}"
# Reply to all the callers who've been waiting for this request.
for from <- froms do
GenServer.reply(from, response)
end
# Cache the response in the state, for future callers.
state = Map.put(state, arg, %{status: :fetched, response: response})
{:reply, :ok, state}
end
end
end
这里有一小段代码可以对此进行测试:
now = fn -> DateTime.utc_now |> DateTime.to_iso8601 end
{:ok, s} = CachedParallelHTTP.start_link
IO.inspect {:before_request, now.()}
for i <- 1..3 do
Task.start(fn ->
response = GenServer.call(s, {:fetch, "123"})
IO.inspect {:response, "123", i, now.(), response}
end)
end
:timer.sleep(1000)
for i <- 1..5 do
Task.start(fn ->
response = GenServer.call(s, {:fetch, "456"})
IO.inspect {:response, "456", i, now.(), response}
end)
end
IO.inspect {:after_request, now.()}
:timer.sleep(10000)
输出:
{:before_request, "2017-01-06T10:30:07.852986Z"}
{:making_request, "123"}
{:after_request, "2017-01-06T10:30:08.862425Z"}
{:making_request, "456"}
"notifying 3 clients waiting for 123"
{:response, "123", 3, "2017-01-06T10:30:07.860758Z", "123123123"}
{:response, "123", 2, "2017-01-06T10:30:07.860747Z", "123123123"}
{:response, "123", 1, "2017-01-06T10:30:07.860721Z", "123123123"}
"notifying 5 clients waiting for 456"
{:response, "456", 5, "2017-01-06T10:30:08.862556Z", "456456456"}
{:response, "456", 4, "2017-01-06T10:30:08.862540Z", "456456456"}
{:response, "456", 3, "2017-01-06T10:30:08.862524Z", "456456456"}
{:response, "456", 2, "2017-01-06T10:30:08.862504Z", "456456456"}
{:response, "456", 1, "2017-01-06T10:30:08.862472Z", "456456456"}
请注意,使用 GenServer.reply
和 Task.start
,单个 GenServer 能够处理多个并行请求,同时保持面向 API 的用户完全同步。根据您要处理的负载量,您可能需要考虑使用 GenServers 池。
我在使用 Elixir 和 Phoenix 实现以下流程时遇到问题:
- 来自用户 A 的请求,第 3 方 API 缓存为空
- 启动第 3 方API 通过 HTTP 获取
- 获取尚未完成,来自用户 B 的请求进来了
- 用户 B 等待 获取完成
- 获取完成,将获取的数据写入缓存(例如 Redis)
- 使用缓存数据为所有等待的用户提供服务
不同的路由或路由参数应该使用不同的队列。当第 3 方 API 数据仍在获取时出现的请求在任何情况下都不应触发具有相同参数的额外获取。等待部分(2.2.)对我来说很重要。
根据我目前的阅读,这个问题似乎可以使用标准的 Elixir / Erlang / OTP 功能来解决。
是的,与大多数其他语言相比,Elixir/Erlang 可以很容易地做到这一点。这是使用内存缓存执行此操作的一种方法。如果您以前使用过 GenServer 但没有使用过 GenServer.reply/2
,这里要注意的是我们存储传入 handle_call
请求的 from
参数,当请求完成时,我们响应每个他们中的。我在这个 POC 代码中没有很好地处理错误,但它正确地处理了最有趣的部分,即 2.2:
defmodule CachedParallelHTTP do
def start_link do
GenServer.start_link(__MODULE__, :ok)
end
def init(_) do
{:ok, %{}}
end
def handle_call({:fetch, arg}, from, state) do
case state[arg] do
%{status: :fetched, response: response} ->
# We've already made this request; just return the cached response.
{:reply, response, state}
%{status: :fetching} ->
# We're currently running this request. Store the `from` and reply to the caller later.
state = update_in(state, [arg, :froms], fn froms -> [from | froms] end)
{:noreply, state}
nil ->
# This is a brand new request. Let's create the new state and start the request.
pid = self()
state = Map.put(state, arg, %{status: :fetching, froms: [from]})
Task.start(fn ->
IO.inspect {:making_request, arg}
# Simulate a long synchronous piece of code. The actual HTTP call should be made here.
Process.sleep(2000)
# dummy response
response = arg <> arg <> arg
# Let the server know that this request is done so it can reply to all the `froms`,
# including the ones that were added while this request was being executed.
GenServer.call(pid, {:fetched, arg, response})
end)
{:noreply, state}
end
end
def handle_call({:fetched, arg, response}, _from, state) do
# A request was completed.
case state[arg] do
%{status: :fetching, froms: froms} ->
IO.inspect "notifying #{length(froms)} clients waiting for #{arg}"
# Reply to all the callers who've been waiting for this request.
for from <- froms do
GenServer.reply(from, response)
end
# Cache the response in the state, for future callers.
state = Map.put(state, arg, %{status: :fetched, response: response})
{:reply, :ok, state}
end
end
end
这里有一小段代码可以对此进行测试:
now = fn -> DateTime.utc_now |> DateTime.to_iso8601 end
{:ok, s} = CachedParallelHTTP.start_link
IO.inspect {:before_request, now.()}
for i <- 1..3 do
Task.start(fn ->
response = GenServer.call(s, {:fetch, "123"})
IO.inspect {:response, "123", i, now.(), response}
end)
end
:timer.sleep(1000)
for i <- 1..5 do
Task.start(fn ->
response = GenServer.call(s, {:fetch, "456"})
IO.inspect {:response, "456", i, now.(), response}
end)
end
IO.inspect {:after_request, now.()}
:timer.sleep(10000)
输出:
{:before_request, "2017-01-06T10:30:07.852986Z"}
{:making_request, "123"}
{:after_request, "2017-01-06T10:30:08.862425Z"}
{:making_request, "456"}
"notifying 3 clients waiting for 123"
{:response, "123", 3, "2017-01-06T10:30:07.860758Z", "123123123"}
{:response, "123", 2, "2017-01-06T10:30:07.860747Z", "123123123"}
{:response, "123", 1, "2017-01-06T10:30:07.860721Z", "123123123"}
"notifying 5 clients waiting for 456"
{:response, "456", 5, "2017-01-06T10:30:08.862556Z", "456456456"}
{:response, "456", 4, "2017-01-06T10:30:08.862540Z", "456456456"}
{:response, "456", 3, "2017-01-06T10:30:08.862524Z", "456456456"}
{:response, "456", 2, "2017-01-06T10:30:08.862504Z", "456456456"}
{:response, "456", 1, "2017-01-06T10:30:08.862472Z", "456456456"}
请注意,使用 GenServer.reply
和 Task.start
,单个 GenServer 能够处理多个并行请求,同时保持面向 API 的用户完全同步。根据您要处理的负载量,您可能需要考虑使用 GenServers 池。