Spring Cloud Stream StreamBridge low performance?

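I am using Spring Cloud StreamBridge to publish messages to a RabbitMQ exchange. With the native RabbitMQ PerfTest, I can easily reach up to 100k msgs/s with a single producer (1 channel). If I start a thread with a while loop that sends via StreamBridge (also 1 channel), I only get ~20k msgs/s with similar settings (no persistence, no manual acks or confirms, same Docker container...). I am using Spring Cloud Stream and the Rabbit binder 3.2.2.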

My yml looks like this:

spring:
  rabbitmq:
    host: localhost
    port: 5672

  cloud:
    function:
      definition: producer1;

    stream:
      bindings:
        producer1-out-0:
          destination: messageQueue
          #requiredGroups: consumerGroup1,
      rabbit:
        bindings:
          producer1-out-0:
            producer:
              deliveryMode: NON_PERSISTENT
              exchangeType: direct
              bindingRoutingKey: default_message
              routingKeyExpression: '''default_message'''
              #maxLength: 1
      output-bindings: producer1;

My sending loop (the RabbitMQ PerfTest tool is also written in Java and looks similar):

        @Autowired
        public StreamBridge streamBridge;

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        @PostConstruct
        public void launchProducer() {
            Runnable task = () -> {
                while (true){
                    streamBridge.send("producer1-out-0", "msg");
                }
            };
            executorService.submit(task);
        }

Also, at startup I get a strange message in my console, Channel 'unknown.channel.name' has 1 subscriber(s), and I don't know why.

Is the slow send rate with StreamBridge a natural Spring limitation, or have I misconfigured something? Thanks for your help :)

There will always be some overhead when using an abstraction on top of a native API; however, 5x does not sound right.

I'm using -x 1 -y 1 -a as arguments, which means only 1 producer is publishing messages, with auto consumer-acks

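That probably explains it; auto ack means no acks: the broker acknowledges the message immediately when it is sent to the consumer (risking message loss). The equivalent in Spring is AcknowledgeMode.NONE; by default, the container acknowledges each message individually.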

https://docs.spring.io/spring-amqp/docs/current/reference/html/#acknowledgeMode

https://docs.spring.io/spring-amqp/docs/current/reference/html/#batchSize

Also

https://docs.spring.io/spring-amqp/docs/current/reference/html/#prefetchCount

Spring AMQP defaults the prefetch count to 250, but the SCSt default is 1, which is significantly slower.
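To make a Spring Cloud Stream consumer behave more like the PerfTest consumer started with -a, the two settings discussed above can be set on the consumer binding. A minimal sketch, assuming a consumer binding named `input` (a hypothetical name, not from the original post) and the Rabbit binder consumer properties `acknowledgeMode` and `prefetch`:

```properties
# "input" is a hypothetical consumer binding name used only for illustration.
# No acks at all: the broker considers a message delivered as soon as it is sent
# (same trade-off as PerfTest -a, messages can be lost).
spring.cloud.stream.rabbit.bindings.input.consumer.acknowledge-mode=NONE
# Raise the binder's prefetch from its default of 1 to the Spring AMQP default.
spring.cloud.stream.rabbit.bindings.input.consumer.prefetch=250
```

These only affect the consuming side; they do not change how fast StreamBridge itself publishes.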

EDIT

Interesting; SCSt does seem to add some significant overhead compared with Spring Integration alone.

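The following tests various scenarios, starting with the native Java client and adding more and more Spring abstractions on top, finally using StreamBridge; it should probably be profiled to see where the cost is and whether it can be reduced.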

spring.cloud.stream.bindings.output.destination=foo
spring.cloud.stream.rabbit.bindings.output.producer.exchange-type=direct

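logging.level.root=warn

import java.io.IOException;
import java.util.stream.IntStream;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePropertiesBuilder;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.endpoint.EventDrivenConsumer;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.StopWatch;

@SpringBootApplication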
public class So71414000Application {

    public static void main(String[] args) {
        SpringApplication.run(So71414000Application.class, args).close();
    }

    @Bean
    ApplicationRunner runner1(CachingConnectionFactory cf) throws Exception {
        return args -> {
            /*
             * Native java API
             */
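            // Note: the String passed to newConnection() is only a client-provided
            // connection name; the broker address comes from the connection factory.
            Connection conn = cf.getRabbitConnectionFactory().newConnection("amqp://localhost:1562");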
            Channel channel = conn.createChannel();
            byte[] msg = "msg".getBytes();
            AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(1).build();
            int count = 1000000;
            StopWatch watch = watch("native");
            IntStream.range(0, count).forEach(i -> {
                try {
                    channel.basicPublish("foo", "", props, msg);
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            });
            perf(count, watch);
            channel.close();
            conn.close();
        };
    }

    @Bean
    ApplicationRunner runner2(RabbitTemplate template) {
        return args -> {
            /*
             * Single ChannelProxy, no cache, no conversion
             */
            Message msg = MessageBuilder.withBody("msg".getBytes())
                    .andProperties(MessagePropertiesBuilder.newInstance()
                            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build()).build();
            StopWatch watch = watch("nocache");
            int count = 1000000;
            template.invoke(t -> {
                IntStream.range(0, count).forEach(i -> t.send("foo", "", msg));
                return null;
            });
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner3(RabbitTemplate template) {
        return args -> {
            /*
             * ChannelProxy (cached), no conversion
             */
            Message msg = MessageBuilder.withBody("msg".getBytes())
                    .andProperties(MessagePropertiesBuilder.newInstance()
                            .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build()).build();
            StopWatch watch = watch("cached channel");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> template.send("foo", "", msg));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner4(RabbitTemplate template) {
        return args -> {
            /*
             * ChannelProxy (cached), conversion
             */
            StopWatch watch = watch("message conversion");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> template.convertAndSend("foo", "", "msg"));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner5(RabbitTemplate template) {
        return args -> {
            /*
             * Spring Integration
             */
            AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(template);
            outbound.setExchangeName("foo");
            outbound.setRoutingKey("");
            DirectChannel channel = new DirectChannel();
            EventDrivenConsumer consumer = new EventDrivenConsumer(channel, outbound);
            consumer.start();
            GenericMessage<?> msg = new GenericMessage<>("foo".getBytes());
            StopWatch watch = watch("Spring Integration");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> channel.send(msg));
            perf(count, watch);
        };
    }

    @Bean
    ApplicationRunner runner6(StreamBridge bridge) {
        return args -> {
            /*
             * Stream bridge
             */
            StopWatch watch = watch("Stream Bridge");
            int count = 1000000;
            IntStream.range(0, count).forEach(i -> bridge.send("output", "msg"));
            perf(count, watch);
        };
    }


    private StopWatch watch(String name) {
        StopWatch watch = new StopWatch();
        watch.start(name);
        return watch;
    }

    private void perf(int count, StopWatch watch) {
        watch.stop();
        System.out.println(watch.prettyPrint());
        System.out.println((int) ((count) / (watch.getTotalTimeSeconds()) / 1000) + "k/s");
    }

}

I got these results on my MacBook Air (2018, 1.6GHz i5) with a bare-metal broker:


  .   ____          _            __ _ _
 /\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.6.4)

StopWatch '': running time = 10949129530 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
10949129530  100%  native

91k/s
StopWatch '': running time = 14175481691 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
14175481691  100%  nocache

70k/s
StopWatch '': running time = 16300449457 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
16300449457  100%  cached channel

61k/s
StopWatch '': running time = 18206111556 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
18206111556  100%  message conversion

54k/s
StopWatch '': running time = 26654581638 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
26654581638  100%  Spring Integration

37k/s
StopWatch '': running time = 102734493141 ns
---------------------------------------------
ns         %     Task name
---------------------------------------------
102734493141  100%  Stream Bridge

9k/s
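
To compare the runs above directly, the elapsed times can be turned into a per-message cost and a slowdown relative to the native client. The following is only a sketch of that arithmetic: it reuses the formula from perf() and the nanosecond totals printed above, while the class name ThroughputMath and its helper methods are made up for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the benchmark application above.
class ThroughputMath {

    // Same formula as perf(): messages per second, scaled to thousands and truncated.
    static int kPerSecond(int count, long elapsedNanos) {
        double seconds = elapsedNanos / 1_000_000_000.0;
        return (int) (count / seconds / 1000);
    }

    // Average wall-clock cost of a single send, in microseconds.
    static double microsPerMessage(int count, long elapsedNanos) {
        return elapsedNanos / 1000.0 / count;
    }

    // How many times longer a run took than the baseline run.
    static double slowdown(long elapsedNanos, long baselineNanos) {
        return (double) elapsedNanos / baselineNanos;
    }

    public static void main(String[] args) {
        int count = 1_000_000;

        // Elapsed times copied from the StopWatch output above.
        Map<String, Long> runs = new LinkedHashMap<>();
        runs.put("native", 10_949_129_530L);
        runs.put("nocache", 14_175_481_691L);
        runs.put("cached channel", 16_300_449_457L);
        runs.put("message conversion", 18_206_111_556L);
        runs.put("Spring Integration", 26_654_581_638L);
        runs.put("Stream Bridge", 102_734_493_141L);

        long baseline = runs.get("native");
        for (Map.Entry<String, Long> run : runs.entrySet()) {
            long nanos = run.getValue();
            System.out.printf("%-20s %3dk/s  %7.1f us/msg  %5.1fx native%n",
                    run.getKey(),
                    kPerSecond(count, nanos),
                    microsPerMessage(count, nanos),
                    slowdown(nanos, baseline));
        }
    }
}
```

With these numbers, the StreamBridge run takes roughly 9.4 times as long as the native client and a little under 4 times as long as plain Spring Integration, which is the gap that profiling would need to explain.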