The Kinesis shard read limit is 2 MiB/s, so how can a GetRecords call return up to 10 MiB?

What I mean is that the documentation says "Each shard can support up to a maximum total data read rate of 2 MiB per second via GetRecords. If a call to GetRecords returns 10 MiB, subsequent calls made within the next 5 seconds throw an exception." I am trying to understand how a GetRecords call can fetch more (10 MiB) than the 2 MiB shard limit. Won't the shard stop/throw an error after it reaches the 2 MiB limit?

Thanks in advance.

That sentence does look self-contradictory on its own. They should reword it.

You should read it together with the two preceding statements in the documentation for context.

Excerpts from the documentation referenced above:

GetRecords can retrieve up to 10 MiB of data per call from a single shard, and up to 10,000 records per call. Each call to GetRecords is counted as one read transaction.

Each shard can support up to five read transactions per second. Each read transaction can provide up to 10,000 records with an upper limit of 10 MiB per transaction.

Each shard can support up to a maximum total data read rate of 2 MiB per second via GetRecords. If a call to GetRecords returns 10 MiB, subsequent calls made within the next 5 seconds throw an exception.

From my experience with Kinesis, what they actually mean is that each shard limits the read rate for GetRecords calls to 2 MiB per second, and this rate limit is calculated over the window of time elapsed since the previous GetRecords call.
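To make that concrete, here is a minimal sketch of the budget arithmetic (my own illustration, not Kinesis's actual code): a single 10 MiB response uses up 5 seconds' worth of the 2 MiB/s allowance, which is exactly why the documentation says subsequent calls within the next 5 seconds throw.

```python
MIB = 1024 * 1024
RATE_LIMIT = 2 * MIB  # allowed read rate: 2 MiB per second

def call_allowed(last_response_bytes, seconds_since_last_call):
    # A new call is allowed once the previous response has been
    # "paid off" at 2 MiB per second.
    return last_response_bytes <= seconds_since_last_call * RATE_LIMIT

# A 10 MiB response blocks further calls for the next 5 seconds:
print(call_allowed(10 * MIB, 3))  # False - still over budget
print(call_allowed(10 * MIB, 5))  # True - budget fully recovered
```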

I am not sure about Kinesis's internal implementation, but I do know the internals of Kafka. In Kafka, a partition (the equivalent of a shard in Kinesis) is further divided into segments, which are basically log files. So each message is stored as an entry in a log file.

I suspect they have implemented the GetRecords server-side API along these lines.

Python pseudocode:

    from datetime import datetime

    MIB = 1024 * 1024
    RATE_LIMIT = 2 * MIB      # 2 MiB per second
    MAX_RESPONSE = 10 * MIB   # 10 MiB per call

    current_timestamp = datetime.now()
    seconds_diff = (current_timestamp - LAST_SUCCESSFUL_CALL.timestamp).total_seconds()
    if LAST_SUCCESSFUL_CALL.data_size > seconds_diff * RATE_LIMIT:
        # The previous response has not been "paid off" yet at 2 MiB/s.
        LAST_SUCCESSFUL_CALL.data_size -= seconds_diff * RATE_LIMIT
        raise ProvisionedThroughputExceededError
    else:
        records = data_store.find_next_records_from_segments(MAX_RESPONSE)
        # Here, the implementation does not limit the records because sequential
        # disk reading is always faster. So it is better to return as many records
        # as are available, with an upper cap of 10 MiB or until the end of the segment.
        LAST_SUCCESSFUL_CALL.data_size = records.data_size
        LAST_SUCCESSFUL_CALL.timestamp = current_timestamp
        return records

By amortizing the rate-limit check over previous calls, they are simplifying their implementation.

It also best suits stream-processing applications, where the consumer can quickly catch up on the records.

For example, suppose the following events occur:

T1 -> Ingest 1 MiB into shard; consumer is busy processing fetched data. Pending data = 1 MiB
T2 -> Ingest 1 MiB into shard; consumer is busy processing fetched data. Pending data = 2 MiB
T3 -> Ingest 1 MiB into shard; consumer is busy processing fetched data. Pending data = 3 MiB
T4 -> Ingest 1 MiB into shard; consumer is busy processing fetched data. Pending data = 4 MiB
T5 -> Ingest 1 MiB into shard; consumer is busy processing fetched data. Pending data = 5 MiB
T6 -> Ingest 1 MiB into shard; consumer becomes idle and does GetRecords, gets 5 MiB of data. Pending data = 1 MiB
T7 -> No new data ingestion; consumer is busy processing fetched data
T8 -> No new data ingestion; consumer is busy processing fetched data
T9 -> Consumer becomes idle and does GetRecords, gets 1 MiB of data. Pending data = 0 MiB

So, during T7 and T8, the consumer spends 2 seconds fully processing the 5 MiB of data instead of making a separate GetRecords call for each 2 MiB chunk. Here we are saving network calls and disk seeks.
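The timeline above can be checked with a toy model of the shard (my own simulation, not Kinesis code): 1 MiB is ingested per second through T6, and the consumer drains everything pending when it becomes idle at T6 and T9.

```python
MIB = 1024 * 1024

pending = 0
fetched = {}
for t in range(1, 10):          # seconds T1 .. T9
    if t in (6, 9):             # consumer becomes idle and calls GetRecords
        fetched[t] = pending    # one call drains the whole backlog
        pending = 0
    if t <= 6:
        pending += 1 * MIB      # producer ingests 1 MiB this second

print(fetched[6] // MIB, fetched[9] // MIB)  # prints: 5 1
```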

In summary:

Will not the shard stop/throw an error after it reaches the 2 MiB limit?

No, it will not. But GetRecords calls made within the subsequent seconds will throw an error. Most of the time, though, your consumer will spend those subsequent seconds processing the 10 MiB of data it received from the first GetRecords call rather than querying for new data. So you do not have to worry too much.
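In practice, the consumer side of this just retries after a throttle. Here is a hedged sketch of that pattern; the exception class and the `get_records` callable are placeholders I made up, not the real AWS SDK types:

```python
import time

class ProvisionedThroughputExceeded(Exception):
    """Placeholder for the throttling error a Kinesis client would raise."""

def get_records_with_backoff(get_records, retries=5, base_delay=1.0, sleep=time.sleep):
    """Retry a GetRecords-style callable with linear backoff, giving the
    2 MiB/s read budget time to recover between attempts."""
    for attempt in range(retries):
        try:
            return get_records()
        except ProvisionedThroughputExceeded:
            sleep(base_delay * (attempt + 1))  # wait longer each time
    raise ProvisionedThroughputExceeded("still throttled after retries")

# Fake client for illustration: throttled twice, then returns data.
calls = {"n": 0}
def fake_get_records():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ProvisionedThroughputExceeded()
    return ["record-1", "record-2"]

result = get_records_with_backoff(fake_get_records, sleep=lambda s: None)
print(result)  # prints: ['record-1', 'record-2']
```

The linear backoff here mirrors the amortized budget described above: each extra second of waiting pays off another 2 MiB of the previous oversized response.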