Erlang 服务器与端口连接以向 Java 应用程序发送和接收 Json 文件

Erlang server connecting with ports to send and receive a Json file to a Java application

我尝试用 Erlang 为我的 Java 应用程序实现一个服务器。 似乎我的服务器正在运行,但仍然充满错误和死点。 我需要接收一个由 Java 应用程序解析成地图的 JSON 文件,并将其发送回所有客户端,包括上传该文件的客户端。 同时,我需要跟踪谁发出了请求以及发送了消息的哪一部分,以防出现任何问题,客户端应该从这里重新启动,而不是从头开始。除非客户端离开应用程序,否则它应该重新启动。

我的三段代码如下:

app.erl

-module(erlServer_app).

-behaviour(application).

%% Application callbacks
-export([start/2, stop/1]).

%%%===================================================================
%%% Application callbacks
%%%===================================================================

start(_StartType, _StartArgs) ->
    erlServer_sup:start_link().


stop(_State) ->
    ok.

supervisor.erl:

-module(erlServer_sup).

-behaviour(supervisor).

%% API
-export([start_link/0]).

%% Supervisor callbacks
-export([init/1, start_socket/0, terminate_socket/0, empty_listeners/0]).

-define(SERVER, ?MODULE).

%%--------------------------------------------------------------------
%% @doc
%% Starts the supervisor
%%
%% @end
%%--------------------------------------------------------------------
start_link() ->
  supervisor:start_link({local, ?SERVER}, ?MODULE, []).

%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================

init([]) -> % restart strategy 'one_for_one': if one goes down only that one is restarted
  io:format("starting...~n"),
  spawn_link(fun() -> empty_listeners() end),
  {ok,
    {{one_for_one, 5, 30}, % The flag - 5 restart within 30 seconds
      [{erlServer_server, {erlServer_server, init, []}, permanent, 1000, worker, [erlServer_server]}]}}.


%%%===================================================================
%%% Internal functions
%%%===================================================================

start_socket() ->
  supervisor:start_child(?MODULE, []).

terminate_socket() ->
  supervisor:delete_child(?MODULE, []).

empty_listeners() ->
  [start_socket() || _ <- lists:seq(1,20)],
  ok.

server.erl:(我有很多调试io:format。)

-module(erlServer_server).

%% API
-export([init/0, start_server/0]).

%% Defining the port used.
-define(PORT, 8080).

%%%===================================================================
%%% API
%%%===================================================================

init() ->
  start_server().

%%%===================================================================
%%% Server callbacks
%%%===================================================================

start_server() ->
  io:format("Server started.~n"),
  Pid = spawn_link(fun() ->
    {ok, ServerSocket} = gen_tcp:listen(?PORT, [binary, {packet, 0}, 
      {reuseaddr, true}, {active, true}]),
    io:format("Baba~p", [ServerSocket]),
    server_loop(ServerSocket) end),
  {ok, Pid}.

server_loop(ServerSocket) ->
  io:format("Oba~p", [ServerSocket]),
  {ok, Socket} = gen_tcp:accept(ServerSocket),
  Pid1 = spawn(fun() -> client() end),
  inet:setopts(Socket, [{packet, 0}, binary, 
    {nodelay, true}, {active, true}]),
  gen_tcp:controlling_process(Socket, Pid1), 
  server_loop(ServerSocket).

%%%===================================================================
%%% Internal functions
%%%===================================================================

client() ->
  io:format("Starting client. Enter \'quit\' to exit.~n"),
  Client = self(),
  {ok, Sock} = gen_tcp:connect("localhost", ?PORT, [{active, false}, {packet, 2}]),
  display_prompt(Client),
  client_loop(Client, Sock).

%%%===================================================================
%%% Client callbacks
%%%===================================================================

send(Sock, Packet) ->
  {ok, Sock, Packet} = gen_tcp:send(Sock, Packet),
  io:format("Sent ~n").

recv(Packet) ->
  {recv, ok, Packet} = gen_tcp:recv(Packet),
  io:format("Received ~n").

display_prompt(Client) ->
  spawn(fun () ->
    Packet = io:get_line("> "),
    Client ! {entered, Packet}
        end),
  Client ! {requested},
  ok.


