如何通过 AMQP 1.0 在 Azure Event Hub 中批量接收多条消息
How to receive many messages in a batch in the Azure Event Hub via AMQP 1.0
我使用 Qpid Proton 的 Apache Qpid Electron Go 包装器设置了一个仅包含路径和过滤器的 AMQP 1.0 link,如下所示:
amqpConnection.Receiver(
// the path containing the consumer group
// and the partition Id
electron.Source("<EVENTHUB_PATH>"),
// the filter map contains some annotations filters
// for the Event Hub offset
electron.Filter(filterMap),
)
我按照此文档设置了 AMQP link 选项:https://godoc.org/qpid.apache.org/electron#LinkOption
然而,在 运行使用 Go 应用程序时,我发现它在获取消息时非常慢,所以我又添加了 2 个 link 选项,如下所示:
amqpConnection.Receiver(
electron.Source("<EVENTHUB_PATH>"),
electron.Capacity(100),
electron.Prefetch(true),
electron.Filter(filterMap),
)
但是 在添加容量和预取 link 选项后,我没有看到性能有任何改善。
我每大约 5 秒从 4 个并行 link 中收到大约 10 条消息(每个分区一个 link)。
我已经尝试 运行 使用环境变量 PN_TRACE_RAW=true
的应用程序以获得 Qpid Proton 的详细输出(参见:https://qpid.apache.org/releases/qpid-proton-0.18.0/proton/c/api/group__transport.html),但我不确定
我应该寻找什么来解决这个问题。
我认为 Qpid 设置没有任何问题,但无论如何这是我在终端上看到的:
[0x9fd490]:0 -> @attach(18) [name="<MY_CUSTOM_NAME>",
handle=1, role=true, snd-settle-mode=0, rcv-settle-mode=0, source=@source(40) [address="<MY_CUSTOM_PATH>",
durable=0, expiry-policy=:"link-detach", timeout=0, dynamic=false, filter={:string=@:"apache.org:selector-filter:string"
"amqp.annotation.x-opt-offset > '<MY_CUSTOM_OFFSET>'"}], target=@target(41) [address="",
durable=0, expiry-policy=:"link-detach", timeout=0, dynamic=false], initial-delivery-count=0,
max-message-size=0]
[0x9fd490]:0 -> @flow(19) [next-incoming-id=1, incoming-window=2147483647, next-outgoing-id=1,
outgoing-window=0, handle=1, delivery-count=0, link-credit=100, drain=false]
我还尝试 运行 与事件中心位于同一 Azure 位置的 Azure VM 中的 Go 应用程序,但性能没有提高。
如何在同一个 "round trip" 中同时获取多条消息?
我需要每秒处理数千条消息。
您说得对,您需要预取 window,但电子客户端可以做得比这好很多。
我用 https://github.com/apache/qpid-proton/tree/master/examples/go/electron
中的电子示例进行了快速测试
即使没有预取,我也能得到 3000 msg/sec,而有将近 10000 msgs/sec。
$ ./broker -qsize 100000 &
Listening on [::]:5672
$ ./send -count 10000 /x ; time ./receive -count 10000 /x
Received all 10000 acknowledgements
Listening on 1 connections
Received 10000 messages
real 0m2.612s
user 0m1.611s
sys 0m0.510s
$ ./send -count 10000 /x ; time ./receive -count 10000 -prefetch 1000 /x
Received all 10000 acknowledgements
Listening on 1 connections
Received 10000 messages
real 0m1.053s
user 0m1.272s
sys 0m0.277s
显然发生了一些有趣的事情 - 我想帮助您查个水落石出。
PN_TRACE_RAW 有点太冗长而没有帮助,请尝试 PN_TRACE_FRM=1,这会给你一个更具可读性的摘要。
我很乐意在这里或在用户@qpid.apache.org 上继续对话,如果它变成一个支持案例而不是 question/answer。
我使用 Qpid Proton 的 Apache Qpid Electron Go 包装器设置了一个仅包含路径和过滤器的 AMQP 1.0 link,如下所示:
amqpConnection.Receiver(
// the path containing the consumer group
// and the partition Id
electron.Source("<EVENTHUB_PATH>"),
// the filter map contains some annotations filters
// for the Event Hub offset
electron.Filter(filterMap),
)
我按照此文档设置了 AMQP link 选项:https://godoc.org/qpid.apache.org/electron#LinkOption
然而,在 运行使用 Go 应用程序时,我发现它在获取消息时非常慢,所以我又添加了 2 个 link 选项,如下所示:
amqpConnection.Receiver(
electron.Source("<EVENTHUB_PATH>"),
electron.Capacity(100),
electron.Prefetch(true),
electron.Filter(filterMap),
)
但是 在添加容量和预取 link 选项后,我没有看到性能有任何改善。
我每大约 5 秒从 4 个并行 link 中收到大约 10 条消息(每个分区一个 link)。
我已经尝试 运行 使用环境变量 PN_TRACE_RAW=true
的应用程序以获得 Qpid Proton 的详细输出(参见:https://qpid.apache.org/releases/qpid-proton-0.18.0/proton/c/api/group__transport.html),但我不确定
我应该寻找什么来解决这个问题。
我认为 Qpid 设置没有任何问题,但无论如何这是我在终端上看到的:
[0x9fd490]:0 -> @attach(18) [name="<MY_CUSTOM_NAME>",
handle=1, role=true, snd-settle-mode=0, rcv-settle-mode=0, source=@source(40) [address="<MY_CUSTOM_PATH>",
durable=0, expiry-policy=:"link-detach", timeout=0, dynamic=false, filter={:string=@:"apache.org:selector-filter:string"
"amqp.annotation.x-opt-offset > '<MY_CUSTOM_OFFSET>'"}], target=@target(41) [address="",
durable=0, expiry-policy=:"link-detach", timeout=0, dynamic=false], initial-delivery-count=0,
max-message-size=0]
[0x9fd490]:0 -> @flow(19) [next-incoming-id=1, incoming-window=2147483647, next-outgoing-id=1,
outgoing-window=0, handle=1, delivery-count=0, link-credit=100, drain=false]
我还尝试 运行 与事件中心位于同一 Azure 位置的 Azure VM 中的 Go 应用程序,但性能没有提高。
如何在同一个 "round trip" 中同时获取多条消息? 我需要每秒处理数千条消息。
您说得对,您需要预取 window,但电子客户端可以做得比这好很多。
我用 https://github.com/apache/qpid-proton/tree/master/examples/go/electron
中的电子示例进行了快速测试即使没有预取,我也能得到 3000 msg/sec,而有将近 10000 msgs/sec。
$ ./broker -qsize 100000 &
Listening on [::]:5672
$ ./send -count 10000 /x ; time ./receive -count 10000 /x
Received all 10000 acknowledgements
Listening on 1 connections
Received 10000 messages
real 0m2.612s
user 0m1.611s
sys 0m0.510s
$ ./send -count 10000 /x ; time ./receive -count 10000 -prefetch 1000 /x
Received all 10000 acknowledgements
Listening on 1 connections
Received 10000 messages
real 0m1.053s
user 0m1.272s
sys 0m0.277s
显然发生了一些有趣的事情 - 我想帮助您查个水落石出。
PN_TRACE_RAW 有点太冗长而没有帮助,请尝试 PN_TRACE_FRM=1,这会给你一个更具可读性的摘要。
我很乐意在这里或在用户@qpid.apache.org 上继续对话,如果它变成一个支持案例而不是 question/answer。