How to make parallel calls in Erlang and wait for all of the results?

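I'm developing a mobile game backend in Erlang. Each HTTP request may need to query several different data sources, such as PostgreSQL, MongoDB, and Redis. I want to make independent calls to these data sources in parallel, but I can't find a clear Erlang way to do it.

For example,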

handle_request(?POST, <<"login">>, UserId, Token) ->
    % Verify token from PostgreSQL
    AuthResult = auth_service:login(UserId, Token), 

    % Get user data such as outfits and level from MongoDB
    UserDataResult = userdata_service:get(UserId),

    % Get messages sent to the user from Redis
    MessageResult = message_service:get(UserId),

    % How to run the above 3 calls in parallel?
    % Then wait for all their results here? 

    % Combine the result and send back to client
    build_response(AuthResult, UserDataResult, MessageResult).

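Each service will eventually call the corresponding data driver (epgsql, eredis, mongo_erlang), ending up in some poolboy:transaction and gen_server:call. How to design these service modules is also not yet decided.

I want to make sure the 3 data calls above can run in parallel, and that the handle_request function waits for all 3 calls to finish before calling build_response. How can I do that properly?

For reference, in NodeJS I might do this: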

var authPromise = AuthService.login(user_id, token);
var userDataPromise = UserdataService.get(user_id);
var messagePromise = MessageService.get(user_id);
Promise.all([authPromise, userDataPromise, messagePromise]).then(function(values) {
    buildResponse(values);
});

In Scala I might do this:

val authFuture = AuthService.login(userId, token)
val userDataFuture = UserdataService.get(userId)
val messageFuture = MessageService.get(userId)
for {
    auth <- authFuture
    userData <- userDataFuture
    message <- messageFuture
} yield buildResponse(auth, userData, message)

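Obviously, I'm thinking of this as a promise/future/yield problem. But I've been told that if I'm looking for a Promise in Erlang, I might be heading in the wrong direction. What is the best practice in Erlang for achieving this?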

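You can use stacked receive clauses. Erlang will wait forever in a receive clause until a matching message arrives from a process (or you can specify a timeout with after), which is similar to awaiting a promise in NodeJS: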

-module(my).
-compile(export_all).

all_results() -> 
    Pid1 = spawn(?MODULE, getdata1, [self(), {10, 20}]),
    Pid2 = spawn(?MODULE, getdata2, [self(), 30]),
    Pid3 = spawn(?MODULE, getdata3, [self()]),

    [receive {Pid1, Result1} -> Result1 end, 
     receive {Pid2, Result2} -> Result2 end,
     receive {Pid3, Result3} -> Result3 end].

getdata1(From, {X, Y}) -> 
    %% mimic the time it takes to retrieve the data:
    SleepTime = rand:uniform(100),
    io:format("Sleeping for ~w milliseconds~n", [SleepTime]), 
    timer:sleep(SleepTime),

    From ! {self(), X+Y}.  %% send the data back to the main process

getdata2(From, Z) ->
    SleepTime = rand:uniform(100),
    io:format("Sleeping for ~w milliseconds~n", [SleepTime]),
    timer:sleep(SleepTime),

    From ! {self(), Z+1}.

getdata3(From) ->
    SleepTime = rand:uniform(100),
    io:format("Sleeping for ~w milliseconds~n", [SleepTime]),
    timer:sleep(SleepTime),

    From ! {self(), 16}. 

Note that this code:

[receive {Pid1, Result1} -> Result1 end, 
 receive {Pid2, Result2} -> Result2 end,
 receive {Pid3, Result3} -> Result3 end].

is equivalent to:

R1 = receive {Pid1, Result1} -> 
         Result1 
     end,
R2 = receive {Pid2, Result2} -> 
         Result2 
     end,
R3 = receive {Pid3, Result3} -> 
         Result3 
     end,

[R1, R2, R3].
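
The same pattern can be generalized so that it does not need one hand-written receive per call. What follows is only a minimal sketch, not an established library API: the module name par and the function run_all/1 are names made up for illustration. It takes a list of zero-arity funs, runs each in its own process, and collects the results in input order. It tags replies with make_ref() rather than the worker pid, which is a small variation on the code above.

```erlang
-module(par).
-export([run_all/1]).

%% Hypothetical helper: run each zero-arity fun in its own process and
%% return the results in the same order as the input list.
run_all(Funs) ->
    Parent = self(),
    %% A unique reference per task tags its reply, so unrelated messages
    %% in the caller's mailbox cannot be mistaken for a result.
    Refs = [begin
                Ref = make_ref(),
                spawn(fun() -> Parent ! {Ref, F()} end),
                Ref
            end || F <- Funs],
    %% Selective receive: wait for each reply in list order. The total
    %% wait is roughly that of the slowest task, not the sum of all tasks.
    [receive {Ref, Result} -> Result end || Ref <- Refs].

%% Possible use inside handle_request from the question (the three
%% service calls are the ones named there):
%%
%%   [AuthResult, UserDataResult, MessageResult] =
%%       par:run_all([fun() -> auth_service:login(UserId, Token) end,
%%                    fun() -> userdata_service:get(UserId) end,
%%                    fun() -> message_service:get(UserId) end]),
%%   build_response(AuthResult, UserDataResult, MessageResult).
```

Like the stacked receives, this sketch waits forever if one of the spawned processes crashes before sending its reply, so it is best suited to calls that are known to return.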

In the shell:

~/erlang_programs$ erl
Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:10] [hipe] [kernel-poll:false]
Eshell V9.3  (abort with ^G)

1> c(my).                        
my.erl:2: Warning: export_all flag enabled - all functions will be exported
{ok,my}

2> timer:tc(my, all_results, []).
Sleeping for 66 milliseconds
Sleeping for 16 milliseconds
Sleeping for 93 milliseconds
{96356,[30,31,16]}

3> timer:tc(my, all_results, []).
Sleeping for 57 milliseconds
Sleeping for 30 milliseconds
Sleeping for 99 milliseconds
{100153,[30,31,16]}

4> timer:tc(my, all_results, []).
Sleeping for 66 milliseconds
Sleeping for 31 milliseconds
Sleeping for 24 milliseconds
{66426,[30,31,16]}

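timer:tc() returns the time the function took to execute, in microseconds (1,000 microseconds = 1 millisecond), together with the function's return value. For example, the first call to all_results() took 96.4 milliseconds to complete, whereas executing the three calls sequentially in a single process would take at least 66+16+93=175 milliseconds.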
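
One caveat with a bare receive is that it blocks forever if a worker crashes or a data source hangs. A hedged sketch of one way to guard against that is shown here, using spawn_monitor and the after clause mentioned earlier. The module name par_safe, the function run_all/2, and the result shapes {ok, Value}, {error, timeout}, and {error, {crashed, Reason}} are all assumptions chosen for illustration. Each worker hands its result back through its exit reason, a design choice that lets a single 'DOWN' message carry either the value or the crash reason.

```erlang
-module(par_safe).
-export([run_all/2]).

%% Hypothetical helper: run each zero-arity fun in its own monitored
%% process. Timeout is an overall deadline in milliseconds shared by
%% all tasks. Results come back in the same order as the input list.
run_all(Funs, Timeout) ->
    Deadline = erlang:monotonic_time(millisecond) + Timeout,
    %% The worker exits with {result, Value}; the monitor delivers that
    %% exit reason to the caller in a 'DOWN' message.
    Tasks = [spawn_monitor(fun() -> exit({result, F()}) end) || F <- Funs],
    [collect(Task, Deadline) || Task <- Tasks].

collect({Pid, MonRef}, Deadline) ->
    %% Time left until the shared deadline, never negative.
    Remaining = max(0, Deadline - erlang:monotonic_time(millisecond)),
    receive
        {'DOWN', MonRef, process, Pid, {result, Value}} ->
            {ok, Value};
        {'DOWN', MonRef, process, Pid, Reason} ->
            {error, {crashed, Reason}}
    after Remaining ->
        %% Stop the straggler, then drop the monitor and flush any
        %% 'DOWN' message that arrived in the meantime.
        exit(Pid, kill),
        erlang:demonitor(MonRef, [flush]),
        {error, timeout}
    end.
```

For example, par_safe:run_all([fun() -> 1 + 2 end, fun() -> exit(boom) end], 100) would be expected to return [{ok,3},{error,{crashed,boom}}]. The caller can then decide whether a partial result is good enough to build a response or whether the whole request should fail.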