Spring Cloud Stream StreamBridge low performance?
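I'm using Spring Cloud Stream's StreamBridge to publish messages to a RabbitMQ exchange. With the native RabbitMQ PerfTest tool I can easily reach up to 100k msgs/s with a single producer (1 channel). If I start a thread that calls StreamBridge send in a while loop (also 1 channel), I only get ~20k msgs/s with a comparable setup (no persistence, no manual acks or confirms, same Docker container...). I'm using Spring Cloud Stream and the Rabbit binder 3.2.2.
My yml looks like this: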
spring:
  rabbitmq:
    host: localhost
    port: 5672
  cloud:
    function:
      definition: producer1;
    stream:
      bindings:
        producer1-out-0:
          destination: messageQueue
          #requiredGroups: consumerGroup1,
      rabbit:
        bindings:
          producer1-out-0:
            producer:
              deliveryMode: NON_PERSISTENT
              exchangeType: direct
              bindingRoutingKey: default_message
              routingKeyExpression: '''default_message'''
              #maxLength: 1
      output-bindings: producer1;
My send loop looks like this (the RabbitMQ PerfTest tool is also written in Java and looks similar):
@Autowired
public StreamBridge streamBridge;

ExecutorService executorService = Executors.newFixedThreadPool(10);

@PostConstruct
public void launchProducer() {
    Runnable task = () -> {
        while (true) {
            streamBridge.send("producer1-out-0", "msg");
        }
    };
    executorService.submit(task);
}
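Also, in my console I get a strange message at startup, Channel 'unknown.channel.name' has 1 subscriber(s), and I don't know why.
Is the slow send rate with StreamBridge a natural Spring limitation, or have I misconfigured something?
Thanks for your help :)
There will always be some overhead when using an abstraction on top of the native API; however, 5x doesn't sound right.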
i'm using -x 1 -y 1 -a as arguments, means only 1 producer is publishing messages with auto consumer-acks
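That probably explains it; auto ack means no acks: the broker acknowledges the message as soon as it is sent to the consumer (risking message loss). The equivalent in Spring is AcknowledgeMode.NONE; by default, the container acknowledges each message individually.
See
https://docs.spring.io/spring-amqp/docs/current/reference/html/#acknowledgeMode
and
https://docs.spring.io/spring-amqp/docs/current/reference/html/#batchSize
Also
https://docs.spring.io/spring-amqp/docs/current/reference/html/#prefetchCount
Spring AMQP's default prefetch is 250, but the SCSt default is 1, which is significantly slower.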
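As a rough sketch of what matching the PerfTest consumer settings might look like on the Spring Cloud Stream side, the fragment below sets the Rabbit binder's consumer acknowledge mode and prefetch. The binding name consumer1-in-0 is hypothetical (the question only shows a producer binding), and NONE trades delivery safety for speed, so treat this as something to experiment with rather than a recommended default.

```properties
# Assumption: "consumer1-in-0" is a placeholder; use your own input binding name.
# No acks at all, comparable to PerfTest's -a (auto ack) flag.
spring.cloud.stream.rabbit.bindings.consumer1-in-0.consumer.acknowledge-mode=NONE
# Raise prefetch from the binder default of 1 to the Spring AMQP default of 250.
spring.cloud.stream.rabbit.bindings.consumer1-in-0.consumer.prefetch=250
```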
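EDIT
Interesting; SCSt does seem to add some significant overhead over Spring Integration alone.
The following tests various scenarios, starting with the native Java client and adding more and more Spring abstractions on top, finally using StreamBridge; it should probably be profiled to see where the cost is and whether it can be mitigated.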
spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.rabbit.bindings.output.producer.exchange-type=direct
logging.level.root=warn
import java.io.IOException;
import java.util.stream.IntStream;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.StopWatch;

@SpringBootApplication
public class So71414000Application {

    public static void main(String[] args) {
        SpringApplication.run(So71414000Application.class, args).close();
    }

    @Bean
    ApplicationRunner runner1(CachingConnectionFactory cf) throws Exception {
        return args -> {
            /*
             * Native java API
             */
            // The String argument is a client-provided connection name, not a URI.
            Connection conn = cf.getRabbitConnectionFactory().newConnection("amqp://localhost:1562");
            Channel channel = conn.createChannel();
            byte[] msg = "msg".getBytes();
            // deliveryMode 1 = non-persistent
            AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(1).build();
            int count = 1000000;
            StopWatch watch = watch("native");
            IntStream.range(0, count).forEach(i -> {
                try {
                    channel.basicPublish("foo", "", props, msg);
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            });
            perf(count, watch);
            channel.close();
            conn.close();
        };
    }

    @Bean
    ApplicationRunner runner2(RabbitTemplate template) {
        return args -> {
            /*
             * Single ChannelProxy, no cache, no conversion
             */
            Message msg = MessageBuilder.withBody("msg".getBytes())
                    .andProperties(MessagePropertiesBuilder.newInstance()
                            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build()).build();
            StopWatch watch = watch("nocache");
            int count = 1000000;
            // invoke() runs all sends on one dedicated channel
            template.invoke(t -> {
                IntStream.range(0, count).forEach(i -> t.send("foo", "", msg));
                return null;
            });
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner3(RabbitTemplate template) {
        return args -> {
            /*
             * ChannelProxy (cached), no conversion
             */
            Message msg = MessageBuilder.withBody("msg".getBytes())
                    .andProperties(MessagePropertiesBuilder.newInstance()
                            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build()).build();
            StopWatch watch = watch("cached channel");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> template.send("foo", "", msg));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner4(RabbitTemplate template) {
        return args -> {
            /*
             * ChannelProxy (cached), conversion
             */
            StopWatch watch = watch("message conversion");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> template.convertAndSend("foo", "", "msg"));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner5(RabbitTemplate template) {
        return args -> {
            /*
             * Spring Integration
             */
            AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(template);
            outbound.setExchangeName("foo");
            outbound.setRoutingKey("");
            DirectChannel channel = new DirectChannel();
            EventDrivenConsumer consumer = new EventDrivenConsumer(channel, outbound);
            consumer.start();
            GenericMessage<?> msg = new GenericMessage<>("foo".getBytes());
            StopWatch watch = watch("Spring Integration");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> channel.send(msg));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner6(StreamBridge bridge) {
        return args -> {
            /*
             * Stream bridge
             */
            StopWatch watch = watch("Stream Bridge");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> bridge.send("output", "msg"));
            perf(count, watch);
        };
    }

    // Starts a named StopWatch task for one scenario.
    private StopWatch watch(String name) {
        StopWatch watch = new StopWatch();
        watch.start(name);
        return watch;
    }

    // Stops the watch and prints the timing plus throughput in thousands of messages per second.
    private void perf(int count, StopWatch watch) {
        watch.stop();
        System.out.println(watch.prettyPrint());
        System.out.println((int) ((count) / (watch.getTotalTimeSeconds()) / 1000) + "k/s");
    }

}
I get these results on my MacBook Air (2018 1.6GHz I5) with a bare-metal broker:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.4)
StopWatch '': running time = 10949129530 ns
---------------------------------------------
ns % Task name
---------------------------------------------
10949129530 100% native
91k/s
StopWatch '': running time = 14175481691 ns
---------------------------------------------
ns % Task name
---------------------------------------------
14175481691 100% nocache
70k/s
StopWatch '': running time = 16300449457 ns
---------------------------------------------
ns % Task name
---------------------------------------------
16300449457 100% cached channel
61k/s
StopWatch '': running time = 18206111556 ns
---------------------------------------------
ns % Task name
---------------------------------------------
18206111556 100% message conversion
54k/s
StopWatch '': running time = 26654581638 ns
---------------------------------------------
ns % Task name
---------------------------------------------
26654581638 100% Spring Integration
37k/s
StopWatch '': running time = 102734493141 ns
---------------------------------------------
ns % Task name
---------------------------------------------
102734493141 100% Stream Bridge
9k/s
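To put the timings above side by side, here is a small standalone sketch that recomputes the k/s figures from the reported nanosecond totals and derives the relative slowdowns. The class and method names (ThroughputCheck, kPerSecond, slowdown) are made up for illustration; only the nanosecond values and the 1,000,000 message count come from the run above.

```java
public class ThroughputCheck {

    // Thousands of messages per second, truncated the same way perf() truncates.
    static int kPerSecond(long count, long nanos) {
        double seconds = nanos / 1_000_000_000.0;
        return (int) (count / seconds / 1000);
    }

    // How many times longer a run took than a baseline run.
    static double slowdown(long nanos, long baselineNanos) {
        return (double) nanos / baselineNanos;
    }

    public static void main(String[] args) {
        long count = 1_000_000L;
        long nativeNs = 10_949_129_530L;       // "native" task
        long integrationNs = 26_654_581_638L;  // "Spring Integration" task
        long bridgeNs = 102_734_493_141L;      // "Stream Bridge" task

        System.out.println("native: " + kPerSecond(count, nativeNs) + "k/s");
        System.out.println("Spring Integration: " + kPerSecond(count, integrationNs) + "k/s");
        System.out.println("Stream Bridge: " + kPerSecond(count, bridgeNs) + "k/s");
        System.out.printf("Stream Bridge vs native: %.1fx%n", slowdown(bridgeNs, nativeNs));
        System.out.printf("Stream Bridge vs Spring Integration: %.1fx%n",
                slowdown(bridgeNs, integrationNs));
    }
}
```

On these numbers StreamBridge takes roughly 9.4 times as long as the native client and roughly 3.9 times as long as plain Spring Integration, which is why the last step stands out as the one worth profiling.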