Spring Cloud AWS Stream: messages are consumed by multiple instances in a consumer group
I wrote a sample application with the Spring Cloud AWS binder:
compile('org.springframework.cloud:spring-cloud-starter-stream-kinesis:1.0.0.BUILD-SNAPSHOT')
Code:
@StreamListener(Processor.INPUT)
public void receive(Message<String> message) {
    System.out.println("Message recieved: "+message);
    System.out.println("Message Payload: "+message.getPayload());
}
application.yml:
spring:
  cloud:
    stream:
      bindings:
        input:
          group: group
          destination: stream
          content-type: application/json
        output:
          group: group
          destination: stream
          content-type: application/json
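I started the application on multiple ports: 8081, 8082, 8083 and 8084.
When I publish a message to the stream, most of the time it is consumed by more than one instance.
For example, I sent the message {"22":"11"} and it was consumed by both 8083 and 8084.
Log of the application on port 8084: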
2018-03-16 12:29:19.715 INFO 10084 --- [ main] a.i.k.KinesisMessageDrivenChannelAdapter : started KinesisMessageDrivenChannelAdapter{shardOffsets=[KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}], consumerGroup='group'}
2018-03-16 12:29:19.809 INFO 10084 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8084 (http) with context path ''
2018-03-16 12:29:19.809 INFO 10084 --- [ main] com.example.aws.AwsApplication : Started AwsApplication in 21.034 seconds (JVM running for 22.975)
2018-03-16 12:29:19.840 INFO 10084 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.
2018-03-16 12:30:23.929 INFO 10084 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The records '[{SequenceNumber: 49582549849562056887358041088912873574803531055853731842,ApproximateArrivalTimestamp: Fri Mar 16 12:30:21 IST 2018,Data: java.nio.HeapByteBuffer[pos=0 lim=47 cap=47],PartitionKey: partitionKey-0,}]' are skipped from processing because their sequence numbers are less than already checkpointed: 49582549849562056887358041088912873574803531055853731842
Message recieved: GenericMessage [payload={"22":"11"}, headers={aws_shard=shardId-000000000000, id=f6cb4b6d-e149-059f-7e4d-aa9dfeeef10e, contentType=application/json, aws_receivedStream=stream, aws_receivedPartitionKey=partitionKey-0, aws_receivedSequenceNumber=49582549849562056887358041088914082500623155992949948418, timestamp=1521183774995}]
Message Payload: {"22":"11"}
Log of the application on port 8083:
2018-03-16 12:29:05.733 INFO 8188 --- [ main] a.i.k.KinesisMessageDrivenChannelAdapter : started KinesisMessageDrivenChannelAdapter{shardOffsets=[KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}], consumerGroup='group'}
2018-03-16 12:29:05.733 INFO 8188 --- [esis-consumer-1] a.i.k.KinesisMessageDrivenChannelAdapter : The [ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=TRIM_HORIZON, sequenceNumber='null', timestamp=null, stream='stream', shard='shardId-000000000000', reset=false}, state=NEW}] has been started.
2018-03-16 12:29:05.796 INFO 8188 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8083 (http) with context path ''
2018-03-16 12:29:05.796 INFO 8188 --- [ main] com.example.aws.AwsApplication : Started AwsApplication in 19.463 seconds (JVM running for 20.956)
Message recieved: GenericMessage [payload={"22":"11"}, headers={aws_shard=shardId-000000000000, id=cf8647fe-8ce5-70b5-eeb9-74a08efc870a, contentType=application/json, aws_receivedStream=stream, aws_receivedPartitionKey=partitionKey-0, aws_receivedSequenceNumber=49582549849562056887358041088914082500623155992949948418, timestamp=1521183775155}]
Message Payload: {"22":"11"}
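Ideally, only one consumer in the group should process a given message. Am I missing something here?
Thanks for validating the solution!
I think I found the problem. It is in ShardCheckpointer.checkpoint(String sequenceNumber).
The current code looks like this: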
if (existingSequence == null ||
        new BigInteger(existingSequence).compareTo(new BigInteger(sequenceNumber)) < 0) {
    this.checkpointStore.put(this.key, sequenceNumber);
    return true;
}
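There is a race condition when two (all?) nodes check the state and each gets the same, smaller value from the store. So they all pass the condition and then all go on to the checkpointStore.put() call, where each of them stores the same new value and returns true, letting its Channel Adapter process the same record.
My fix is as follows: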
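To make that interleaving concrete, here is a minimal sketch that replays it deterministically in a single thread. It is not the binder's code: a ConcurrentHashMap stands in for the shared checkpoint store, and the class name CheckThenActRace, the helpers read and writeIfNewer, and the key "shard-0" are all hypothetical. The point it illustrates is that a thread-safe map does not make a read-compare-write sequence atomic: both "instances" read the old checkpoint before either writes, so both pass the comparison.

```java
import java.math.BigInteger;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CheckThenActRace {

    // Stand-in for the shared checkpoint store (hypothetical, not the binder's class)
    static final Map<String, String> store = new ConcurrentHashMap<>();

    // First half of the naive checkpoint: read the stored sequence number
    static String read(String key) {
        return store.get(key);
    }

    // Second half: compare against the value read earlier, then write unconditionally
    static boolean writeIfNewer(String key, String existing, String sequenceNumber) {
        if (existing == null
                || new BigInteger(existing).compareTo(new BigInteger(sequenceNumber)) < 0) {
            store.put(key, sequenceNumber);
            return true; // caller goes on to process the record
        }
        return false;
    }

    public static void main(String[] args) {
        String key = "shard-0"; // hypothetical key
        store.put(key, "100");

        // Problematic interleaving: both instances read before either one writes
        String seenByA = read(key);
        String seenByB = read(key);

        System.out.println("instance A processes record: " + writeIfNewer(key, seenByA, "200"));
        System.out.println("instance B processes record: " + writeIfNewer(key, seenByB, "200"));
    }
}
```

Running main prints true for both instances, which matches the duplicate consumption seen on ports 8083 and 8084.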
if (existingSequence == null ||
        new BigInteger(existingSequence).compareTo(new BigInteger(sequenceNumber)) < 0) {
    if (existingSequence != null) {
        return this.checkpointStore.replace(this.key, existingSequence, sequenceNumber);
    }
    else {
        this.checkpointStore.put(this.key, sequenceNumber);
        return true;
    }
}
Same condition, but I also use checkpointStore.replace(), which has this contract:
/**
 * Atomically replace the value for the key in the store if the old
 * value matches the oldValue argument.
 */
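The same replay with a compare-and-set step shows why replace() closes the window. This sketch uses java.util.concurrent.ConcurrentMap#replace(key, oldValue, newValue), which has the same "replace only if the old value still matches" semantics, as a stand-in for the checkpoint store; the class and helper names are hypothetical. The instance holding a stale read loses. As a variation that is not part of the fix above, the sketch also uses putIfAbsent for the very first checkpoint (existing == null), on the assumption that a plain put() would leave the same race open when two instances start with no checkpoint at all.

```java
import java.math.BigInteger;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CompareAndSetCheckpoint {

    // Stand-in for a concurrent checkpoint store (hypothetical, not the binder's class)
    static final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    static String read(String key) {
        return store.get(key);
    }

    // Returns true only for the caller whose write actually advanced the checkpoint
    static boolean checkpoint(String key, String existing, String sequenceNumber) {
        if (existing == null) {
            // Assumption: first checkpoint also done atomically; only one caller sees null back
            return store.putIfAbsent(key, sequenceNumber) == null;
        }
        if (new BigInteger(existing).compareTo(new BigInteger(sequenceNumber)) < 0) {
            // Succeeds only if nobody changed the value since it was read
            return store.replace(key, existing, sequenceNumber);
        }
        return false;
    }

    public static void main(String[] args) {
        String key = "shard-0"; // hypothetical key
        store.put(key, "100");

        // Same interleaving as before: both instances read "100" before either writes
        String seenByA = read(key);
        String seenByB = read(key);

        System.out.println("instance A processes record: " + checkpoint(key, seenByA, "200"));
        System.out.println("instance B processes record: " + checkpoint(key, seenByB, "200"));
    }
}
```

Here instance A wins and instance B gets false, because by the time B calls replace the stored value is no longer "100".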
Now I'm trying to come up with a test case to verify this, and I will let you know when the BUILD-SNAPSHOT is ready for you to use and verify on your side.
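One possible shape for such a test: start several instances from the same previous checkpoint, release them at once, and assert that exactly one of them is allowed to process the record. The sketch below is plain Java (no JUnit, to stay self-contained) and runs against a ConcurrentHashMap stand-in, not the binder's real ShardCheckpointer; CheckpointRaceTest, countWinners and the key are hypothetical names. A CountDownLatch releases all threads together to maximize contention. Threads that read "100" race on replace() and only one succeeds; a thread that reads after the value became "200" is rejected by the comparison, so one winner is expected under any interleaving.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

public class CheckpointRaceTest {

    static final String KEY = "shard-0"; // hypothetical checkpoint key

    // Stand-in for the shared checkpoint store
    static final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    // Compare-and-set checkpoint: true only if this caller advanced the stored value
    static boolean checkpoint(String sequenceNumber) {
        String existing = store.get(KEY);
        if (existing == null) {
            return store.putIfAbsent(KEY, sequenceNumber) == null;
        }
        if (new BigInteger(existing).compareTo(new BigInteger(sequenceNumber)) < 0) {
            return store.replace(KEY, existing, sequenceNumber);
        }
        return false;
    }

    // Starts `instances` threads that all try to checkpoint `next` at once; returns how many won
    static int countWinners(int instances, String previous, String next) {
        store.clear();
        if (previous != null) {
            store.put(KEY, previous);
        }
        ExecutorService pool = Executors.newFixedThreadPool(instances);
        CountDownLatch start = new CountDownLatch(1);
        AtomicInteger winners = new AtomicInteger();
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            futures.add(pool.submit(() -> {
                start.await(); // block until all threads are released together
                if (checkpoint(next)) {
                    winners.incrementAndGet();
                }
                return null;
            }));
        }
        start.countDown();
        try {
            for (Future<?> f : futures) {
                f.get(); // propagate any failure from the worker threads
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("winners from an existing checkpoint: " + countWinners(8, "100", "200"));
        System.out.println("winners with no checkpoint yet: " + countWinners(8, null, "200"));
    }
}
```

Swapping checkpoint() for the naive check-then-act version would let the winner count exceed one on some runs, which is the failure such a test is meant to catch.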