在 Yaws/Erlang 中使用 streamcontent_from_pid 进行数据流式传输

Data streaming using streamcontent_from_pid in Yaws/Erlang

我希望 使用 yaws 将数据 流式传输到我的 Comet 应用程序,我已经阅读并努力理解它,但是来自雅司的示例似乎有点对我来说很复杂(我是 Erlang 的新手)。我只是无法理解...

这里是来自yaws的例子(我稍微修改了一下):

out(A) ->
    %% Create a random number
    {_A1, A2, A3} = now(),
    random:seed(erlang:phash(node(), 1),
                erlang:phash(A2, A3),
                A3),
    Sz = random:uniform(1),

    Pid = spawn(fun() ->
                        %% Read random junk
                        S="Hello World",
                        P = open_port({spawn, S}, [binary,stream, eof]),
                        rec_loop(A#arg.clisock, P)
                end),

    [{header, {content_length, Sz}},
     {streamcontent_from_pid, "text/html; charset=utf-8", Pid}].


rec_loop(Sock, P) ->
    receive
        {discard, YawsPid} ->
            yaws_api:stream_process_end(Sock, YawsPid);
        {ok, YawsPid} ->
            rec_loop(Sock, YawsPid, P)
    end,
    port_close(P),
    exit(normal).

rec_loop(Sock, YawsPid, P) ->
    receive
        {P, {data, BinData}} ->
            yaws_api:stream_process_deliver(Sock, BinData),
            rec_loop(Sock, YawsPid, P);
        {P, eof} ->
            yaws_api:stream_process_end(Sock, YawsPid)
    end.

我需要的是把上面的脚本改成可以和下面的结合起来

mysql:start_link(p1, "127.0.0.1", "root", "azzkikr", "mydb"),
                {data, Results}  = mysql:fetch(p1, "SELECT*FROM messages WHERE id > " ++ LASTID),
                {mysql_result, FieldNames, FieldValues, NoneA, NoneB} = Results,
                parse_data(FieldValues, [], [], [], [], [])

其中 parse_data(FieldValues, [], [], [], [], []) returns 条目的 JSON 字符串.. 组合这个脚本应该不断地检查数据库中的新条目,如果有,它应该像彗星一样获取。

谢谢,愿你们都去天堂!

正如 this answer 所解释的,有时您需要一个独立于任何传入 HTTP 请求的进程 运行。对于您的情况,您可以使用 publish/subscribe:

的形式
  • 发布者: 当您的 Erlang 节点启动时,启动某种数据库客户端进程,或此类进程池,执行您的查询并 运行 独立于偏航症。
  • 订阅者: 当 Yaws 接收到 HTTP 请求并将其分派给您的代码时,您的代码会订阅发布者。当发布者向订阅者发送数据时,订阅者将数据流式传输回 HTTP 客户端。

在这里详细说明完整的解决方案是不切实际的,但一般步骤是:

  • 当您的数据库客户端进程启动时,它们会将自己注册到 pg2 group or something similar. Use something like poolboy instead of rolling your own process pools, as they're notoriously tricky to get right. Each database client can be an instance of a gen_server 运行 查询中,接收数据库结果,并处理订阅请求调用。
  • 当您的 Yaws 代码收到请求时,它会查找数据库客户端发布进程并订阅它。订阅需要调用数据库客户端模块中的函数,该函数又使用 gen_server:call/2,3 to communicate with the actual gen_server publisher process. The subscriber uses Yaws streaming capabilities (or SSE or WebSocket) 完成与 HTTP 客户端的连接并向其发送任何所需的响应 headers.
  • 发布者存储订阅者的进程 ID,并在订阅者上建立一个 monitor,以便在订阅者死亡或意外退出时清理订阅。
  • 发布者在发送给该订阅者的消息中使用 monitor's reference 作为唯一 ID,因此订阅函数 returns 引用订阅者。订阅者使用引用来匹配来自发布者的传入消息。
  • 当发布者从数据库中获得新的查询结果时,它会将数据发送给它的每个订阅者。这可以通过普通的 Erlang 消息来完成。
  • 订阅者使用 Yaws streaming functions (or SSE or WebSocket 功能)将查询结果发送到 HTTP 客户端。
  • 当 HTTP 客户端断开连接时,订阅者调用另一个发布者函数来取消订阅。