Azure Java ServiceBus Queue Java 批量接收或完成消息以获得更好的性能
Azure Java ServiceBus Queue Java receive or complete messages in batch for better performance
我正在尝试从队列接收消息并尝试了不同的方法,但遇到了性能问题。以下是每种类型 运行 的指标:
- 接收模式 = 查看和锁定; 1000条消息需要2.5分钟,因为我必须一条一条地完成
- 接收模式=接收和删除; 1000 条消息平均耗时 1.5 分钟。
- 接收模式=接收和删除(预取计数为100); 1000 条消息花了 3 秒,但我最终丢失了执行结束时缓冲区中的 100 条消息
- 接收模式=查看和锁定(预取计数为100); 1000 条消息需要 2 分钟,因为我必须再次完成每条消息。只有当有办法批量完成它们时,它才会是一个问题解决者。
下面是我的代码供参考:
ServiceBusSessionReceiverClient sessionReceiverClient = new ServiceBusClientBuilder()
.connectionString(System.getenv("QueueConnectionString"))
.sessionReceiver()
.maxAutoLockRenewDuration(Duration.ofMinutes(2))
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.queueName(queueName)
.buildClient();
ServiceBusReceiverClient receiverClient = sessionReceiverClient.acceptSession(System.getenv("QueueSessionName"));
ObjectMapper objectMapper = new ObjectMapper();
do {
receiverClient.receiveMessages((int) prefetchCount).stream().forEach(message -> {
try {
String str = message.getBody().toString();
final T dataDto = objectMapper.readValue(message.getBody().toString(), returnType);
dataDtoList.add(dataDto);
receiverClient.complete(message);
} catch (Exception e) {
AzFaUtil.getLogger().severe("Message processing failed. Error: " + e.getMessage() + e + "\n Payload: "
+ message);
}
});
} while (dataDtoList.size() < numberOfMessages);
receiverClient.close();
sessionReceiverClient.close();
我能想到的可能的解决方案:
- 如果有一种方法可以批量完成消息而不是一条一条地完成。
- 如果有办法将位于预取缓冲区中的消息重新排队回队列。
注意:这个API需要是同步的。我刚刚试验了 1000 个条目,但我正在处理 30000 个条目,因此性能很重要。队列也启用了会话,也启用了分区
根据 this issue,Microsoft 尚未测试其 ServiceBus 的性能。由于 FIFO(先进先出)是我的消息队列的要求,我使用了 JMS,它的平均执行速度几乎快 10 倍,但有一个缺点。目前,JMS 不支持基于会话的队列,所以我不得不禁用会话,然后为了确保 FIFO,我还必须禁用队列上的分区。在 Microsoft 提高其 ServiceBusRecieverClient 的性能或在 JMS 上启用会话之前,这是一个部分和临时的解决方案,以获得更好的性能。
我正在尝试从队列接收消息并尝试了不同的方法,但遇到了性能问题。以下是每种类型 运行 的指标:
- 接收模式 = 查看和锁定; 1000条消息需要2.5分钟,因为我必须一条一条地完成
- 接收模式=接收和删除; 1000 条消息平均耗时 1.5 分钟。
- 接收模式=接收和删除(预取计数为100); 1000 条消息花了 3 秒,但我最终丢失了执行结束时缓冲区中的 100 条消息
- 接收模式=查看和锁定(预取计数为100); 1000 条消息需要 2 分钟,因为我必须再次完成每条消息。只有当有办法批量完成它们时,它才会是一个问题解决者。
下面是我的代码供参考:
ServiceBusSessionReceiverClient sessionReceiverClient = new ServiceBusClientBuilder()
.connectionString(System.getenv("QueueConnectionString"))
.sessionReceiver()
.maxAutoLockRenewDuration(Duration.ofMinutes(2))
.receiveMode(ServiceBusReceiveMode.PEEK_LOCK)
.queueName(queueName)
.buildClient();
ServiceBusReceiverClient receiverClient = sessionReceiverClient.acceptSession(System.getenv("QueueSessionName"));
ObjectMapper objectMapper = new ObjectMapper();
do {
receiverClient.receiveMessages((int) prefetchCount).stream().forEach(message -> {
try {
String str = message.getBody().toString();
final T dataDto = objectMapper.readValue(message.getBody().toString(), returnType);
dataDtoList.add(dataDto);
receiverClient.complete(message);
} catch (Exception e) {
AzFaUtil.getLogger().severe("Message processing failed. Error: " + e.getMessage() + e + "\n Payload: "
+ message);
}
});
} while (dataDtoList.size() < numberOfMessages);
receiverClient.close();
sessionReceiverClient.close();
我能想到的可能的解决方案:
- 如果有一种方法可以批量完成消息而不是一条一条地完成。
- 如果有办法将位于预取缓冲区中的消息重新排队回队列。
注意:这个API需要是同步的。我刚刚试验了 1000 个条目,但我正在处理 30000 个条目,因此性能很重要。队列也启用了会话,也启用了分区
根据 this issue,Microsoft 尚未测试其 ServiceBus 的性能。由于 FIFO(先进先出)是我的消息队列的要求,我使用了 JMS,它的平均执行速度几乎快 10 倍,但有一个缺点。目前,JMS 不支持基于会话的队列,所以我不得不禁用会话,然后为了确保 FIFO,我还必须禁用队列上的分区。在 Microsoft 提高其 ServiceBusRecieverClient 的性能或在 JMS 上启用会话之前,这是一个部分和临时的解决方案,以获得更好的性能。