如何在 erlang gen_server 中有效地使用 receive 子句来解决超时错误?
How to use efficiently receive clause in erlang gen_server to resolve timeout error?
有时我的循环 returns 正常,因为超时如何以正确的方式编写此代码。当超时时,它只是 returns 好,但不是我假设的实际值。在句柄调用中,我在 loop() 函数中调用一个函数 loop() 我正在接收一条带有 receive 子句的消息。现在,无论数据是否已成功保存,我都使用 loop2 函数 returns 将此数据发送到我的数据库,并将响应返回给 loop()。但是如果超时我的循环函数 returns 可以但不是实际值。
% @Author: ZEESHAN AHMAD
% @Date: 2020-12-22 05:06:12
% @Last Modified by: ZEESHAN AHMAD
% @Last Modified time: 2021-01-10 04:42:59
-module(getAccDataCons).
-behaviour(gen_server).
-include_lib("deps/amqp_client/include/amqp_client.hrl").
-export([start_link/0, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3,
terminate/2]).
-export([get_account/0]).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
stop() ->
gen_server:cast(?MODULE, stop).
get_account() ->
gen_server:call(?MODULE, {get_account}).
init(_Args) ->
{ok, Connection} = amqp_connection:start(#amqp_params_network{host = "localhost"}),
{ok, Channel} = amqp_connection:open_channel(Connection),
{ok, Channel}.
handle_call({get_account}, _From, State) ->
amqp_channel:call(State, #'exchange.declare'{exchange = <<"get">>, type = <<"topic">>}),
amqp_channel:call(State, #'queue.declare'{queue = <<"get_account">>}),
Binding =
#'queue.bind'{exchange = <<"get">>,
routing_key = <<"get.account">>,
queue = <<"get_account">>},
#'queue.bind_ok'{} = amqp_channel:call(State, Binding),
io:format(" [*] Waiting for logs. To exit press CTRL+C~n"),
amqp_channel:call(State,#'basic.consume'{queue = <<"get_account">>, no_ack = true}),
Returned =loop(),
io:format("~nReti=~p",[Returned]),
{reply, Returned, State};
handle_call(Message, _From, State) ->
io:format("received other handle_call message: ~p~n", [Message]),
{reply, ok, State}.
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast(Message, State) ->
io:format("received other handle_cast call : ~p~n", [Message]),
{noreply, State}.
handle_info(Message, State) ->
io:format("received handle_info message : ~p~n", [Message]),
{noreply, State}.
code_change(_OldVer, State, _Extra) ->
{ok, State}.
terminate(Reason, _State) ->
io:format("server is terminating with reason :~p~n", [Reason]).
loop()->
receive
#'basic.consume_ok'{} -> ok
end,
receive
{#'basic.deliver'{}, Msg} ->
#amqp_msg{payload = Payload} = Msg,
Value=loop2(Payload),
Value
after 2000->
io:format("Server timeout")
end.
loop2(Payload)->
Result = jiffy:decode(Payload),
{[{<<"account_id">>, AccountId}]} = Result,
Doc = {[{<<"account_id">>, AccountId}]},
getAccDataDb:create_AccountId_view(),
Returned=case getAccDataDb:getAccountNameDetails(Doc) of
success ->
Respo = getAccDataDb:getAccountNameDetails1(Doc),
Respo;
details_not_matched ->
user_not_exist
end,
Returned.
没有loop
和loop2
代码,很难给出答案,如果这两个函数之一检测到超时,你必须先改变它们的行为以避免任何超时,或将其增加到有效的值。如果超时是必要的,那么确保 return 值是明确的湿它发生,例如 {error,RequestRef,timeout}
而不是 ok
.
尽管如此 gen_server
不应该等待太久的答案,你可以修改你的代码做:
而不是在客户端进程中使用 gen_server:call(ServerRef,Request)
,您可以使用:
RequestId = send_request(ServerRef, Request),
Result = wait_response(RequestId, Timeout),
并删除 loop
and/or loop2
中的超时。这样做你可以控制客户端的超时,你甚至可以将它设置为无穷大(不是一个好主意!)。
或者您可以将函数分成两部分
gen_server:cast(ServerRef,{Request,RequestRef}),
% this will not wait for any answer, RequestRef is a tag to identify later
% if the request was fulfilled, you can use make_ref() to generate it
及以后,或在另一个客户端进程中(这需要至少将 RequestRef
传递给此进程)检查请求的结果:
Answer = gen_server:call(ServerRef,{get_answer,RequestRef}),
case Answer of
no_reply -> ... % no answer yet
{ok,Reply} -> ... % handle the answer
end,
最后,您必须修改循环代码以处理 RequestRef
,将消息(再次使用 gen_server:cast
)与结果和 RequestRef
一起发送回服务器,然后存储这导致服务器状态。
我认为第二个解决方案没有价值,因为它与第一个解决方案或多或少相同,但是是手工制作的,它让您可以管理许多可能结束的错误情况(例如客户端死亡)变成一种内存泄漏。
编辑太长了,我把它放在新的答案里了。
超时时收到ok
的原因在loop()
代码中。在第二个接收块中,2000 毫秒后,您 return
在 io:format/1
语句之后。
io:format
returns ok
这就是您在 Returned 变量中得到的。您应该使用
更改此代码
loop()->
ok = receive
#'basic.consume_ok'{} -> ok
end,
receive
{#'basic.deliver'{}, #amqp_msg{payload = Payload}} -> {ok,loop2(Payload)}
after 2000 ->
io:format("Server timeout"),
{error,timeout}
end.
使用此代码,您的客户将收到 {ok,Value}
或 {error,timeout}
,并且能够做出相应的反应。
但是这个版本还有问题:
- 2 秒超时可能太短,您缺少有效答案
- 当您在接收块中使用模式匹配并且不检查每个 amqp_channel:call
的结果时,可能会出现许多不同的问题并显示为超时
首先让我们看一下超时。有可能对 amqp_channel
的 4 次调用总共需要超过 2 秒才能成功完成。简单的解决方案是增加超时,将 after 2000
更改为 after 3000
或更多。
但是你会遇到两个问题:
- 您的 gen_server 一直处于阻塞状态,如果不专用于单个客户端,将无法用于
在等待答案时处理任何其他请求。
- 如果您需要将超时增加到 5 秒以上,您将遇到另一个超时,由 gen_server 内部管理:请求必须在 5 秒内得到答复。
gen_server提供了一些接口函数来解决此类问题:'send_request'、'wait_response'和回复。这是一个基本的
gen_server 可以处理 3 种请求:
- stop ... 停止服务器,对更新代码很有用。
- {blocking,Time,Value} 服务器将在 Time ms 结束期间休眠,然后 return 值。这模拟了你的情况,你可以调整如何
需要很长时间才能得到答案。
- {non_blocking,Time,Value} 服务器会将作业委托给另一个进程并且 return 立即没有应答(因此
它可用于另一个请求)。新进程将在 Time ms end 然后 return Value using gen_server:reply.
期间休眠
服务器模块实现了几个用户界面:
- 标准的start(), stop()
- blocking(Time,Value) 使用 gen_server:call
通过请求 {blocking,Time,Value} 调用服务器
- blocking_catch(Time,Value) 与上一个相同,但捕获 gen_server:call 的结果以显示隐藏的超时
- non_blocking(Time,Value,Wait) 使用 gen_server:send_request 使用请求 {non_blocking,Time,Value} 调用服务器并等待 Wait ms maximum 的答案
终于包含了2个测试函数
- test([Type,Time,Value,OptionalWait]) 它会生成一个进程,该进程将发送带有相应参数的类型请求。答案被发送回调用进程。答案可以在 shell.
中用 flush() 检索
- parallel_test ([Type,Time,NbRequests,OptionalWait]) 它用相应的参数调用NbRequests times test。它收集了所有
答案并使用本地函数 collect(NbRequests,Timeout) 打印出来。
下面的代码
-module (server_test).
-behaviour(gen_server).
%% API
-export([start/0,stop/0,blocking/2,blocking_catch/2,non_blocking/3,test/1,parallel_test/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
%%%===================================================================
%%% API
%%%===================================================================
start() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
stop() ->
gen_server:cast(?SERVER, stop).
blocking(Time,Value) ->
gen_server:call(?SERVER, {blocking,Time,Value}).
blocking_catch(Time,Value) ->
catch {ok,gen_server:call(?SERVER, {blocking,Time,Value})}.
non_blocking(Time,Value,Wait) ->
ReqId = gen_server:send_request(?SERVER,{non_blocking,Time,Value}),
gen_server:wait_response(ReqId,Wait).
test([Type,Time,Value]) -> test([Type,Time,Value,5000]);
test([Type,Time,Value,Wait]) ->
Start = erlang:monotonic_time(),
From = self(),
F = fun() ->
R = case Type of
non_blocking -> ?MODULE:Type(Time,Value,Wait);
_ -> ?MODULE:Type(Time,Value)
end,
From ! {request,Type,Time,Value,got_answer,R,after_microsec,erlang:monotonic_time() - Start}
end,
spawn(F).
parallel_test([Type,Time,NbRequests]) -> parallel_test([Type,Time,NbRequests,5000]);
parallel_test([Type,Time,NbRequests,Wait]) ->
case Type of
non_blocking -> [server_test:test([Type,Time,X,Wait]) || X <- lists:seq(1,NbRequests)];
_ -> [server_test:test([Type,Time,X]) || X <- lists:seq(1,NbRequests)]
end,
collect_answers(NbRequests,Time + 1000).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([]) ->
{ok, #{}}.
handle_call({blocking,Time,Value}, _From, State) ->
timer:sleep(Time),
Reply = {ok,Value},
{reply, Reply, State};
handle_call({non_blocking,Time,Value}, From, State) ->
F = fun() ->
do_answer(From,Time,Value)
end,
spawn(F),
{noreply, State};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(stop, State) ->
{stop,stopped, State};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(OldVsn, State, _Extra) ->
io:format("changing code replacing version ~p~n",[OldVsn]),
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
do_answer(From,Time,Value) ->
timer:sleep(Time),
gen_server:reply(From, Value).
collect_answers(0,_Timeout) ->
got_all_answers;
collect_answers(NbRequests,Timeout) ->
receive
A -> io:format("~p~n",[A]),
collect_answers(NbRequests - 1, Timeout)
after Timeout ->
missing_answers
end.
shell 中的会话:
44> c(server_test).
{ok,server_test}
45> server_test:start().
{ok,<0.338.0>}
46> server_test:parallel_test([blocking,200,3]).
{request,blocking,200,1,got_answer,{ok,1},after_microsec,207872}
{request,blocking,200,2,got_answer,{ok,2},after_microsec,415743}
{request,blocking,200,3,got_answer,{ok,3},after_microsec,623615}
got_all_answers
47> % 3 blocking requests in parallel, each lasting 200ms, they are executed in sequence but no timemout is reached
47> % All the clients get their answers
47> server_test:parallel_test([blocking,2000,3]).
{request,blocking,2000,1,got_answer,{ok,1},after_microsec,2063358}
{request,blocking,2000,2,got_answer,{ok,2},after_microsec,4127740}
missing_answers
48> % 3 blocking requests in parallel, each lasting 2000ms, they are executed in sequence and the last answer exceeds the gen_server timeout.
48> % The client for this request don't receive answer. The client should also manage its own timeout to handle this case
48> server_test:parallel_test([blocking_catch,2000,3]).
{request,blocking_catch,2000,1,got_answer,{ok,1},after_microsec,2063358}
{request,blocking_catch,2000,2,got_answer,{ok,2},after_microsec,4127740}
{request,blocking_catch,2000,3,got_answer,
{'EXIT',{timeout,{gen_server,call,[server_test,{blocking,2000,3}]}}},
after_microsec,5135355}
got_all_answers
49> % same thing but catching the exception. After 5 seconds the gen_server call throws a timeout exception.
49> % The information can be forwarded to the client
49> server_test:parallel_test([non_blocking,200,3]).
{request,non_blocking,200,1,got_answer,{reply,1},after_microsec,207872}
{request,non_blocking,200,2,got_answer,{reply,2},after_microsec,207872}
{request,non_blocking,200,3,got_answer,{reply,3},after_microsec,207872}
got_all_answers
50> % using non blocking mechanism, we can see that all the requests were managed in parallel
50> server_test:parallel_test([non_blocking,5100,3]).
{request,non_blocking,5100,1,got_answer,timeout,after_microsec,5136379}
{request,non_blocking,5100,2,got_answer,timeout,after_microsec,5136379}
{request,non_blocking,5100,3,got_answer,timeout,after_microsec,5136379}
got_all_answers
51> % if we increase the answer delay above 5000ms, all requests fail in default timeout
51> server_test:parallel_test([non_blocking,5100,3,6000]).
{request,non_blocking,5100,1,got_answer,{reply,1},after_microsec,5231611}
{request,non_blocking,5100,2,got_answer,{reply,2},after_microsec,5231611}
{request,non_blocking,5100,3,got_answer,{reply,3},after_microsec,5231611}
got_all_answers
52> % but thanks to the send_request/wait_response/reply interfaces, the client can adjust the timeout to an accurate value
52> % for each request
请求无法完成的下一个原因是 amqp_channel:call 之一失败。根据你想做什么,有几种
什么都不做、让崩溃、捕获异常或管理所有情况的可能性。下一个提案使用全局捕获
handle_call({get_account,Timeout}, From, State) ->
F = fun() ->
do_get_account(From,State,Timeout)
end,
spawn(F), % delegate the job to another process and free the server
{noreply, State}; % I don't see any change of State in your code, this should be enough
...
do_get_account(From,State,Timeout) ->
% this block of code asserts all positive return values from amqp_channel calls. it will catch any error
% and return it as {error,...}. If everything goes well it return {ok,Answer}
Reply = try
ok = amqp_channel:call(State, #'exchange.declare'{exchange = <<"get">>, type = <<"topic">>}),
ok = amqp_channel:call(State, #'queue.declare'{queue = <<"get_account">>}),
Binding = #'queue.bind'{exchange = <<"get">>,
routing_key = <<"get.account">>,
queue = <<"get_account">>},
#'queue.bind_ok'{} = amqp_channel:call(State, Binding),
ok = amqp_channel:call(State,#'basic.consume'{queue = <<"get_account">>, no_ack = true}),
{ok,wait_account_reply(Timeout)}
catch
Class:Exception -> {error,Class,Exception}
end,
gen_server:reply(From, Reply).
wait_account_reply(Timeout) ->
receive
% #'basic.consume_ok'{} -> ok % you do not handle this message, ignore it since it will be garbaged when the process die
{#'basic.deliver'{}, #amqp_msg{payload = Payload}} -> extract_account(Payload)
after Timeout->
server_timeout
end.
extract_account(Payload)->
{[{<<"account_id">>, AccountId}]} = jiffy:decode(Payload),
Doc = {[{<<"account_id">>, AccountId}]},
getAccDataDb:create_AccountId_view(), % What is the effect of this function, what is the return value?
case getAccDataDb:getAccountNameDetails(Doc) of
success ->
getAccDataDb:getAccountNameDetails1(Doc);
details_not_matched ->
user_not_exist
end.
客户端应如下所示:
get_account() ->
ReqId = gen_server:send_request(server_name,{get_account,2000}),
gen_server:wait_response(ReqId,2200).
有时我的循环 returns 正常,因为超时如何以正确的方式编写此代码。当超时时,它只是 returns 好,但不是我假设的实际值。在句柄调用中,我在 loop() 函数中调用一个函数 loop() 我正在接收一条带有 receive 子句的消息。现在,无论数据是否已成功保存,我都使用 loop2 函数 returns 将此数据发送到我的数据库,并将响应返回给 loop()。但是如果超时我的循环函数 returns 可以但不是实际值。
% @Author: ZEESHAN AHMAD
% @Date: 2020-12-22 05:06:12
% @Last Modified by: ZEESHAN AHMAD
% @Last Modified time: 2021-01-10 04:42:59
-module(getAccDataCons).
-behaviour(gen_server).
-include_lib("deps/amqp_client/include/amqp_client.hrl").
-export([start_link/0, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3,
terminate/2]).
-export([get_account/0]).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
stop() ->
gen_server:cast(?MODULE, stop).
get_account() ->
gen_server:call(?MODULE, {get_account}).
init(_Args) ->
{ok, Connection} = amqp_connection:start(#amqp_params_network{host = "localhost"}),
{ok, Channel} = amqp_connection:open_channel(Connection),
{ok, Channel}.
handle_call({get_account}, _From, State) ->
amqp_channel:call(State, #'exchange.declare'{exchange = <<"get">>, type = <<"topic">>}),
amqp_channel:call(State, #'queue.declare'{queue = <<"get_account">>}),
Binding =
#'queue.bind'{exchange = <<"get">>,
routing_key = <<"get.account">>,
queue = <<"get_account">>},
#'queue.bind_ok'{} = amqp_channel:call(State, Binding),
io:format(" [*] Waiting for logs. To exit press CTRL+C~n"),
amqp_channel:call(State,#'basic.consume'{queue = <<"get_account">>, no_ack = true}),
Returned =loop(),
io:format("~nReti=~p",[Returned]),
{reply, Returned, State};
handle_call(Message, _From, State) ->
io:format("received other handle_call message: ~p~n", [Message]),
{reply, ok, State}.
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast(Message, State) ->
io:format("received other handle_cast call : ~p~n", [Message]),
{noreply, State}.
handle_info(Message, State) ->
io:format("received handle_info message : ~p~n", [Message]),
{noreply, State}.
code_change(_OldVer, State, _Extra) ->
{ok, State}.
terminate(Reason, _State) ->
io:format("server is terminating with reason :~p~n", [Reason]).
loop()->
receive
#'basic.consume_ok'{} -> ok
end,
receive
{#'basic.deliver'{}, Msg} ->
#amqp_msg{payload = Payload} = Msg,
Value=loop2(Payload),
Value
after 2000->
io:format("Server timeout")
end.
loop2(Payload)->
Result = jiffy:decode(Payload),
{[{<<"account_id">>, AccountId}]} = Result,
Doc = {[{<<"account_id">>, AccountId}]},
getAccDataDb:create_AccountId_view(),
Returned=case getAccDataDb:getAccountNameDetails(Doc) of
success ->
Respo = getAccDataDb:getAccountNameDetails1(Doc),
Respo;
details_not_matched ->
user_not_exist
end,
Returned.
没有loop
和loop2
代码,很难给出答案,如果这两个函数之一检测到超时,你必须先改变它们的行为以避免任何超时,或将其增加到有效的值。如果超时是必要的,那么确保 return 值是明确的湿它发生,例如 {error,RequestRef,timeout}
而不是 ok
.
尽管如此 gen_server
不应该等待太久的答案,你可以修改你的代码做:
而不是在客户端进程中使用 gen_server:call(ServerRef,Request)
,您可以使用:
RequestId = send_request(ServerRef, Request),
Result = wait_response(RequestId, Timeout),
并删除 loop
and/or loop2
中的超时。这样做你可以控制客户端的超时,你甚至可以将它设置为无穷大(不是一个好主意!)。
或者您可以将函数分成两部分
gen_server:cast(ServerRef,{Request,RequestRef}),
% this will not wait for any answer, RequestRef is a tag to identify later
% if the request was fulfilled, you can use make_ref() to generate it
及以后,或在另一个客户端进程中(这需要至少将 RequestRef
传递给此进程)检查请求的结果:
Answer = gen_server:call(ServerRef,{get_answer,RequestRef}),
case Answer of
no_reply -> ... % no answer yet
{ok,Reply} -> ... % handle the answer
end,
最后,您必须修改循环代码以处理 RequestRef
,将消息(再次使用 gen_server:cast
)与结果和 RequestRef
一起发送回服务器,然后存储这导致服务器状态。
我认为第二个解决方案没有价值,因为它与第一个解决方案或多或少相同,但是是手工制作的,它让您可以管理许多可能结束的错误情况(例如客户端死亡)变成一种内存泄漏。
编辑太长了,我把它放在新的答案里了。
超时时收到ok
的原因在loop()
代码中。在第二个接收块中,2000 毫秒后,您 return
在 io:format/1
语句之后。
io:format
returns ok
这就是您在 Returned 变量中得到的。您应该使用
loop()->
ok = receive
#'basic.consume_ok'{} -> ok
end,
receive
{#'basic.deliver'{}, #amqp_msg{payload = Payload}} -> {ok,loop2(Payload)}
after 2000 ->
io:format("Server timeout"),
{error,timeout}
end.
使用此代码,您的客户将收到 {ok,Value}
或 {error,timeout}
,并且能够做出相应的反应。
但是这个版本还有问题:
- 2 秒超时可能太短,您缺少有效答案
- 当您在接收块中使用模式匹配并且不检查每个 amqp_channel:call
的结果时,可能会出现许多不同的问题并显示为超时
首先让我们看一下超时。有可能对 amqp_channel
的 4 次调用总共需要超过 2 秒才能成功完成。简单的解决方案是增加超时,将 after 2000
更改为 after 3000
或更多。
但是你会遇到两个问题:
- 您的 gen_server 一直处于阻塞状态,如果不专用于单个客户端,将无法用于 在等待答案时处理任何其他请求。
- 如果您需要将超时增加到 5 秒以上,您将遇到另一个超时,由 gen_server 内部管理:请求必须在 5 秒内得到答复。
gen_server提供了一些接口函数来解决此类问题:'send_request'、'wait_response'和回复。这是一个基本的 gen_server 可以处理 3 种请求:
- stop ... 停止服务器,对更新代码很有用。
- {blocking,Time,Value} 服务器将在 Time ms 结束期间休眠,然后 return 值。这模拟了你的情况,你可以调整如何 需要很长时间才能得到答案。
- {non_blocking,Time,Value} 服务器会将作业委托给另一个进程并且 return 立即没有应答(因此 它可用于另一个请求)。新进程将在 Time ms end 然后 return Value using gen_server:reply. 期间休眠
服务器模块实现了几个用户界面:
- 标准的start(), stop()
- blocking(Time,Value) 使用 gen_server:call 通过请求 {blocking,Time,Value} 调用服务器
- blocking_catch(Time,Value) 与上一个相同,但捕获 gen_server:call 的结果以显示隐藏的超时
- non_blocking(Time,Value,Wait) 使用 gen_server:send_request 使用请求 {non_blocking,Time,Value} 调用服务器并等待 Wait ms maximum 的答案
终于包含了2个测试函数
- test([Type,Time,Value,OptionalWait]) 它会生成一个进程,该进程将发送带有相应参数的类型请求。答案被发送回调用进程。答案可以在 shell. 中用 flush() 检索
- parallel_test ([Type,Time,NbRequests,OptionalWait]) 它用相应的参数调用NbRequests times test。它收集了所有 答案并使用本地函数 collect(NbRequests,Timeout) 打印出来。
下面的代码
-module (server_test).
-behaviour(gen_server).
%% API
-export([start/0,stop/0,blocking/2,blocking_catch/2,non_blocking/3,test/1,parallel_test/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
%%%===================================================================
%%% API
%%%===================================================================
start() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
stop() ->
gen_server:cast(?SERVER, stop).
blocking(Time,Value) ->
gen_server:call(?SERVER, {blocking,Time,Value}).
blocking_catch(Time,Value) ->
catch {ok,gen_server:call(?SERVER, {blocking,Time,Value})}.
non_blocking(Time,Value,Wait) ->
ReqId = gen_server:send_request(?SERVER,{non_blocking,Time,Value}),
gen_server:wait_response(ReqId,Wait).
test([Type,Time,Value]) -> test([Type,Time,Value,5000]);
test([Type,Time,Value,Wait]) ->
Start = erlang:monotonic_time(),
From = self(),
F = fun() ->
R = case Type of
non_blocking -> ?MODULE:Type(Time,Value,Wait);
_ -> ?MODULE:Type(Time,Value)
end,
From ! {request,Type,Time,Value,got_answer,R,after_microsec,erlang:monotonic_time() - Start}
end,
spawn(F).
parallel_test([Type,Time,NbRequests]) -> parallel_test([Type,Time,NbRequests,5000]);
parallel_test([Type,Time,NbRequests,Wait]) ->
case Type of
non_blocking -> [server_test:test([Type,Time,X,Wait]) || X <- lists:seq(1,NbRequests)];
_ -> [server_test:test([Type,Time,X]) || X <- lists:seq(1,NbRequests)]
end,
collect_answers(NbRequests,Time + 1000).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([]) ->
{ok, #{}}.
handle_call({blocking,Time,Value}, _From, State) ->
timer:sleep(Time),
Reply = {ok,Value},
{reply, Reply, State};
handle_call({non_blocking,Time,Value}, From, State) ->
F = fun() ->
do_answer(From,Time,Value)
end,
spawn(F),
{noreply, State};
handle_call(_Request, _From, State) ->
Reply = ok,
{reply, Reply, State}.
handle_cast(stop, State) ->
{stop,stopped, State};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.
terminate(_Reason, _State) ->
ok.
code_change(OldVsn, State, _Extra) ->
io:format("changing code replacing version ~p~n",[OldVsn]),
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
do_answer(From,Time,Value) ->
timer:sleep(Time),
gen_server:reply(From, Value).
collect_answers(0,_Timeout) ->
got_all_answers;
collect_answers(NbRequests,Timeout) ->
receive
A -> io:format("~p~n",[A]),
collect_answers(NbRequests - 1, Timeout)
after Timeout ->
missing_answers
end.
shell 中的会话:
44> c(server_test).
{ok,server_test}
45> server_test:start().
{ok,<0.338.0>}
46> server_test:parallel_test([blocking,200,3]).
{request,blocking,200,1,got_answer,{ok,1},after_microsec,207872}
{request,blocking,200,2,got_answer,{ok,2},after_microsec,415743}
{request,blocking,200,3,got_answer,{ok,3},after_microsec,623615}
got_all_answers
47> % 3 blocking requests in parallel, each lasting 200ms, they are executed in sequence but no timemout is reached
47> % All the clients get their answers
47> server_test:parallel_test([blocking,2000,3]).
{request,blocking,2000,1,got_answer,{ok,1},after_microsec,2063358}
{request,blocking,2000,2,got_answer,{ok,2},after_microsec,4127740}
missing_answers
48> % 3 blocking requests in parallel, each lasting 2000ms, they are executed in sequence and the last answer exceeds the gen_server timeout.
48> % The client for this request don't receive answer. The client should also manage its own timeout to handle this case
48> server_test:parallel_test([blocking_catch,2000,3]).
{request,blocking_catch,2000,1,got_answer,{ok,1},after_microsec,2063358}
{request,blocking_catch,2000,2,got_answer,{ok,2},after_microsec,4127740}
{request,blocking_catch,2000,3,got_answer,
{'EXIT',{timeout,{gen_server,call,[server_test,{blocking,2000,3}]}}},
after_microsec,5135355}
got_all_answers
49> % same thing but catching the exception. After 5 seconds the gen_server call throws a timeout exception.
49> % The information can be forwarded to the client
49> server_test:parallel_test([non_blocking,200,3]).
{request,non_blocking,200,1,got_answer,{reply,1},after_microsec,207872}
{request,non_blocking,200,2,got_answer,{reply,2},after_microsec,207872}
{request,non_blocking,200,3,got_answer,{reply,3},after_microsec,207872}
got_all_answers
50> % using non blocking mechanism, we can see that all the requests were managed in parallel
50> server_test:parallel_test([non_blocking,5100,3]).
{request,non_blocking,5100,1,got_answer,timeout,after_microsec,5136379}
{request,non_blocking,5100,2,got_answer,timeout,after_microsec,5136379}
{request,non_blocking,5100,3,got_answer,timeout,after_microsec,5136379}
got_all_answers
51> % if we increase the answer delay above 5000ms, all requests fail in default timeout
51> server_test:parallel_test([non_blocking,5100,3,6000]).
{request,non_blocking,5100,1,got_answer,{reply,1},after_microsec,5231611}
{request,non_blocking,5100,2,got_answer,{reply,2},after_microsec,5231611}
{request,non_blocking,5100,3,got_answer,{reply,3},after_microsec,5231611}
got_all_answers
52> % but thanks to the send_request/wait_response/reply interfaces, the client can adjust the timeout to an accurate value
52> % for each request
请求无法完成的下一个原因是 amqp_channel:call 之一失败。根据你想做什么,有几种 什么都不做、让崩溃、捕获异常或管理所有情况的可能性。下一个提案使用全局捕获
handle_call({get_account,Timeout}, From, State) ->
F = fun() ->
do_get_account(From,State,Timeout)
end,
spawn(F), % delegate the job to another process and free the server
{noreply, State}; % I don't see any change of State in your code, this should be enough
...
do_get_account(From,State,Timeout) ->
% this block of code asserts all positive return values from amqp_channel calls. it will catch any error
% and return it as {error,...}. If everything goes well it return {ok,Answer}
Reply = try
ok = amqp_channel:call(State, #'exchange.declare'{exchange = <<"get">>, type = <<"topic">>}),
ok = amqp_channel:call(State, #'queue.declare'{queue = <<"get_account">>}),
Binding = #'queue.bind'{exchange = <<"get">>,
routing_key = <<"get.account">>,
queue = <<"get_account">>},
#'queue.bind_ok'{} = amqp_channel:call(State, Binding),
ok = amqp_channel:call(State,#'basic.consume'{queue = <<"get_account">>, no_ack = true}),
{ok,wait_account_reply(Timeout)}
catch
Class:Exception -> {error,Class,Exception}
end,
gen_server:reply(From, Reply).
wait_account_reply(Timeout) ->
receive
% #'basic.consume_ok'{} -> ok % you do not handle this message, ignore it since it will be garbaged when the process die
{#'basic.deliver'{}, #amqp_msg{payload = Payload}} -> extract_account(Payload)
after Timeout->
server_timeout
end.
extract_account(Payload)->
{[{<<"account_id">>, AccountId}]} = jiffy:decode(Payload),
Doc = {[{<<"account_id">>, AccountId}]},
getAccDataDb:create_AccountId_view(), % What is the effect of this function, what is the return value?
case getAccDataDb:getAccountNameDetails(Doc) of
success ->
getAccDataDb:getAccountNameDetails1(Doc);
details_not_matched ->
user_not_exist
end.
客户端应如下所示:
get_account() ->
ReqId = gen_server:send_request(server_name,{get_account,2000}),
gen_server:wait_response(ReqId,2200).