使用 Rabbitmq-c 以编程方式声明具有 x-max-length 的队列
Declare a queue with x-max-length programmatically using Rabbitmq-c
我正在为我的 C 应用程序实现一个 RPC 函数,并尝试以编程方式声明一个 limits maximum number of pending messages, after reading the declaration of amqp_table_entry_t
and amqp_field_value_t
in amqp.h
的队列,这是我的最小代码示例:
int default_channel_id = 1;
int passive = 0;
int durable = 1;
int exclusive = 0;
int auto_delete = 0;
amqp_table_entry_t *q_arg_n_elms = malloc(sizeof(amqp_table_entry_t));
*q_arg_n_elms = (amqp_table_entry_t) {.key = amqp_cstring_bytes("x-max-length"),
.value = {.kind = AMQP_FIELD_KIND_U32, .value = {.u32 = 234 }}};
amqp_table_t q_arg_table = {.num_entries=1, .entries=q_arg_n_elms};
amqp_queue_declare( conn, default_channel_id, amqp_cstring_bytes("my_queue_123"),
passive, durable, exclusive, auto_delete, q_arg_table );
amqp_rpc_reply_t _reply = amqp_get_rpc_reply(conn);
上面的代码在 amqp_rpc_reply_t
的对象中总是 returns AMQP_RESPONSE_LIBRARY_EXCEPTION
,错误消息是 a socket error occurred
,我没有看到此代码触发的任何活动连接在 RabbitMQ 的 Web 管理 UI 中。所以我认为 rabbitmq-c
库没有建立连接,只是回复错误。
然而,当我将参数 q_arg_table
替换为默认值 amqp_empty_table
(这意味着没有参数)时,一切正常。
这是我的问题:
- 在哪里可以找到过滤队列参数无效键的代码?根据 this article ,
x-max-length
应该是限制队列中消息数量的正确参数键,但我无法弄清楚为什么库仍然报告错误。
- 是否有任何示例可以演示如何正确设置
amqp_table_t
传入 amqp_queue_declare(...)
?
开发环境:
- RabbitMQ v3.2.4
- rabbitmq-c v0.11.0
感谢任何反馈,感谢阅读。
[编辑]
根据服务器日志rabbit@myhostname-sasl.log
,RabbitMQ代理接受了一个新的连接,发现接收帧解码错误,然后立即关闭连接。我还没有弄清楚 Erlang 的实现,但根本原因可能是声明队列时 table 参数的解码错误。
131 =CRASH REPORT==== 18-May-2022::16:05:46 ===
132 crasher:
133 initial call: rabbit_reader:init/2
134 pid: <0.23706.1>
135 registered_name: []
136 exception error: no function clause matching
137 rabbit_binary_parser:parse_field_value(<<105,0,0,1,44>>) (src/rabbit_binary_parser.erl, line 53)
138 in function rabbit_binary_parser:parse_table/1 (src/rabbit_binary_parser.erl, line 44)
139 in call from rabbit_framing_amqp_0_9_1:decode_method_fields/2 (src/rabbit_framing_amqp_0_9_1.erl, line 791)
140 in call from rabbit_command_assembler:process/2 (src/rabbit_command_assembler.erl, line 85)
141 in call from rabbit_reader:process_frame/3 (src/rabbit_reader.erl, line 688)
142 in call from rabbit_reader:handle_input/3 (src/rabbit_reader.erl, line 738)
143 in call from rabbit_reader:recvloop/2 (src/rabbit_reader.erl, line 292)
144 in call from rabbit_reader:run/1 (src/rabbit_reader.erl, line 273)
145 ancestors: [<0.23704.1>,rabbit_tcp_client_sup,rabbit_sup,<0.145.0>]
146 messages: [{'EXIT',#Port<0.31561>,normal}]
147 links: [<0.23704.1>]
148 dictionary: [{{channel,1},
149 {<0.23720.1>,{method,rabbit_framing_amqp_0_9_1}}},
150 {{ch_pid,<0.23720.1>},{1,#Ref<0.0.20.156836>}}]
151 trap_exit: true
152 status: running
153 heap_size: 2586
154 stack_size: 27
155 reductions: 2849
156 neighbours:
RabbitMQ 可能不支持无符号整数作为 table 值。
改为尝试使用带符号的 32 位或 64 位数字(例如,.value = {.kind = AMQP_FIELD_KIND_I32, .value = {.i32 = 234 }}
)。
此外,RabbitMQ 服务器日志可能包含额外的调试信息,可以帮助理解此类错误以及 amqp_error_string2
函数可用于将 error-code 转换为 error-string。
我正在为我的 C 应用程序实现一个 RPC 函数,并尝试以编程方式声明一个 limits maximum number of pending messages, after reading the declaration of amqp_table_entry_t
and amqp_field_value_t
in amqp.h
的队列,这是我的最小代码示例:
int default_channel_id = 1;
int passive = 0;
int durable = 1;
int exclusive = 0;
int auto_delete = 0;
amqp_table_entry_t *q_arg_n_elms = malloc(sizeof(amqp_table_entry_t));
*q_arg_n_elms = (amqp_table_entry_t) {.key = amqp_cstring_bytes("x-max-length"),
.value = {.kind = AMQP_FIELD_KIND_U32, .value = {.u32 = 234 }}};
amqp_table_t q_arg_table = {.num_entries=1, .entries=q_arg_n_elms};
amqp_queue_declare( conn, default_channel_id, amqp_cstring_bytes("my_queue_123"),
passive, durable, exclusive, auto_delete, q_arg_table );
amqp_rpc_reply_t _reply = amqp_get_rpc_reply(conn);
上面的代码在 amqp_rpc_reply_t
的对象中总是 returns AMQP_RESPONSE_LIBRARY_EXCEPTION
,错误消息是 a socket error occurred
,我没有看到此代码触发的任何活动连接在 RabbitMQ 的 Web 管理 UI 中。所以我认为 rabbitmq-c
库没有建立连接,只是回复错误。
然而,当我将参数 q_arg_table
替换为默认值 amqp_empty_table
(这意味着没有参数)时,一切正常。
这是我的问题:
- 在哪里可以找到过滤队列参数无效键的代码?根据 this article ,
x-max-length
应该是限制队列中消息数量的正确参数键,但我无法弄清楚为什么库仍然报告错误。 - 是否有任何示例可以演示如何正确设置
amqp_table_t
传入amqp_queue_declare(...)
?
开发环境:
- RabbitMQ v3.2.4
- rabbitmq-c v0.11.0
感谢任何反馈,感谢阅读。
[编辑]
根据服务器日志rabbit@myhostname-sasl.log
,RabbitMQ代理接受了一个新的连接,发现接收帧解码错误,然后立即关闭连接。我还没有弄清楚 Erlang 的实现,但根本原因可能是声明队列时 table 参数的解码错误。
131 =CRASH REPORT==== 18-May-2022::16:05:46 ===
132 crasher:
133 initial call: rabbit_reader:init/2
134 pid: <0.23706.1>
135 registered_name: []
136 exception error: no function clause matching
137 rabbit_binary_parser:parse_field_value(<<105,0,0,1,44>>) (src/rabbit_binary_parser.erl, line 53)
138 in function rabbit_binary_parser:parse_table/1 (src/rabbit_binary_parser.erl, line 44)
139 in call from rabbit_framing_amqp_0_9_1:decode_method_fields/2 (src/rabbit_framing_amqp_0_9_1.erl, line 791)
140 in call from rabbit_command_assembler:process/2 (src/rabbit_command_assembler.erl, line 85)
141 in call from rabbit_reader:process_frame/3 (src/rabbit_reader.erl, line 688)
142 in call from rabbit_reader:handle_input/3 (src/rabbit_reader.erl, line 738)
143 in call from rabbit_reader:recvloop/2 (src/rabbit_reader.erl, line 292)
144 in call from rabbit_reader:run/1 (src/rabbit_reader.erl, line 273)
145 ancestors: [<0.23704.1>,rabbit_tcp_client_sup,rabbit_sup,<0.145.0>]
146 messages: [{'EXIT',#Port<0.31561>,normal}]
147 links: [<0.23704.1>]
148 dictionary: [{{channel,1},
149 {<0.23720.1>,{method,rabbit_framing_amqp_0_9_1}}},
150 {{ch_pid,<0.23720.1>},{1,#Ref<0.0.20.156836>}}]
151 trap_exit: true
152 status: running
153 heap_size: 2586
154 stack_size: 27
155 reductions: 2849
156 neighbours:
RabbitMQ 可能不支持无符号整数作为 table 值。
改为尝试使用带符号的 32 位或 64 位数字(例如,.value = {.kind = AMQP_FIELD_KIND_I32, .value = {.i32 = 234 }}
)。
此外,RabbitMQ 服务器日志可能包含额外的调试信息,可以帮助理解此类错误以及 amqp_error_string2
函数可用于将 error-code 转换为 error-string。