Spring Aws Kinesis 消息未按顺序消费
Spring Aws Kinesis Messages are not consumed in order
我正在使用 1 个 shrad 推送 100 条消息。
spring:
cloud:
stream:
bindings:
myOutBound:
destination: my-stream
contentType: application/json
出于测试目的,我正在循环推送消息
@EnableBinding(MyBinder.class)
public class MyProcessor {
@Autowired
private MyBinder myBinder;
public void processRollup() {
List<MyObject> myObjects = IntStream.range(1, 100)
.mapToObj(Integer::valueOf)
.map(s-> new MyObject(s))
.collect(toList());
myObjects.forEach(messagePayload ->{
System.out.println(messagePayload.getId());
myBinder.myOutBound()
.send(MessageBuilder.withPayload(messagePayload)
.build());
}
);
}
}
我正在使用如下消息
spring:
cloud:
stream:
bindings:
RollUpInboundStream:
group: my-consumer-group
destination: my-stream
content-type: application/json
消息消费无序
我是不是漏掉了什么。
有几件事需要考虑。首先Binder中的producer默认是基于KinesisMessageHandler
和async
模式的:
messageHandler.setSync(producerProperties.getExtension().isSync());
因此,即使它寻找您以正确的顺序一条一条地发送这些消息,也不意味着它们以相同的顺序到达 AWS 上的流。
此外,即使您以同步模式发送它们,也无法保证它们以相同的顺序在 AWS 上结算。
看这里:Amazon Kinesis and guaranteed ordering
您还可以通过显式 sequenceNumber
在同一分片内实现顺序保证:
To guarantee strictly increasing ordering, write serially to a shard and use the SequenceNumberForOrdering parameter.
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
不幸的是,Kinesis Binder 目前不支持该选项,但我们可以通过在将消息发送到绑定器的 output
目的地之前在消息中显式设置 AwsHeaders.SEQUENCE_NUMBER
来克服它:
String sequenceNumber = messageHeaders.get(AwsHeaders.SEQUENCE_NUMBER, String.class);
if (!StringUtils.hasText(sequenceNumber) && this.sequenceNumberExpression != null) {
sequenceNumber = this.sequenceNumberExpression.getValue(getEvaluationContext(), message, String.class);
}
我正在使用 1 个 shrad 推送 100 条消息。
spring:
cloud:
stream:
bindings:
myOutBound:
destination: my-stream
contentType: application/json
出于测试目的,我正在循环推送消息
@EnableBinding(MyBinder.class)
public class MyProcessor {
@Autowired
private MyBinder myBinder;
public void processRollup() {
List<MyObject> myObjects = IntStream.range(1, 100)
.mapToObj(Integer::valueOf)
.map(s-> new MyObject(s))
.collect(toList());
myObjects.forEach(messagePayload ->{
System.out.println(messagePayload.getId());
myBinder.myOutBound()
.send(MessageBuilder.withPayload(messagePayload)
.build());
}
);
}
}
我正在使用如下消息
spring:
cloud:
stream:
bindings:
RollUpInboundStream:
group: my-consumer-group
destination: my-stream
content-type: application/json
消息消费无序
我是不是漏掉了什么。
有几件事需要考虑。首先Binder中的producer默认是基于KinesisMessageHandler
和async
模式的:
messageHandler.setSync(producerProperties.getExtension().isSync());
因此,即使它寻找您以正确的顺序一条一条地发送这些消息,也不意味着它们以相同的顺序到达 AWS 上的流。
此外,即使您以同步模式发送它们,也无法保证它们以相同的顺序在 AWS 上结算。
看这里:Amazon Kinesis and guaranteed ordering
您还可以通过显式 sequenceNumber
在同一分片内实现顺序保证:
To guarantee strictly increasing ordering, write serially to a shard and use the SequenceNumberForOrdering parameter.
https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
不幸的是,Kinesis Binder 目前不支持该选项,但我们可以通过在将消息发送到绑定器的 output
目的地之前在消息中显式设置 AwsHeaders.SEQUENCE_NUMBER
来克服它:
String sequenceNumber = messageHeaders.get(AwsHeaders.SEQUENCE_NUMBER, String.class);
if (!StringUtils.hasText(sequenceNumber) && this.sequenceNumberExpression != null) {
sequenceNumber = this.sequenceNumberExpression.getValue(getEvaluationContext(), message, String.class);
}