如何在 Erlang/OTP 中构建 MQ 消费者循环?
How to structure MQ consumer loop in Erlang/OTP?
我需要创建使用消息队列并使用 Erlang/OTP 异步处理消息的简单应用程序。考虑 Golang 中的这个伪示例:
var queue chan
func main() {
for req := range queue {
go handleRequest(req) //handle asynchronously
}
}
如何根据 OTP 原则正确构建此结构?
我一直在寻找 gen_server,但在这种情况下,我在哪里定义我的循环回归?
另外,如何启动异步句柄?我应该创建另一个主管并在每条新消息上使用 supervisor:start_child 吗?
标准库中的gen_server模块为您定义了递归循环。您唯一需要做的就是实现回调函数来处理消息。如果消息队列正在向您的 gen_server 进程发送 Erlang 消息,您将执行如下操作:
handle_info({incoming_request, Request}, _From, State) ->
async_handle_request(Request),
{noreply, State}.
为了异步处理请求,async_handle_request
将为每个传入请求启动一个进程。有两种方法可以做到这一点:要么只是生成一个进程,要么在 simple_one_for_one
主管下启动每个进程。差异归结为错误处理和关闭行为。如果处理请求失败怎么办?您是忽略错误,还是让它传播到 gen_server 进程,还是让主管重新启动进程并重试请求?
This question 说明您何时可以使用 simple_one_for_one 主管。如果你只想生成一个进程,它看起来像这样:
async_handle_request(Request) ->
spawn_link(fun() -> handle_request(Request) end).
然后在handle_request
.
中实现了实际的请求处理逻辑
我需要创建使用消息队列并使用 Erlang/OTP 异步处理消息的简单应用程序。考虑 Golang 中的这个伪示例:
var queue chan
func main() {
for req := range queue {
go handleRequest(req) //handle asynchronously
}
}
如何根据 OTP 原则正确构建此结构?
我一直在寻找 gen_server,但在这种情况下,我在哪里定义我的循环回归?
另外,如何启动异步句柄?我应该创建另一个主管并在每条新消息上使用 supervisor:start_child 吗?
标准库中的gen_server模块为您定义了递归循环。您唯一需要做的就是实现回调函数来处理消息。如果消息队列正在向您的 gen_server 进程发送 Erlang 消息,您将执行如下操作:
handle_info({incoming_request, Request}, _From, State) ->
async_handle_request(Request),
{noreply, State}.
为了异步处理请求,async_handle_request
将为每个传入请求启动一个进程。有两种方法可以做到这一点:要么只是生成一个进程,要么在 simple_one_for_one
主管下启动每个进程。差异归结为错误处理和关闭行为。如果处理请求失败怎么办?您是忽略错误,还是让它传播到 gen_server 进程,还是让主管重新启动进程并重试请求?
This question 说明您何时可以使用 simple_one_for_one 主管。如果你只想生成一个进程,它看起来像这样:
async_handle_request(Request) ->
spawn_link(fun() -> handle_request(Request) end).
然后在handle_request
.