如何在 RabbitMQ 3.5.6 的自定义交换器(exchange)中修改 delivery(消息)?
How can I modify delivery(message) inside custom exchange for RabbitMQ 3.5.6?
我正在为 RabbitMQ 编写一个自定义交换器(exchange),它需要读取交换器自身的参数(arguments),并把它们添加到消息的 headers 中。我对 Erlang 不是很熟悉。
-module(rabbit_exchange_type_arguments_to_headers).
-include_lib("rabbit/include/rabbit.hrl").
-include_lib("rabbit/include/rabbit_framing.hrl").
-behaviour(rabbit_exchange_type).
%% API
-export([
description/0,
serialise_events/0,
route/2,
validate/1,
validate_binding/2,
create/2,
delete/3,
policy_changed/2,
add_binding/3,
remove_bindings/3,
assert_args_equivalence/2
]).
%% Boot step for this plugin: on start it calls rabbit_registry:register/3 to
%% bind the <<"argument">> exchange type name to this module, and on cleanup it
%% calls rabbit_registry:unregister/2 for that same type name.
-rabbit_boot_step(
   {?MODULE,
    [{description, "exchange type argument-to-header"},
     {mfa, {rabbit_registry, register, [exchange, <<"argument">>, ?MODULE]}},
     %% The name unregistered here must be exactly the name registered above.
     {cleanup, {rabbit_registry, unregister, [exchange, <<"argument">>]}},
     {requires, rabbit_registry},
     {enables, kernel_ready}]
   }
).
%% Returns the metadata of this exchange type as a property list holding the
%% type name and a human-readable description.
description() ->
    [{name, <<"argument">>},
     {description, <<"Adds exchange arguments to message headers">>}].
%% Exchange-type callback; this exchange type always answers false.
serialise_events() -> false.
%% Routes like a classic fanout exchange: the result is whatever
%% rabbit_router:match_routing_key(Name, ['_']) returns, independent of the
%% message contents.
%%
%% A copy of the delivery is also built whose headers are the message headers
%% merged with the exchange arguments (see make_headers/2). The copy is made
%% with record-update syntax on the matched records, so every other field of
%% the delivery, message, content and properties is carried over unchanged
%% instead of being reset to its record default.
%%
%% The value returned from this function is still only the routing result, so
%% NewDelivery is not consumed by anything yet.
%% NOTE(review): after replacing `properties`, any cached encoded form of the
%% properties kept in the #content{} record probably has to be cleared too --
%% confirm against rabbit_binary_generator before NewDelivery is put to use.
route(#exchange{name = Name, arguments = Arguments},
      #delivery{message =
                    #basic_message{content =
                                       #content{properties =
                                                    #'P_basic'{headers = MessageHeaders} = Props
                                               } = Content
                                  } = Message
               } = Delivery) ->
    Headers = make_headers(MessageHeaders, Arguments),
    NewProps = Props#'P_basic'{headers = Headers},
    NewContent = Content#content{properties = NewProps},
    NewMessage = Message#basic_message{content = NewContent},
    NewDelivery = Delivery#delivery{message = NewMessage},
    % Do some thing here to send NewDelivery instead of old one
    rabbit_router:match_routing_key(Name, ['_']).
%% Combines the message headers with the exchange arguments.
%% Either side may be the atom `undefined`, which is treated as "nothing to
%% contribute"; when both sides are present the arguments are appended after
%% the existing headers.
make_headers(undefined, undefined) ->
    [];
make_headers(undefined, ExchangeArgs) ->
    ExchangeArgs;
make_headers(MsgHeaders, undefined) ->
    MsgHeaders;
make_headers(MsgHeaders, ExchangeArgs) ->
    MsgHeaders ++ ExchangeArgs.
%% Binding validation callback: every binding is accepted.
validate_binding(_Exchange, _Binding) ->
    ok.

%% Exchange validation callback: every exchange is accepted.
validate(_X) ->
    ok.

%% Exchange creation callback: no action is taken.
create(_Transaction, _Exchange) ->
    ok.
%% Binding-added callback: no action is taken.
add_binding(_Tx, _X, _B) -> ok.

%% Bindings-removed callback: no action is taken.
remove_bindings(_Tx, _X, _Bs) -> ok.

%% Delegates the argument-equivalence check to rabbit_exchange.
assert_args_equivalence(Exchange, Args) ->
    rabbit_exchange:assert_args_equivalence(Exchange, Args).

%% Exchange-deleted callback: no action is taken.
delete(_Tx, _X, _Bs) -> ok.
%% Policy-change callback: no action is taken.
policy_changed(_First, _Second) ->
    ok.
