运动流上没有记录暂停消息 1 秒

No records for on kinesis stream suspending messages for 1 sec

我正在使用 spring kinesis 活页夹以批处理模式使用消息

有时我无法使用流中的消息。这并非一直发生

[ShardConsumer{shardOffset=KinesisShardOffset{iteratorType=AFTER_SEQUENCE_NUMBER,sequenceNumber='49594358705006691330463332232285735104253344290967650306',timestamp=null,stream='mystream-1',shard=[ 26=],reset=false},state=CONSUME}] sequenceNumber [null]。暂停消费 [1000] 毫秒。

我不断收到此消息,但消息未被使用。

我的 Conf 如下所示

 spring:
  cloud:
    stream:
      bindings:
        input:
          group: mygroup
          destination: mystream-1
          content-type: application/json
        output:
          destination: mystream-2
          content-type: application/json
      kinesis:        
        bindings:
          input:
            consumer:
              listenerMode: batch
              idleBetweenPolls: 60000
              consumer-backoff: 1000
        binder:
          headers: x-item_id,x-message_type
          locks:
            table: lTLocks
            leaseDuration: 30
            refreshPeriod: 3000
          checkpoint:
            table: lTCheckPoints

流中有消息,但我没有断断续续地消费。你能帮忙吗

我认为您的检查点存储和 Kinesis 流中的分片包含不同的序列号。

请考虑在开始使用流之前使用清理 lTCheckPoints table。

我并不是说您需要一直这样做,但至少为了干净的测试环境。