client_loop(Client, Sock) ->
  receive
    {entered, "quit\n"} ->
      gen_tcp:close(Sock);
    {entered, Packet}        ->
      % When a packet is entered we receive it,
      recv(Packet),
      display_prompt(Client),
      client_loop(Client, Sock);
    {requested, Packet}        ->
      % When a packet is requested we send it,
      send(Sock, Packet),
      display_prompt(Client),
      client_loop(Client, Sock);
    {error, timeout} ->
      io:format("Send timeout, closing!~n", []),
      Client ! {self(),{error_sending, timeout}},
      gen_tcp:close(Sock);
    {error, OtherSendError} ->
      io:format("Some other error on socket (~p), closing", [OtherSendError]),
      Client ! {self(),{error_sending, OtherSendError}},
      gen_tcp:close(Sock)
  end.

这是我做的第一台服务器,我可能在中间迷路了。当我 运行 它似乎在工作,但挂起。有人能帮我吗?我的本地主机从不加载它永远加载的任何内容。

我的 java 应用如何从同一端口接收它?

我必须使用 Erlang,并且我必须使用端口连接到 java 应用程序。

谢谢你的帮助!

让我们稍微修改一下...

首先:命名

我们在 Erlang 中不使用驼峰命名法。这是令人困惑的,因为大写的变量名和 lower-case(或 single-quoted)原子 mean 不同的东西。此外,模块名称 必须 与导致 case-insensitive 文件系统出现问题的文件名相同。

另外,我们真的想要一个比"server"更好的名字。服务器在这样的系统中可能意味着很多事情,虽然整个系统可能是用 Erlang 编写的服务,但这并不一定意味着我们可以在 "server" 中调用所有内容而不会变得非常模糊!这令人困惑。我暂时将您的项目命名为 "ES"。所以你会有 es_appes_sup 等等。当我们想开始定义新模块时,这会派上用场,也许其中一些称为 "server" 而不必到处写 "server_server"。

第二个:输入数据

一般来说,我们希望将参数传递给函数,而不是将文字(或更糟糕的是,宏重写)埋入代码内部。如果我们要有幻数和常量,让我们尽力将它们放入配置文件中,这样我们就可以access them in a programmatic way,或者甚至更好,让我们在初始启动调用作为从属进程的参数,这样我们就可以通过修改主应用程序模块中的启动调用函数来修改系统的行为(一旦编写)。

-module(es).
-behaviour(application).

-export([listen/1, ignore/0]).
-export([start/0, start/1]).
-export([start/2, stop/1]).

listen(PortNum) ->
    es_client_man:listen(PortNum).

ignore() ->
    es_client_man:ignore().

start() ->
    ok = application:ensure_started(sasl),
    ok = application:start(?MODULE),
    io:format("Starting es...").


start(Port) ->
    ok = start(),
    ok = es_client_man:listen(Port),
    io:format("Startup complete, listening on ~w~n", [Port]).

start(normal, _Args) ->
    es_sup:start_link().

stop(_State) ->
    ok.

我在上面添加了一个 start/1 函数以及一个 start/0、一个 listen/1 和一个 ignore/0,稍后您将在 es_client_man.这些主要是围绕您可以更明确地调用的东西的便利包装,但可能不想一直输入。

此应用程序模块通过让应用程序主管为我们启动项目(通过调用 application:start/1)和 然后 下一行调用 erl_server_server 告诉它开始收听。在早期开发中,我发现这种方法比将自动启动埋在每个地方的每个组件更有用,后来它为我们提供了一种 非常 编写外部接口的简单方法,可以将各种组件打开和关闭。

啊,还有...我们将把它作为 for-real Erlang 应用程序启动,所以我们需要 ebin/ 中的应用程序文件(或者如果您使用 erlang.mk 或类似 src/ 中的 app.src 文件):

ebin/es.app 看起来像这样:

{application,es,
             [{description,"An Erlang Server example project"},
              {vsn,"0.1.0"},
              {applications,[stdlib,kernel,sasl]},
              {modules,[es,
                        es_sup,
                          es_clients,
                            es_client_sup,
                              es_client,
                            es_client_man]},
              {mod,{es,[]}}]}.

modules 下的列表实际上反映了监督树的布局,正如您将在下面看到的那样。

上面的 start/2 函数现在断言我们将仅以 normal 模式启动(这可能合适也可能不合适),并忽略启动参数(也可能合适也可能不合适).

第三名:The Supervision Tree

-module(es_sup).
-behaviour(supervisor).

-export([start_link/0]).
-export([init/1]).

start_link() ->
  supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    RestartStrategy = {one_for_one, 1, 60},
    Clients   = {es_clients,
                 {es_clients, start_link, []},
                 permanent,
                 5000,
                 supervisor,
                 [es_clients]},
    Children  = [Clients],
    {ok, {RestartStrategy, Children}}.

