Spring 在批处理模式下使用消息时出现 Aws Kinesis Binder ProvisionedThroughputExceededException

Spring Aws Kinesis Binder ProvisionedThroughputExceededException while consuming messages in Batch Mode

我正在使用批处理模式从运动流中提取记录。我们正在使用 spring aws kinesis 活页夹。

大多数时候我们无法从流中提取消息。只有某些时候我们能够从流中提取消息。

我的配置如下所示

我的配置

spring:
  cloud:
    stream:
      kinesis:
        binder:
          locks:
            leaseDuration: 30
            readCapacity: 1
            writeCapacity: 1
          checkpoint:
            readCapacity: 1
            writeCapacity: 1
        bindings:
          InStreamGroupOne:
            consumer:
              listenerMode: batch
              idleBetweenPolls: 30000
              recordsLimit: 5000
              consumer-backoff: 1000
      bindings:
        InStreamGroupOne:
          group: in-stream-group
          destination: stream-1
          content-type: application/json
        OutboundStreamOne:
          destination: stream-2
          content-type: application/json
        OutboundStreamTwo:
          destination: stream-3
          content-type: application/json
        OutboundStreamThree:
          destination: stream-4
          content-type: application/json

当我启用调试日志时,我可以看到这个异常

Received error response: com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputExceededException: The level of configured provisioned throughput for the table was exceeded. Consider increasing your provisioning level with the UpdateTable API. (Service: AmazonDynamoDBv2; Status Code: 400; Error Code: ProvisionedThroughputExceededException; 

我尝试将批量大小减少为 150,将 idleBetweenPools 减少到 1 秒。我还将 readCapacity 和 writeCapacity 更新为 10。但同样的错误。

从 AWS 控制台,我可以看到 SpringIntegrationLockRegistry 已超过读取阈值。

能否请您帮助我们了解问题所在。

有时有效,有时无效。

关于 AWS 上的 DynamoDB,您可以执行以下操作:

从应用程序的角度来看,您可以使用锁的选项:https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/blob/master/spring-cloud-stream-binder-kinesis-docs/src/main/asciidoc/overview.adoc#lockregistry

leaseDuration

The length of time that the lease for the lock will be granted for. If this is set to, for example, 30 seconds, then the lock will expire if the heartbeat is not sent for at least 30 seconds (which would happen if the box or the heartbeat thread dies, for example.)

Default: 20

heartbeatPeriod

How often to update DynamoDB to note that the instance is still running (recommendation is to make this at least 3 times smaller than the leaseDuration - for example heartBeatPeriod=1 second, leaseDuration=10 seconds could be a reasonable configuration, make sure to include a buffer for network latency.)

Default: 5

refreshPeriod

How long to wait before trying to get the lock again (if set to 10 seconds, for example, it would attempt to do so every 10 seconds)

Default: 1000