Spring Cloud Stream content-based 负载路由

Spring Cloud Stream content-based routing on payload

我们正在使用 Spring Cloud Stream v2.2 与 Kafka 和 Avro(原生 encoder/decoder)。我们正在尝试根据负载条件使用 content-based 路由。我知道根据 Spring Cloud Stream docs content-based 路由只能在 header 上实现,因为有效载荷在到达条件时没有经过类型转换过程。因此,除非条件基于字节格式,否则它不会按预期工作。但是,我知道当 Avro 在本机模式下使用时,消息 header 会被跳过,并且不会处理任何类型协商。所以我不确定 content-based 路由是否按预期在负载上工作。

@StreamListener(target = Channels.INPUT, condition =
      "payload.context['type']=='one' or"
          + " payload.context['type']=='two'")
  public void doStuff(TypeOneAndTwoData inputData){
...
channels.outputChannel().send(MessageBuilder.withPayload(inputData).build());
}


@StreamListener(target = Channels.INPUT, condition =
      "payload.context['type']=='three' or"
          + " payload.context['type']=='four'")
  public void doOtherStuff(TypeThreeAndFourData inputData){
...
channels.outputChannel().send(MessageBuilder.withPayload(inputData).build());
}

根据我已有的日志记录,我可以看到偶尔 doStuff 被触发,有时 doOtherStuff 被触发。但是,似乎大多数 none 都被触发了,消息被跳过了。根据输入数据,我确定 context.type 只能有“一”、“二”、“三”和“四”这 4 个值,因此根据输入,不可能期望有其他值,但我经常可以在日志中看到以下条目:

Cannot find a @StreamListener matching for message with id: null

我有几个问题:

  • 是;它(当前)受支持。

  • id:null不幸;对于 Kafka,默认情况下 message.headers['id'] 为 null - 它没有任何意义,因此此日志消息没有多大用处。

  • 问题是 none 的条件与转换后的负载匹配

你可以在DispatchingStreamListenerMessageHandler.handleRequestMessage()中设置一个断点来找出问题所在。

编辑

我没有回答你的第三个问题。

来电是single-threaded;不,一条消息可以匹配多个条件;只有在没有条件匹配时,您才会获得该日志。参见我上面引用的方法。

如果多个条件匹配,侦听器抛出的任何异常将停止处理(并调用 retry/DLQ 处理等)。