Erlang:从共享队列中推送和拉取
Erlang: push and pull from a shared queue
我需要维护一个共享队列,我可以在其中推送数据,如果队列不为空,一个单独的线程将定期检查并从队列中提取数据。我想出了下面的解决方案,我可以在其中将数据发送到流程并将其加起来到一个列表中。但是,是否有更清洁/更简单的解决方案来做到这一点?
我不确定如何从下面的代码中提取数据。
-module(abc).
-export(queue/0).
queue() ->
receive
{push, Xmpp} ->
io:format("Push"),
queue(Xmpp);
{pull} ->
io:format("pull"),
queue()
end.
queue(E) ->
receive
{push, Xmpp} ->
io:format("Push ~w",[E]),
E1 = lists:append([E],[Xmpp]),
queue(E1);
{reset} ->
queue([])
end.
代码可能不会完全按照您的要求执行。当您从 receive
块(第 7 行中的 queue(Xmpp);
)调用 queue/1
时,queue/1
将触发,然后等待消息。因为这不是在单独的进程中产生的,所以 queue/0
将阻塞(因为 queue/1
现在正在等待一条从未发送过的消息)。
此外,queue/0
永远不会向发送消息的进程发送回任何内容。它无法将数据返回给发送方。
以下将起作用(您需要向 queue/0
返回的 pid 发送消息)。
-module(abc).
-export([queue/0,queue/1]).
queue() ->
%% initialize an empty queue,
%% return the Pid to the caller
spawn(abc,queue,[[]]).
queue(E) when is_list(E) ->
receive
%% append the message value to the existing list
{push, Xmpp} ->
io:format("Pushing ~w to ~w~n",[Xmpp,E]),
E1 = lists:append(E,[Xmpp]),
queue(E1);
%% reset the queue
{reset} ->
queue([]);
%% return the value to the caller
%% "Pid" must be a Pid
{pull, Pid} when is_pid(Pid) ->
io:format("pull~n"),
Pid ! E,
queue(E)
end.
这是一个在 Erlang 中有直接解决方案的问题。大多数时候,您将要编程的每个 erlang 模块都像一个服务器,它会接收消息并响应,您可以有 0 个、1 个或多个服务器 运行 相同的 erlang 模块代码。同时,您将在客户端的同一模块中进行编程,这是一种向服务器发送消息的简单方法,而无需知道服务器期望的所有消息格式,而是使用函数,而不是做类似的事情
Server ! {put, Value},
receive
{Server, {ok, Value}} ->
everything_ok;
{Server, {error, Reason}} ->
handle_error
end,
你结束了像
这样的事情
my_module:put(Server, Value).
因此您可以使用代码在 erlang 中创建一个服务器进程:
-module(abc).
-export([start/0, put/2, pop/1, reset/1, is_empty/1, loop/1]).
%% Client
start() ->
spawn(?MODULE, loop, [[]]).
put(Pid, Item) ->
Pid ! {self(), {put, Item}},
receive
{Pid, {ok, Item}} ->
Item;
{Pid, {error, Reason}} ->
{error, Reason}
after 500 ->
timeout
end.
pop(Pid) ->
Pid ! {self(), {pop}},
receive
{Pid, {ok, Item}} ->
Item;
{Pid, {error, Reason}} ->
{error, Reason}
after 500 ->
timeout
end.
reset(Pid) ->
Pid ! {self(), {reset}},
receive
{Pid, {ok}} ->
ok;
_ ->
error
after 500 ->
timeout
end.
is_empty(Pid) ->
Pid ! {self(), {is_empty}},
receive
{Pid, {true}} ->
true;
{Pid, {false}} ->
false;
_ ->
error
after 500 ->
timeout
end.
%% Server
loop(Queue) ->
receive
{From, {put, Item}} when is_pid(From) ->
From ! {self(), {ok, Item}},
loop(Queue ++ [Item]);
{From, {pop}} when is_pid(From) ->
case Queue of
[] ->
From ! {self(), {error, empty}},
loop(Queue);
[H|T] ->
From ! {self(), {ok, H}},
loop(T)
end;
{From, {reset}} when is_pid(From) ->
From ! {self(), {ok}},
loop([]);
{From, {is_empty}} when is_pid(From) ->
case Queue of
[] ->
From ! {self(), {true}},
loop(Queue);
_ ->
From ! {self(), {false}},
loop(Queue)
end;
_ ->
loop(Queue)
end.
您最终使用的代码也很简单:
(emacs@rorra)1> c("/Users/rorra/abc", [{outdir, "/Users/rorra/"}]).
{ok,abc}
(emacs@rorra)2> Q = abc:start().
<0.44.0>
(emacs@rorra)3> abc:is_empty(Q).
true
(emacs@rorra)4> abc:pop(Q).
{error,empty}
(emacs@rorra)5> abc:put(Q, 23).
23
(emacs@rorra)6> abc:is_empty(Q).
false
(emacs@rorra)7> abc:pop(Q).
23
(emacs@rorra)8> abc:pop(Q).
{error,empty}
(emacs@rorra)9> abc:put(Q, 23).
23
(emacs@rorra)10> abc:put(Q, 50).
50
(emacs@rorra)11> abc:reset(Q).
ok
(emacs@rorra)12> abc:is_empty(Q).
true
最后,为了避免所有重复代码,您结束了使用 OTP 并为其编写 gen_server。
我假设你正在自己构建一个队列来学习,否则 Erlang 已经有一个很好的队列实现:
http://www.erlang.org/doc/man/queue.html
以及源代码:
https://github.com/erlang/otp/blob/master/lib/stdlib/src/queue.erl
我需要维护一个共享队列,我可以在其中推送数据,如果队列不为空,一个单独的线程将定期检查并从队列中提取数据。我想出了下面的解决方案,我可以在其中将数据发送到流程并将其加起来到一个列表中。但是,是否有更清洁/更简单的解决方案来做到这一点?
我不确定如何从下面的代码中提取数据。
-module(abc).
-export(queue/0).
queue() ->
receive
{push, Xmpp} ->
io:format("Push"),
queue(Xmpp);
{pull} ->
io:format("pull"),
queue()
end.
queue(E) ->
receive
{push, Xmpp} ->
io:format("Push ~w",[E]),
E1 = lists:append([E],[Xmpp]),
queue(E1);
{reset} ->
queue([])
end.
代码可能不会完全按照您的要求执行。当您从 receive
块(第 7 行中的 queue(Xmpp);
)调用 queue/1
时,queue/1
将触发,然后等待消息。因为这不是在单独的进程中产生的,所以 queue/0
将阻塞(因为 queue/1
现在正在等待一条从未发送过的消息)。
此外,queue/0
永远不会向发送消息的进程发送回任何内容。它无法将数据返回给发送方。
以下将起作用(您需要向 queue/0
返回的 pid 发送消息)。
-module(abc).
-export([queue/0,queue/1]).
queue() ->
%% initialize an empty queue,
%% return the Pid to the caller
spawn(abc,queue,[[]]).
queue(E) when is_list(E) ->
receive
%% append the message value to the existing list
{push, Xmpp} ->
io:format("Pushing ~w to ~w~n",[Xmpp,E]),
E1 = lists:append(E,[Xmpp]),
queue(E1);
%% reset the queue
{reset} ->
queue([]);
%% return the value to the caller
%% "Pid" must be a Pid
{pull, Pid} when is_pid(Pid) ->
io:format("pull~n"),
Pid ! E,
queue(E)
end.
这是一个在 Erlang 中有直接解决方案的问题。大多数时候,您将要编程的每个 erlang 模块都像一个服务器,它会接收消息并响应,您可以有 0 个、1 个或多个服务器 运行 相同的 erlang 模块代码。同时,您将在客户端的同一模块中进行编程,这是一种向服务器发送消息的简单方法,而无需知道服务器期望的所有消息格式,而是使用函数,而不是做类似的事情
Server ! {put, Value},
receive
{Server, {ok, Value}} ->
everything_ok;
{Server, {error, Reason}} ->
handle_error
end,
你结束了像
这样的事情my_module:put(Server, Value).
因此您可以使用代码在 erlang 中创建一个服务器进程:
-module(abc).
-export([start/0, put/2, pop/1, reset/1, is_empty/1, loop/1]).
%% Client
start() ->
spawn(?MODULE, loop, [[]]).
put(Pid, Item) ->
Pid ! {self(), {put, Item}},
receive
{Pid, {ok, Item}} ->
Item;
{Pid, {error, Reason}} ->
{error, Reason}
after 500 ->
timeout
end.
pop(Pid) ->
Pid ! {self(), {pop}},
receive
{Pid, {ok, Item}} ->
Item;
{Pid, {error, Reason}} ->
{error, Reason}
after 500 ->
timeout
end.
reset(Pid) ->
Pid ! {self(), {reset}},
receive
{Pid, {ok}} ->
ok;
_ ->
error
after 500 ->
timeout
end.
is_empty(Pid) ->
Pid ! {self(), {is_empty}},
receive
{Pid, {true}} ->
true;
{Pid, {false}} ->
false;
_ ->
error
after 500 ->
timeout
end.
%% Server
loop(Queue) ->
receive
{From, {put, Item}} when is_pid(From) ->
From ! {self(), {ok, Item}},
loop(Queue ++ [Item]);
{From, {pop}} when is_pid(From) ->
case Queue of
[] ->
From ! {self(), {error, empty}},
loop(Queue);
[H|T] ->
From ! {self(), {ok, H}},
loop(T)
end;
{From, {reset}} when is_pid(From) ->
From ! {self(), {ok}},
loop([]);
{From, {is_empty}} when is_pid(From) ->
case Queue of
[] ->
From ! {self(), {true}},
loop(Queue);
_ ->
From ! {self(), {false}},
loop(Queue)
end;
_ ->
loop(Queue)
end.
您最终使用的代码也很简单:
(emacs@rorra)1> c("/Users/rorra/abc", [{outdir, "/Users/rorra/"}]).
{ok,abc}
(emacs@rorra)2> Q = abc:start().
<0.44.0>
(emacs@rorra)3> abc:is_empty(Q).
true
(emacs@rorra)4> abc:pop(Q).
{error,empty}
(emacs@rorra)5> abc:put(Q, 23).
23
(emacs@rorra)6> abc:is_empty(Q).
false
(emacs@rorra)7> abc:pop(Q).
23
(emacs@rorra)8> abc:pop(Q).
{error,empty}
(emacs@rorra)9> abc:put(Q, 23).
23
(emacs@rorra)10> abc:put(Q, 50).
50
(emacs@rorra)11> abc:reset(Q).
ok
(emacs@rorra)12> abc:is_empty(Q).
true
最后,为了避免所有重复代码,您结束了使用 OTP 并为其编写 gen_server。
我假设你正在自己构建一个队列来学习,否则 Erlang 已经有一个很好的队列实现:
http://www.erlang.org/doc/man/queue.html
以及源代码:
https://github.com/erlang/otp/blob/master/lib/stdlib/src/queue.erl