然后...

-module(es_clients).
-behavior(supervisor).

-export([start_link/0]).
-export([init/1]).

start_link() ->
  supervisor:start_link({local, ?MODULE}, ?MODULE, none).

init(none) ->
    RestartStrategy = {rest_for_one, 1, 60},
    ClientSup = {es_client_sup,
                 {es_client_sup, start_link, []},
                 permanent,
                 5000,
                 supervisor,
                 [es_client_sup]},
    ClientMan = {es_client_man,
                 {es_client_man, start_link, []},
                 permanent,
                 5000,
                 worker,
                 [es_client_man]},
    Children  = [ClientSup, ClientMan],
    {ok, {RestartStrategy, Children}}.

哇! 这是怎么回事?!?好吧,es_sup 是 supervisor,而不是 one-off 东西产生其他 one-off 个东西。 (误解 supervisors 是你核心问题的一部分。)

主管很无聊。 监督者应该很无聊。他们作为代码reader真正做的就是监督树结构在里面。他们在OTP结构方面为我们做的事情是极其重要的,但是他们并不要求我们写任何程序代码,只是声明它应该有什么children。我们在这里实现的是 service -> worker 结构。因此,我们有 top-level 负责您整个应用程序的主管 es_sup。在其下方,我们(目前)有一个名为 es_clients.

的服务组件

es_clients进程也是主管。这样做的原因是为客户端连接部分定义一个明确的方法,以不影响以后系统其余部分可能存在的任何正在进行的状态。 只是 接受来自客户端的连接是没有用的——肯定有一些状态在其他地方很重要,比如 long-running 连接到某个 Java 节点或其他什么。那将是一个 单独的 服务组件,可能称为 es_java_nodes 并且程序的那部分将以其自己的独立主管开始。这就是为什么它被称为 "supervision tree" 而不是 "supervision list".

所以回到客户...我们将有客户连接。这就是我们称它们为 "clients" 的原因,因为 从这个 Erlang 系统的角度来看 连接的东西是客户端,接受这些连接的进程 抽象了clients 所以我们可以将每个连接处理程序视为客户端本身——因为这正是它所代表的。如果我们稍后连接到上游服务,我们会希望调用 those 无论它们是什么抽象,以便我们在系统中的语义是理智的。

然后你可以用 "an es_client sent a message to an es_java_node to query [thingy]" 来思考,而不是像 "a server_server asked a java_server_client to server_server the service_server" 那样试图保持直截了当(这实际上是愚蠢的事情如果您不按照 inner-system 的观点来保持您的命名原则。

哇哇哇哇...

所以,这里是 es_client_sup:

-module(es_client_sup).
-behaviour(supervisor).

-export([start_acceptor/1]).
-export([start_link/0]).
-export([init/1]).

start_acceptor(ListenSocket) ->
    supervisor:start_child(?MODULE, [ListenSocket]).

start_link() ->
  supervisor:start_link({local, ?MODULE}, ?MODULE, none).

init(none) ->
    RestartStrategy = {simple_one_for_one, 1, 60},
    Client    = {es_client,
                 {es_client, start_link, []},
                 temporary,
                 brutal_kill,
                 worker,
                 [es_client]},
    {ok, {RestartStrategy, [Client]}}.

你在挑选图案吗?当我说 "supervisors should be boring..." 时,我不是在开玩笑 :-) 请注意,这里我们实际上是在 中传递一个参数,我们已经定义了一个接口函数。如果我们需要一个套接字接受器来启动,那么我们就有一个合乎逻辑的地方可以调用。

第四:客户端服务本身

再看看客户经理:

-module(es_client_man).
-behavior(gen_server).

-export([listen/1, ignore/0]).
-export([start_link/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         code_change/3, terminate/2]).

-record(s, {port_num = none :: none | inet:port_number(),
            listener = none :: none | gen_tcp:socket()}).

listen(PortNum) ->
    gen_server:call(?MODULE, {listen, PortNum}).

ignore() ->
    gen_server:cast(?MODULE, ignore).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, none, []).

init(none) ->
    ok = io:format("Starting.~n"),
    State = #s{},
    {ok, State}.

handle_call({listen, PortNum}, _, State) ->
    {Response, NewState} = do_listen(PortNum, State),
    {reply, Response, NewState};