我在 route 函数中创建了新的 Delivery,但据我所知,Erlang 中的变量不可修改,我无法改动原始的 Delivery,因此 RabbitMQ 仍然继续使用原始的那一个。
有没有什么方法可以在交换器(exchange)中修改消息,或者让 RabbitMQ 改用新的消息继续处理?
可以修改源码,一切皆有可能
我已经解决了问题。
如果有人感兴趣,请查看我的存储库 https://github.com/thecederick/rabbitmq-arguments-to-headers-exchange。
有可供使用的源代码和工件。
我正在为 RabbitMQ 编写一个自定义交换器(exchange),它需要读取交换器自身的参数(arguments),并把它们添加到消息的 headers 中。我对 Erlang 不是很熟悉。
-module(rabbit_exchange_type_arguments_to_headers).
-include_lib("rabbit/include/rabbit.hrl").
-include_lib("rabbit/include/rabbit_framing.hrl").
-behaviour(rabbit_exchange_type).
%% API
-export([
description/0,
serialise_events/0,
route/2,
validate/1,
validate_binding/2,
create/2,
delete/3,
policy_changed/2,
add_binding/3,
remove_bindings/3,
assert_args_equivalence/2
]).
%% Boot step for this plugin: on start it calls rabbit_registry:register/3 to
%% bind the <<"argument">> exchange type name to this module, and on cleanup it
%% calls rabbit_registry:unregister/2 for that same type name.
-rabbit_boot_step(
   {?MODULE,
    [{description, "exchange type argument-to-header"},
     {mfa, {rabbit_registry, register, [exchange, <<"argument">>, ?MODULE]}},
     %% The name unregistered here must be exactly the name registered above.
     {cleanup, {rabbit_registry, unregister, [exchange, <<"argument">>]}},
     {requires, rabbit_registry},
     {enables, kernel_ready}]
   }
).
%% Returns the metadata of this exchange type as a property list holding the
%% type name and a human-readable description.
description() ->
    [{name, <<"argument">>},
     {description, <<"Adds exchange arguments to message headers">>}].
%% Exchange-type callback; this exchange type always answers false.
serialise_events() -> false.
%% Routes like a classic fanout exchange: the result is whatever
%% rabbit_router:match_routing_key(Name, ['_']) returns, independent of the
%% message contents.
%%
%% A copy of the delivery is also built whose headers are the message headers
%% merged with the exchange arguments (see make_headers/2). The copy is made
%% with record-update syntax on the matched records, so every other field of
%% the delivery, message, content and properties is carried over unchanged
%% instead of being reset to its record default.
%%
%% The value returned from this function is still only the routing result, so
%% NewDelivery is not consumed by anything yet.
%% NOTE(review): after replacing `properties`, any cached encoded form of the
%% properties kept in the #content{} record probably has to be cleared too --
%% confirm against rabbit_binary_generator before NewDelivery is put to use.
route(#exchange{name = Name, arguments = Arguments},
      #delivery{message =
                    #basic_message{content =
                                       #content{properties =
                                                    #'P_basic'{headers = MessageHeaders} = Props
                                               } = Content
                                  } = Message
               } = Delivery) ->
    Headers = make_headers(MessageHeaders, Arguments),
    NewProps = Props#'P_basic'{headers = Headers},
    NewContent = Content#content{properties = NewProps},
    NewMessage = Message#basic_message{content = NewContent},
    NewDelivery = Delivery#delivery{message = NewMessage},
    % Do some thing here to send NewDelivery instead of old one
    rabbit_router:match_routing_key(Name, ['_']).
%% Combines the message headers with the exchange arguments.
%% Either side may be the atom `undefined`, which is treated as "nothing to
%% contribute"; when both sides are present the arguments are appended after
%% the existing headers.
make_headers(undefined, undefined) ->
    [];
make_headers(undefined, ExchangeArgs) ->
    ExchangeArgs;
make_headers(MsgHeaders, undefined) ->
    MsgHeaders;
make_headers(MsgHeaders, ExchangeArgs) ->
    MsgHeaders ++ ExchangeArgs.
%% Binding validation callback: every binding is accepted.
validate_binding(_Exchange, _Binding) ->
    ok.

%% Exchange validation callback: every exchange is accepted.
validate(_X) ->
    ok.

%% Exchange creation callback: no action is taken.
create(_Transaction, _Exchange) ->
    ok.
%% Binding-added callback: no action is taken.
add_binding(_Tx, _X, _B) -> ok.

%% Bindings-removed callback: no action is taken.
remove_bindings(_Tx, _X, _Bs) -> ok.

%% Delegates the argument-equivalence check to rabbit_exchange.
assert_args_equivalence(Exchange, Args) ->
    rabbit_exchange:assert_args_equivalence(Exchange, Args).

%% Exchange-deleted callback: no action is taken.
delete(_Tx, _X, _Bs) -> ok.
%% Policy-change callback: no action is taken.
policy_changed(_First, _Second) ->
    ok.
我在 route 函数中创建了新的 Delivery,但据我所知,Erlang 中的变量不可修改,我无法改动原始的 Delivery,因此 RabbitMQ 仍然继续使用原始的那一个。
有没有什么方法可以在交换器(exchange)中修改消息,或者让 RabbitMQ 改用新的消息继续处理?
可以修改源码,一切皆有可能
我已经解决了问题。 如果有人感兴趣,请查看我的存储库 https://github.com/thecederick/rabbitmq-arguments-to-headers-exchange。 有可供使用的源代码和工件。