运动流上没有记录暂停消息 1 秒
No records for on kinesis stream suspending messages for 1 sec
我正在使用 spring kinesis 活页夹以批处理模式使用消息
有时我无法使用流中的消息。这并非一直发生
[ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER,sequenceNumber='49594358705006691330463332232285735104253344290967650306',timestamp=null,stream='mystream-1',shard=[ 26=],reset=false},state=CONSUME}] sequenceNumber [null]。暂停消费 [1000] 毫秒。
我不断收到此消息,但消息未被使用。
我的 Conf 如下所示
spring:
cloud:
stream:
bindings:
input:
group: mygroup
destination: mystream-1
content-type: application/json
output:
destination: mystream-2
content-type: application/json
kinesis:
bindings:
input:
consumer:
listenerMode: batch
idleBetweenPolls: 60000
consumer-backoff: 1000
binder:
headers: x-item_id,x-message_type
locks:
table: lTLocks
leaseDuration: 30
refreshPeriod: 3000
checkpoint:
table: lTCheckPoints
流中有消息,但我没有断断续续地消费。你能帮忙吗
我认为您的检查点存储和 Kinesis 流中的分片包含不同的序列号。
请考虑在开始使用流之前使用清理 lTCheckPoints
table。
我并不是说您需要一直这样做,但至少为了干净的测试环境。
我正在使用 spring kinesis 活页夹以批处理模式使用消息
有时我无法使用流中的消息。这并非一直发生
[ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER,sequenceNumber='49594358705006691330463332232285735104253344290967650306',timestamp=null,stream='mystream-1',shard=[ 26=],reset=false},state=CONSUME}] sequenceNumber [null]。暂停消费 [1000] 毫秒。
我不断收到此消息,但消息未被使用。
我的 Conf 如下所示
spring:
cloud:
stream:
bindings:
input:
group: mygroup
destination: mystream-1
content-type: application/json
output:
destination: mystream-2
content-type: application/json
kinesis:
bindings:
input:
consumer:
listenerMode: batch
idleBetweenPolls: 60000
consumer-backoff: 1000
binder:
headers: x-item_id,x-message_type
locks:
table: lTLocks
leaseDuration: 30
refreshPeriod: 3000
checkpoint:
table: lTCheckPoints
流中有消息,但我没有断断续续地消费。你能帮忙吗
我认为您的检查点存储和 Kinesis 流中的分片包含不同的序列号。
请考虑在开始使用流之前使用清理 lTCheckPoints
table。
我并不是说您需要一直这样做,但至少为了干净的测试环境。