Erlang TCP 接受模式

Erlang Tcp Accepter Pattern

考虑以下(基于 LYSE 的 sockserv)

%%% The supervisor in charge of all the socket acceptors.
-module(tcpsocket_sup).
-behaviour(supervisor).

-export([start_link/0, start_socket/0]).
-export([init/1]).

start_link() ->
  supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
  {ok, Port} = application:get_env(my_app,tcpPort),
  {ok, ListenSocket} = gen_tcp:listen(
    Port,
    [binary, {packet, 0}, {reuseaddr, true}, {active, true} ]),
  lager:info(io_lib:format("Listening for TCP on port ~p", [Port])),
  spawn_link(fun empty_listeners/0),
  {ok, {{simple_one_for_one, 60, 3600},
    [{socket,
      {tcpserver, start_link, [ListenSocket]},
      temporary, 1000, worker, [tcpserver]}
    ]}}.

start_socket() ->
  supervisor:start_child(?MODULE, []).%,

empty_listeners() ->
  [start_socket() || _ <- lists:seq(1,20)],
  ok.

%%%-------------------------------------------------------------------
%%% @author mylesmcdonnell
%%% @copyright (C) 2015, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 06. Feb 2015 07:49
%%%-------------------------------------------------------------------
-module(tcpserver).
-author("mylesmcdonnell").

-behaviour(gen_server).

-record(state, {
    next,
    socket}).

-export([start_link/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]).

-define(SOCK(Msg), {tcp, _Port, Msg}).
-define(TIME, 800).
-define(EXP, 50).

start_link(Socket) ->
  gen_server:start_link(?MODULE, Socket, []).

init(Socket) ->
  gen_server:cast(self(), accept),
  {ok, #state{socket=Socket}}.

handle_call(_E, _From, State) ->
  {noreply, State}.

handle_cast(accept, S = #state{socket=ListenSocket}) ->
  {ok, AcceptSocket} = gen_tcp:accept(ListenSocket),
  kvstore_tcpsocket_sup:start_socket(),
  receive
    {tcp, Socket, <<"store",Value/binary>>} ->
      Uid = kvstore:store(Value),
      send(Socket,Uid);
    {tcp, Socket, <<"retrieve",Key/binary>>} ->
      case kvstore:retrieve(binary_to_list(Key)) of
        [{_, Value}|_] ->
          send(Socket,Value);
        _ ->
          send(Socket,<<>>)
      end;
    {tcp, Socket, _} ->
      send(Socket, "INVALID_MSG")
  end,
  {noreply, S#state{socket=AcceptSocket, next=name}}.

handle_info(_, S) ->
  {noreply, S}.

code_change(_OldVsn, State, _Extra) ->
  {ok, State}.

terminate(normal, _State) ->
  ok;
terminate(_Reason, _State) ->
  lager:info("terminate reason: ~p~n", [_Reason]).

send(Socket, Bin) ->
  ok = gen_tcp:send(Socket, Bin),
  ok = gen_tcp:close(Socket),
  ok.

我不清楚每个 tcpserver 进程是如何终止的?这是泄漏过程吗?

我没有看到任何你正在终止拥有进程的地方。

我想你要找的是四种情况:

  • 客户端终止连接(你收到tcp_closed
  • 连接不稳定(您收到 tcp_error
  • 服务器收到一条系统消息终止(当然,这可能只是主管杀死它,或者一条终止消息)
  • 客户端发送一条消息,告诉服务器它已完成,您想要做一些清理工作,而不仅仅是对 tcp_closed 作出反应。

最常见的情况通常是客户端只是关闭连接,为此您需要类似的东西:

handle_info({tcp_closed, _}, State) ->
    {stop, normal, State};

连接变得奇怪总是有可能的。我想不出任何时候我想要拥有自己的进程或套接字,所以:

%% You might want to log something here.
handle_info({tcp_error, _}, State) ->
    {stop, normal, State};

在任何情况下,如果客户端告诉服务器它已完成,并且您需要根据客户端已成功完成某些操作进行清理(也许您打开了应该首先写入的资源,或者打开了待处理的数据库事务,或其他)你会希望从客户端收到一条成功消息,该消息会像 send/2 那样关闭连接,并且 returns {stop, normal, State} 会停止该过程。

这里的关键是确保您确定要结束连接的情况,或者终止服务器进程或(更好)return {stop, Reason, State}.

如上所述,如果您打算 send/2 是一个单一的响应和一个干净的退出(或者实际上,每个 accept 转换应该导致一个单一的 send/2 然后终止),那么你想要:

handle_cast(accept, S = #state{socket=ListenSocket}) ->
  {ok, AcceptSocket} = gen_tcp:accept(ListenSocket),
  kvstore_tcpsocket_sup:start_socket(),
  receive
    %% stuff that results in a call to send/2 in any case.
  end,
  {stop, normal, S}.

LYSE 演示的案例是连接持续存在并且客户端和服务器之间持续来回的案例。在上面的例子中,你正在处理一个请求,产生一个新的监听器来重新填充监听器池,并且应该退出,因为你没有计划 gen_server 做任何进一步的工作。