Kinesis 消费者返回空记录(boto,python)

Kinesis consumer returning empty record (boto, python)

我在检查写入 Kinesis 的数据时遇到问题。看起来下面的示例应该有效,但我从 get_records (在 Records 字段中)返回一个空列表。知道会发生什么吗?

import uuid
import boto3
import time


streamname = 'mytestStream'
session = boto3.session.Session() 
kinesis_client = session.client('kinesis', region_name='us-east-1')


##### WRITE TO KINESIS

partitionkey = str(uuid.uuid4())[:8]
put_response = kinesis_client.put_record(StreamName=streamname,Data='mytestdata',PartitionKey=partitionkey)

time.sleep(5)


##### READ FROM KINESIS

shard_id = kinesis_client.describe_stream(StreamName=streamname)['StreamDescription']['Shards'][0]['ShardId']
shard_iterator = kinesis_client.get_shard_iterator(StreamName=streamname, ShardId=shard_id, ShardIteratorType="LATEST")["ShardIterator"]
data_from_kinesis = kinesis_client.get_records(ShardIterator=shard_iterator)

谢谢!

如果您要使用最新的检查点,您应该首先开始读取流,然后放置记录。在您的示例中,时间线如下;

  • 在 t0:流中的最新检查点在 101。
  • 在 t1(主线程):你把记录放到流中,记录在检查点 102。
  • 在 t2(主线程):您在最新点 103 开始​​跟踪流。

要解决此问题,您应该 运行 生产者和消费者在不同的线程中。正确的流程应该是这样的;

  • 在 t0(消费者线程):在最新位置开始拖尾 steam,即 201。
  • at t1 (producer thread): 你把record放到stream中,record放在checkpoint 202.
  • 在 t2(消费者线程):随着服务器端的分片向前移动(因为您刚刚添加了数据)并且您从检查点 201 开始一直跟踪分片,您迭代新的检查点 202 并显示您的数据。