为主题路由处理 routing_key

Handling routing_key for topic routing

我对 Erlang 环境有点陌生

我正在编写一个电子邮件测试应用程序,该应用程序使用主题交换中随机生成的 routing_keys 过滤传入电子邮件,以使电子邮件进入我的系统

一旦它们在队列中被交付(和处理),我想再次用之前随机标记它们 routing_key 将它们路由到另一个交换,让它们为最终消费做好准备。

第二个制作步骤给我带来了真正的麻烦

我正在使用 handle_info 模式匹配

从 tcp 套接字(由第三层程序处理:spamassassin)取回数据

我依赖gen_server首先通过常规amqp_client/include/amqp_client.hrl库

消费消息

我在 gen_server 行为中使用 handle_info,然后对参数进行模式匹配。

检测传递的 AMQP 消息是通过 handle_info 回调中的函数头(记录)完成的

TCP 套接字很适合与 spamassassin 交谈,它 returns 我是一个带有二进制字符串数据的 3 元组:

{tcp,#Port<0.55>,<<"SPAMD/1.1 0 EX_OK\r\nContent-length: 564\r\nSpam: True ; 7.9 / 5.0\r\n\r\nReceived: from localhost by XXXX.ikexpress.com\n\twith SpamAssassin (version 3.4.2);\n\tThu, 15 Aug 2019 21:44:12 +0200\nX-Spam-Checker-Version: SpamAssassin 3.4.2 (2018-09-13) on\n\tXXXXX.ikexpress.com\nX-Spam-Flag: YES\nX-Spam-Level: *******\nX-Spam-Status: Yes, score=7.9 required=5.0 tests=EMPTY_MESSAGE,MISSING_DATE,\n\tMISSING_FROM,MISSING_HEADERS,MISSING_MID,MISSING_SUBJECT,\n\tNO_HEADERS_MESSAGE,NO_RECEIVED,NO_RELAYS autolearn=no\n\tautolearn_force=no version=3.4.2\nMIME-Version: 1.0\nContent-Type: multipart/mixed; boundary=\"----------=_5D55B60C.D2FC2670\"\n\n">>}

第二个 handle_info 中的循环与监听 gen_tcp 服务器的答案匹配正常,但我必须进行打包才能将其发送到主题 Exchange (topic_scored_email exchange )

***My gen_server****
handle_info({#'basic.deliver'{routing_key=Key, consumer_tag=Tag}, Content}, State) ->
    #amqp_msg{props = Properties, payload = Payload} = Content,
    #'P_basic'{message_id = MessageId, headers = Headers} = Properties,
    send_to_spamassassin:calcule_score(Payload),
    {noreply, State};
handle_info(Msg, State) ->
    case Msg of
        {_,_,Data} ->
           scored_email:main(Data);
        {_,_} ->
    end,
    {noreply, State}.

***send_to_spamassassin function ***
    calcule_score(Message) ->
    case gen_tcp:connect("localhost", 783, [{mode, binary}]) of
        {ok, Sock} ->
            …
            gen_tcp:send(Sock, Message2);
        {error,_} ->
            io:fwrite("Connection error! Quitting...~n")
    end.

***scored_email***
main(Argv) ->
    {ok, Connection} = amqp_connection:start(#amqp_params_network{virtual_host = <<"/">>}),
    {ok, Channel} = amqp_connection:open_channel(Connection),
    amqp_channel:call(Channel, #'exchange.declare'{exchange = <<"topic_scored_email">>,type = <<"topic">>}),
    {RoutingKey, Message} = case Argv of
                                …
%DOING PATTERN MATCHING THAT WORKS HERE
                                …
                            end,
    amqp_channel:cast(Channel,#'basic.publish'{exchange = <<"topic_scored_email">>,routing_key = RoutingKey},#amqp_msg{payload = Message})

第一个问题是数据类型(二进制字符串),但我想这可能是使用 BIF binary_to_tuple 或类似东西的解决方法。

我很难理解的是如何传递正确的 RoutingKey,因为 Erlang 是函数式的,没有副作用或赋值。

格式数据的改变(AMQP --> 原始 TCP --> 然后又是 AMQP)似乎不可能(对我而言)通过 OTP 抽象实现

但是,我想用与上面 5 行匹配的正确路由键重新组合每条已处理的消息。

我该如何修改我的代码才能做到这一点?我来自命令式语言,在这里达到了我的极限……

你的

The first issue is type of the data (binary string) but I guess it can be a workaround using BIF binary_to_tuple or stuff like that.

在所有语言中,您都必须弄清楚如何解析从套接字读取的数据。

What I struggle to understand is how I could pass the right RoutingKey, since Erlang is functional, there is no side effect or assignation.

这是套话,实际上递归函数的参数变量可以用来存值。在您的情况下,您可以将路由键存储在 State 变量中,然后在所有 gen_server 回调函数中都可用。如果需要,State 可以是 30 个元素的元组,因此可以在 State 变量中存储多少信息没有限制。

另一种选择是使用 ets/dets table,即 erlang 数据库来存储带有路由键的消息,直到您准备好发送“所有内容”为止。到其他进程。

{RoutingKey, Message} = ...

However, I would like to reassemble every processed message with the right routing key matched 5 lines above.

如果您在同一个函数中,是什么阻止您使用变量 RoutingKeyMessage 中的路由键和消息?我不清楚如果所有代码都在一个函数中会出现什么问题。我想你可以这样做:

{RoutingKey, Message} = ...
ProcessedMsg = process_this(Message)
{RoutingKey, ProcessedMsg}

我建议你post一个简单的问题示例——没有所有复杂的匹配和amqp_channel东西来提炼问题的核心,例如

handle_info(Msg, State) -> 
    RoutingKey = 3,
    ProcessedMsg = "hello",

    %% Here, I want to write: ....