handle_call(Unexpected, From, State) ->
    ok = io:format("~p Unexpected call from ~tp: ~tp~n", [self(), From, Unexpected]),
    {noreply, State}.

handle_cast(ignore, State) ->
    NewState = do_ignore(State),
    {noreply, NewState};
handle_cast(Unexpected, State) ->
    ok = io:format("~p Unexpected cast: ~tp~n", [self(), Unexpected]),
    {noreply, State}.

handle_info(Unexpected, State) ->
    ok = io:format("~p Unexpected info: ~tp~n", [self(), Unexpected]),
    {noreply, State}.

code_change(_, State, _) ->
    {ok, State}.

terminate(_, _) ->
    ok.

do_listen(PortNum, State = #s{port_num = none}) ->
    SocketOptions =
        [{active,    once},
         {mode,      binary},
         {keepalive, true},
         {reuseaddr, true}],
    {ok, Listener} = gen_tcp:listen(PortNum, SocketOptions),
    {ok, _} = es_client:start(Listener),
    {ok, State#s{port_num = PortNum, listener = Listener}};
do_listen(_, State = #s{port_num = PortNum}) ->
    ok = io:format("~p Already listening on ~p~n", [self(), PortNum]),
    {{error, {listening, PortNum}}, State}.

do_ignore(State = #s{listener = none}) ->
    State;
do_ignore(State = #s{listener = Listener}) ->
    ok = gen_tcp:close(Listener),
    State#s{listener = none}.

嗯,这是怎么回事?这里的基本思想是,我们有一个服务主管来管理整个客户端概念(es_clients,如上所述),然后我们有 simple_one_for_one 来处理刚好处于活动状态的任何客户端( es_client_sup),这里我们有子系统的管理接口。这个管理器所做的就是跟踪我们正在侦听的端口,并且 拥有我们正在侦听的套接字 (如果此时某个端口处于打开状态)。请注意,这可以很容易地重写,以允许同时侦听任意数量的端口,或跟踪所有活着的客户端,或其他任何东西。您想做的事情真的没有限制。

那么我们如何启动可以接受连接的客户端呢?通过告诉他们生成并监听我们作为参数传入的侦听套接字。再看看上面的es_client_sup。我们传入一个空列表作为它的第一个参数。当我们调用它的 start_link 函数时会发生什么,无论我们作为列表传入的 else 都会被添加到整个参数列表中。在这种情况下,我们将传入监听套接字,因此它将以参数 [ListenSocket].

启动

每当客户端侦听器接受连接时,其下一步将是生成其后继者,将原始ListenSocket参数交给它。啊,生命的奇迹。

-module(es_client).

-export([start/1]).
-export([start_link/1, init/2]).
-export([system_continue/3, system_terminate/4,
         system_get_state/1, system_replace_state/2]).

-record(s, {socket = none :: none | gen_tcp:socket()}).

start(ListenSocket) ->
    es_client_sup:start_acceptor(ListenSocket).

start_link(ListenSocket) ->
    proc_lib:start_link(?MODULE, init, [self(), ListenSocket]).

init(Parent, ListenSocket) ->
    ok = io:format("~p Listening.~n", [self()]),
    Debug = sys:debug_options([]),
    ok = proc_lib:init_ack(Parent, {ok, self()}),
    listen(Parent, Debug, ListenSocket).

listen(Parent, Debug, ListenSocket) ->
    case gen_tcp:accept(ListenSocket) of
        {ok, Socket} ->
            {ok, _} = start(ListenSocket),
            {ok, Peer} = inet:peername(Socket),
            ok = io:format("~p Connection accepted from: ~p~n", [self(), Peer]),
            State = #s{socket = Socket},
            loop(Parent, Debug, State);
        {error, closed} ->
            ok = io:format("~p Retiring: Listen socket closed.~n", [self()]),
            exit(normal)
     end.

loop(Parent, Debug, State = #s{socket = Socket}) ->
    ok = inet:setopts(Socket, [{active, once}]),
    receive
        {tcp, Socket, <<"bye\r\n">>} ->
            ok = io:format("~p Client saying goodbye. Bye!~n", [self()]),
            ok = gen_tcp:send(Socket, "Bye!\r\n"),
            ok = gen_tcp:shutdown(Socket, read_write),
            exit(normal);
        {tcp, Socket, Message} ->
            ok = io:format("~p received: ~tp~n", [self(), Message]),
            ok = gen_tcp:send(Socket, ["You sent: ", Message]),
            loop(Parent, Debug, State);
        {tcp_closed, Socket} ->
            ok = io:format("~p Socket closed, retiring.~n", [self()]),
            exit(normal);
        {system, From, Request} ->
            sys:handle_system_msg(Request, From, Parent, ?MODULE, Debug, State);
        Unexpected ->
            ok = io:format("~p Unexpected message: ~tp", [self(), Unexpected]),
            loop(Parent, Debug, State)
    end.

system_continue(Parent, Debug, State) ->
    loop(Parent, Debug, State).

system_terminate(Reason, _Parent, _Debug, _State) ->
    exit(Reason).

system_get_state(Misc) -> {ok, Misc}.

system_replace_state(StateFun, Misc) ->
    {ok, StateFun(Misc), Misc}.

请注意,上面我们编写了一个 纯 Erlang 进程,它以 gen_server 的方式与 OTP 集成,但有一个更直接的循环,只处理插座。这意味着我们没有 gen_server call/cast 机制(并且可能需要自己实现这些机制,但通常 asynch-only 是套接字处理的更好方法)。这是专门为 bootstrap OTP-compliant 任意类型的进程设计的 started through the proc_lib module

如果你打算使用主管,那么你真的很想一路走下去并正确使用 OTP

所以我们现在上面的是非常基本的 Telnet 回显服务。与其在服务器模块中编写一个神奇的客户端进程来打结你的大脑(Erlanger 不喜欢他们的大脑打结),你可以启动它,告诉它监听某个端口,然后自己 telnet 到它并查看结果。

我添加了一些脚本来自动启动东西,但基本上取决于 codemake 模块。您的项目布局如下

es/
   Emakefile
   ebin/es.app
   src/*.erl

Emakefile 的内容将使我们的事情变得更容易。在这种情况下,它只有一行:

enter code here{"src/*", [debug_info, {i, "include/"}, {outdir, "ebin/"}]}.

在主 es/ 目录中,如果我们输入一个 erl shell 我们现在可以做...

1> code:add_patha("ebin").
true
2> make:all().
up_to_date
3> es:start().

你会看到一堆 SASL 启动报告在屏幕上滚动。

从那里让我们做 es:listen(5555):

4> es:listen(5555).
<0.94.0> Listening.
ok

酷!所以看起来一切正常。让我们尝试远程登录到我们自己:

ceverett@changa:~/vcs/es$ telnet localhost 5555
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello es thingy
You sent: Hello es thingy
Yay! It works!
You sent: Yay! It works!
bye
Bye!
Connection closed by foreign host.

另一边是什么样子的?

<0.96.0> Listening.
<0.94.0> Connection accepted from: {{127,0,0,1},60775}
<0.94.0> received: <<"Hello es thingy\r\n">>
<0.94.0> received: <<"Yay! It works!\r\n">>
<0.94.0> Client saying goodbye. Bye!

啊,我们在这里看到来自 下一个 侦听器 <0.96.0> 的 "Listening." 消息,它是由第一个 <0.94.0> 生成的。

并发连接数如何?

<0.97.0> Listening.
<0.96.0> Connection accepted from: {{127,0,0,1},60779}
<0.98.0> Listening.
<0.97.0> Connection accepted from: {{127,0,0,1},60780}
<0.97.0> received: <<"Screen 1\r\n">>
<0.96.0> received: <<"Screen 2\r\n">>
<0.97.0> received: <<"I wonder if I could make screen 1 talk to screen 2...\r\n">>
<0.96.0> received: <<"Time to go\r\n">>
<0.96.0> Client saying goodbye. Bye!
<0.97.0> Client saying goodbye. Bye!

哦,neato。并发服务器!

从这里开始,您可以使用工具并更改此基本结构,以执行您可能想到的几乎任何事情。

请注意 此代码中缺少很多内容。我已经删除了它 edoc notations and typespecs (used by Dialyzer,一个在大型项目中非常重要的工具)——这对生产系统来说是一件坏事。

有关 production-style 项目的示例,该项目小到足以让您全神贯注(仅 3 个模块 + 完整文档),请参阅 zuuid。它是专门作为代码示例编写的,尽管它恰好是一个 full-featured UUID 生成器。

请原谅这个对您的(短得多的)问题的回答的庞大。这不时出现,我想写一个 cut-down 网络套接字服务的完整示例,我可以在将来向人们推荐它,当我读到你的问题时,恰好有这样做的渴望. :-) 希望 SO 纳粹分子会原谅这种严重的违规行为。