Spring Cloud Stream content-based 负载路由
Spring Cloud Stream content-based routing on payload
我们正在使用 Spring Cloud Stream v2.2 与 Kafka 和 Avro(原生 encoder/decoder)。我们正在尝试根据负载条件使用 content-based 路由。我知道根据 Spring Cloud Stream docs content-based 路由只能在 header 上实现,因为有效载荷在到达条件时没有经过类型转换过程。因此,除非条件基于字节格式,否则它不会按预期工作。但是,我知道当 Avro 在本机模式下使用时,消息 header 会被跳过,并且不会处理任何类型协商。所以我不确定 content-based 路由是否按预期在负载上工作。
@StreamListener(target = Channels.INPUT, condition =
"payload.context['type']=='one' or"
+ " payload.context['type']=='two'")
public void doStuff(TypeOneAndTwoData inputData){
...
channels.outputChannel().send(MessageBuilder.withPayload(inputData).build());
}
@StreamListener(target = Channels.INPUT, condition =
"payload.context['type']=='three' or"
+ " payload.context['type']=='four'")
public void doOtherStuff(TypeThreeAndFourData inputData){
...
channels.outputChannel().send(MessageBuilder.withPayload(inputData).build());
}
根据我已有的日志记录,我可以看到偶尔 doStuff
被触发,有时 doOtherStuff
被触发。但是,似乎大多数 none 都被触发了,消息被跳过了。根据输入数据,我确定 context.type
只能有“一”、“二”、“三”和“四”这 4 个值,因此根据输入,不可能期望有其他值,但我经常可以在日志中看到以下条目:
Cannot find a @StreamListener matching for message with id: null
我有几个问题:
- 消息被反序列化为原生 Avro 格式的相应 POJO class 后,负载条件是否适用于消息?
- 为什么条件有时有效有时无效?
id: null
是什么意思吗?
- content-based 路由如何从线程的角度工作?当我们有两个具有不同条件的 StreamListner 或它们工作 single-threaded 时,是否执行多个线程 运行? at-least-once 如何保证在这种情况下管理消息传递?条件应该互斥吗?
是;它(当前)受支持。
id:null
不幸;对于 Kafka,默认情况下 message.headers['id']
为 null - 它没有任何意义,因此此日志消息没有多大用处。
问题是 none 的条件与转换后的负载匹配
你可以在DispatchingStreamListenerMessageHandler.handleRequestMessage()
中设置一个断点来找出问题所在。
编辑
我没有回答你的第三个问题。
来电是single-threaded;不,一条消息可以匹配多个条件;只有在没有条件匹配时,您才会获得该日志。参见我上面引用的方法。
如果多个条件匹配,侦听器抛出的任何异常将停止处理(并调用 retry/DLQ 处理等)。
我们正在使用 Spring Cloud Stream v2.2 与 Kafka 和 Avro(原生 encoder/decoder)。我们正在尝试根据负载条件使用 content-based 路由。我知道根据 Spring Cloud Stream docs content-based 路由只能在 header 上实现,因为有效载荷在到达条件时没有经过类型转换过程。因此,除非条件基于字节格式,否则它不会按预期工作。但是,我知道当 Avro 在本机模式下使用时,消息 header 会被跳过,并且不会处理任何类型协商。所以我不确定 content-based 路由是否按预期在负载上工作。
@StreamListener(target = Channels.INPUT, condition =
"payload.context['type']=='one' or"
+ " payload.context['type']=='two'")
public void doStuff(TypeOneAndTwoData inputData){
...
channels.outputChannel().send(MessageBuilder.withPayload(inputData).build());
}
@StreamListener(target = Channels.INPUT, condition =
"payload.context['type']=='three' or"
+ " payload.context['type']=='four'")
public void doOtherStuff(TypeThreeAndFourData inputData){
...
channels.outputChannel().send(MessageBuilder.withPayload(inputData).build());
}
根据我已有的日志记录,我可以看到偶尔 doStuff
被触发,有时 doOtherStuff
被触发。但是,似乎大多数 none 都被触发了,消息被跳过了。根据输入数据,我确定 context.type
只能有“一”、“二”、“三”和“四”这 4 个值,因此根据输入,不可能期望有其他值,但我经常可以在日志中看到以下条目:
Cannot find a @StreamListener matching for message with id: null
我有几个问题:
- 消息被反序列化为原生 Avro 格式的相应 POJO class 后,负载条件是否适用于消息?
- 为什么条件有时有效有时无效?
id: null
是什么意思吗? - content-based 路由如何从线程的角度工作?当我们有两个具有不同条件的 StreamListner 或它们工作 single-threaded 时,是否执行多个线程 运行? at-least-once 如何保证在这种情况下管理消息传递?条件应该互斥吗?
是;它(当前)受支持。
id:
null
不幸;对于 Kafka,默认情况下message.headers['id']
为 null - 它没有任何意义,因此此日志消息没有多大用处。问题是 none 的条件与转换后的负载匹配
你可以在DispatchingStreamListenerMessageHandler.handleRequestMessage()
中设置一个断点来找出问题所在。
编辑
我没有回答你的第三个问题。
来电是single-threaded;不,一条消息可以匹配多个条件;只有在没有条件匹配时,您才会获得该日志。参见我上面引用的方法。
如果多个条件匹配,侦听器抛出的任何异常将停止处理(并调用 retry/DLQ 处理等)。