Spring 云流 - 聚合

Spring Cloud Stream - Aggregates

我正在尝试实施提议的 SCS 聚合,但我不确定是否理解它们的真正目的,因为结果让我感到惊讶。 首先,这是代码...

源,要安排的消息提供者:

@SpringBootApplication
@EnableBinding(Source.class)
public class SourceApplication {

    private final Logger logger = LoggerFactory.getLogger(SourceApplication.class);

    @Bean
    @InboundChannelAdapter(Source.OUTPUT)
    public MessageSource<String> createMessage() {
        return () -> {
            String payload = now().toString();
            logger.warn("Sent: " + payload);
            return new GenericMessage<>(payload);
        };
    }
}

然后是一个简单的处理器(变压器):

@SpringBootApplication
@EnableBinding(Processor.class)
public class ProcessorApplication {

    @Transformer(inputChannel = Processor.INPUT,
                 outputChannel = Processor.OUTPUT)
    public String processMessage(String payload) {
        return payload + " is the time.";
    }
}

这是最终消费者(接收器):

@SpringBootApplication
@EnableBinding(Sink.class)
public class SinkApplication {

    private final Logger logger = LoggerFactory.getLogger(SinkApplication.class);

    @ServiceActivator(inputChannel = Sink.INPUT)
    public void loggerSink(Object payload) {
        logger.warn("Received: " + payload);
    }
}

最后,聚合链接了这三个:

@SpringBootApplication
public class SampleAggregateApplication {

    public static void main(String[] args) {
        new AggregateApplicationBuilder().web(false)
            .from(SourceApplication.class)
                .args("--spring.cloud.stream.bindings.output.destination=step1", "--fixedDelay=5000")
            .via(ProcessorApplication.class)
                .args("--spring.cloud.stream.bindings.input.destination=step1",
                      "--spring.cloud.stream.bindings.output.destination=step2")
            .to(SinkApplication.class)
                .args("--spring.cloud.stream.bindings.input.destination=step2")
        .run(args);
    }
}

聚合启动时,这些是检索到的跟踪的摘录:

2017-02-03 09:59:13.428  WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:13.428
2017-02-03 09:59:13.949  WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:13.949
2017-02-03 09:59:13.949  WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:13.949
2017-02-03 09:59:13.996  WARN 18688 --- [ask-scheduler-9] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:13.996
2017-02-03 09:59:14.430  WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:14.430
2017-02-03 09:59:14.956  WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:14.956
2017-02-03 09:59:14.956  WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:14.956 is the time.
2017-02-03 09:59:14.999  WARN 18688 --- [ask-scheduler-5] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:14.999
2017-02-03 09:59:15.432  WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:15.432
2017-02-03 09:59:15.961  WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:15.961
2017-02-03 09:59:15.961  WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:15.961
2017-02-03 09:59:16.000  WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:16
2017-02-03 09:59:16.001  WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:16
2017-02-03 09:59:16.437  WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:16.437
2017-02-03 09:59:16.966  WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:16.966
2017-02-03 09:59:17.006  WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:17.006
2017-02-03 09:59:17.006  WARN 18688 --- [ask-scheduler-3] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:17.006
2017-02-03 09:59:17.443  WARN 18688 --- [ask-scheduler-9] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:17.443
2017-02-03 09:59:17.971  WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:17.971
2017-02-03 09:59:17.971  WARN 18688 --- [ask-scheduler-2] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:17.971
2017-02-03 09:59:18.007  WARN 18688 --- [sk-scheduler-10] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:18.007
2017-02-03 09:59:18.448  WARN 18688 --- [ask-scheduler-5] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:18.448
2017-02-03 09:59:18.976  WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:18.976
2017-02-03 09:59:18.976  WARN 18688 --- [ask-scheduler-6] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:18.976 is the time.
2017-02-03 09:59:19.012  WARN 18688 --- [ask-scheduler-7] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:19.012
2017-02-03 09:59:19.449  WARN 18688 --- [ask-scheduler-1] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:19.449
2017-02-03 09:59:19.982  WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:19.982
2017-02-03 09:59:19.982  WARN 18688 --- [ask-scheduler-4] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:19.982
2017-02-03 09:59:20.018  WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SourceApplication    : Sent: 2017-02-03T09:59:20.018
2017-02-03 09:59:20.018  WARN 18688 --- [ask-scheduler-8] c.p.demo.aggregator.SinkApplication      : Received: 2017-02-03T09:59:20.018
2017

我原以为每条消息都会遵循定义的周期:源-处理器-接收器。我们可以看到 3 条消息中至少有 2 条丢失,4 条消息中只有 1 条被转换。 注意:第二次尝试添加了通道目标,以避免应用程序之间的混淆(使用相同的 RabbitMQ 中间件)。

有人能告诉我我是否正确理解了聚合的目的并做了正确的事情来实现它?提前致谢。

几件事:

  • 您不应该为您的应用程序指定目的地,因为作为聚合的一部分的应用程序通过内部通道进行通信,而不是通过代理进行通信;
  • 其次(不幸的是,这是我们的文档没有指定的内容)- 在每个聚合组件定义上使用 @SpringBootApplication 时,聚合的不同部分必须属于不同的 Java 包.在您的情况下,您会在各种渠道上获得多个消费者,而不是您期望的链接。将 Source、Transformer 和 Sink 移动到单独的包中应该适合您。此外,添加了 https://github.com/spring-cloud/spring-cloud-stream/issues/785 来跟踪它。