Actor Systems 如何防止队列内存溢出,同时防止线程在写入队列时阻塞?
How do Actor Systems function to prevent memory overflow from queues but also prevent threads blocking on writing on the queues?
演员们互相发送信息。如果队列有限,那么 write/send 尝试填满队列时会发生什么情况?阻塞还是丢弃?如果它们不受限制,则可能会导致内存崩溃。多少是可配置的?
您可以轻松测试 Erlang VM 在这种情况下的行为。在 shell:
F = fun F() -> receive done -> ok end end,
P = spawn(F),
G = fun G(Pid,Size,Wait) -> Pid ! lists:seq(1,Size), receive done -> ok after Wait -> G(Pid,Size,Wait) end end,
H = fun(Pid,Size,Wait) -> T = fun() -> G(Pid,Size,Wait) end, spawn(T) end,
D = fun D() -> io:format("~p~n~p~n",[erlang:time(),erlang:memory(processes_used)]), receive done -> ok after 10000 -> D() end end,
P1 = spawn(D).
P2 = H(P,100000,5).
您会看到内存分配异常,VM 写入核心转储并崩溃。
我没有查看如何修改限制,如果你尝试一下,你会看到它需要达到非常高的邮件数量,使用邮箱中的数十GB内存。
如果你遇到这种情况,我想第一反应应该不是加大尺寸,你应该先寻找
- 未读消息,
- 进程瓶颈
- 应用架构
- Erlang 是否适合您的问题
- ...
erlang 中的 actor 队列没有限制,这受限于 VM 的内存大小,如果 VM 中的内存大小已满,VM 会崩溃。对于监视器或管理内存分配和 cpu 加载,您可以在 Erlang
中使用 os_mon
你可以在 erlang 中测试 shell
F = fun() -> timer:sleep(60000),
{message_queue_len, InboxLen} = erlang:process_info(self(), message_queue_len),
io:format("Len ===> ~p", [InboxLen])
end.
PID = erlang:spawn(F).
[PID ! "hi" || _ <- lists:seq(1, 50000)].
如果您增加消息的数量,您可能会溢出内存
Akka 中的默认邮箱不受限制,因此不会防止内存崩溃。但是,您可以将参与者配置为使用不同的邮箱,其中既有在达到最大大小时丢弃(传递给死信)消息的邮箱,也有阻塞的邮箱(我不建议使用这些邮箱)。您可以在此处的文档中找到 Akka 附带的所有邮箱实现:https://doc.akka.io/docs/akka/current/typed/mailboxes.html#mailbox-implementations
Akka 中的默认邮箱不受限制。但是如果你想限制邮箱中的最大消息数,你可以在actor中构建一个Akka流,然后可以按需使用OverflowStrategy。
例如:
val source: Source[Message, SourceQueueWithComplete[Message]] =
Source.queue[Message](bufferSize = 8192,
overflowStrategy = OverflowStrategy.dropNew)
演员们互相发送信息。如果队列有限,那么 write/send 尝试填满队列时会发生什么情况?阻塞还是丢弃?如果它们不受限制,则可能会导致内存崩溃。多少是可配置的?
您可以轻松测试 Erlang VM 在这种情况下的行为。在 shell:
F = fun F() -> receive done -> ok end end,
P = spawn(F),
G = fun G(Pid,Size,Wait) -> Pid ! lists:seq(1,Size), receive done -> ok after Wait -> G(Pid,Size,Wait) end end,
H = fun(Pid,Size,Wait) -> T = fun() -> G(Pid,Size,Wait) end, spawn(T) end,
D = fun D() -> io:format("~p~n~p~n",[erlang:time(),erlang:memory(processes_used)]), receive done -> ok after 10000 -> D() end end,
P1 = spawn(D).
P2 = H(P,100000,5).
您会看到内存分配异常,VM 写入核心转储并崩溃。
我没有查看如何修改限制,如果你尝试一下,你会看到它需要达到非常高的邮件数量,使用邮箱中的数十GB内存。
如果你遇到这种情况,我想第一反应应该不是加大尺寸,你应该先寻找
- 未读消息,
- 进程瓶颈
- 应用架构
- Erlang 是否适合您的问题
- ...
erlang 中的 actor 队列没有限制,这受限于 VM 的内存大小,如果 VM 中的内存大小已满,VM 会崩溃。对于监视器或管理内存分配和 cpu 加载,您可以在 Erlang
中使用 os_mon你可以在 erlang 中测试 shell
F = fun() -> timer:sleep(60000),
{message_queue_len, InboxLen} = erlang:process_info(self(), message_queue_len),
io:format("Len ===> ~p", [InboxLen])
end.
PID = erlang:spawn(F).
[PID ! "hi" || _ <- lists:seq(1, 50000)].
如果您增加消息的数量,您可能会溢出内存
Akka 中的默认邮箱不受限制,因此不会防止内存崩溃。但是,您可以将参与者配置为使用不同的邮箱,其中既有在达到最大大小时丢弃(传递给死信)消息的邮箱,也有阻塞的邮箱(我不建议使用这些邮箱)。您可以在此处的文档中找到 Akka 附带的所有邮箱实现:https://doc.akka.io/docs/akka/current/typed/mailboxes.html#mailbox-implementations
Akka 中的默认邮箱不受限制。但是如果你想限制邮箱中的最大消息数,你可以在actor中构建一个Akka流,然后可以按需使用OverflowStrategy。
例如:
val source: Source[Message, SourceQueueWithComplete[Message]] =
Source.queue[Message](bufferSize = 8192,
overflowStrategy = OverflowStrategy.dropNew)