Kinesis 消费者返回空记录(boto,python)
Kinesis consumer returning empty record (boto, python)
我在检查写入 Kinesis 的数据时遇到问题。看起来下面的示例应该有效,但我从 get_records (在 Records 字段中)返回一个空列表。知道会发生什么吗?
import uuid
import boto3
import time
streamname = 'mytestStream'
session = boto3.session.Session()
kinesis_client = session.client('kinesis', region_name='us-east-1')
##### WRITE TO KINESIS
partitionkey = str(uuid.uuid4())[:8]
put_response = kinesis_client.put_record(StreamName=streamname,Data='mytestdata',PartitionKey=partitionkey)
time.sleep(5)
##### READ FROM KINESIS
shard_id = kinesis_client.describe_stream(StreamName=streamname)['StreamDescription']['Shards'][0]['ShardId']
shard_iterator = kinesis_client.get_shard_iterator(StreamName=streamname, ShardId=shard_id, ShardIteratorType="LATEST")["ShardIterator"]
data_from_kinesis = kinesis_client.get_records(ShardIterator=shard_iterator)
谢谢!
如果您要使用最新的检查点,您应该首先开始读取流,然后放置记录。在您的示例中,时间线如下;
- 在 t0:流中的最新检查点在 101。
- 在 t1(主线程):你把记录放到流中,记录在检查点 102。
- 在 t2(主线程):您在最新点 103 开始跟踪流。
要解决此问题,您应该 运行 生产者和消费者在不同的线程中。正确的流程应该是这样的;
- 在 t0(消费者线程):在最新位置开始拖尾 steam,即 201。
- at t1 (producer thread): 你把record放到stream中,record放在checkpoint 202.
- 在 t2(消费者线程):随着服务器端的分片向前移动(因为您刚刚添加了数据)并且您从检查点 201 开始一直跟踪分片,您迭代新的检查点 202 并显示您的数据。
我在检查写入 Kinesis 的数据时遇到问题。看起来下面的示例应该有效,但我从 get_records (在 Records 字段中)返回一个空列表。知道会发生什么吗?
import uuid
import boto3
import time
streamname = 'mytestStream'
session = boto3.session.Session()
kinesis_client = session.client('kinesis', region_name='us-east-1')
##### WRITE TO KINESIS
partitionkey = str(uuid.uuid4())[:8]
put_response = kinesis_client.put_record(StreamName=streamname,Data='mytestdata',PartitionKey=partitionkey)
time.sleep(5)
##### READ FROM KINESIS
shard_id = kinesis_client.describe_stream(StreamName=streamname)['StreamDescription']['Shards'][0]['ShardId']
shard_iterator = kinesis_client.get_shard_iterator(StreamName=streamname, ShardId=shard_id, ShardIteratorType="LATEST")["ShardIterator"]
data_from_kinesis = kinesis_client.get_records(ShardIterator=shard_iterator)
谢谢!
如果您要使用最新的检查点,您应该首先开始读取流,然后放置记录。在您的示例中,时间线如下;
- 在 t0:流中的最新检查点在 101。
- 在 t1(主线程):你把记录放到流中,记录在检查点 102。
- 在 t2(主线程):您在最新点 103 开始跟踪流。
要解决此问题,您应该 运行 生产者和消费者在不同的线程中。正确的流程应该是这样的;
- 在 t0(消费者线程):在最新位置开始拖尾 steam,即 201。
- at t1 (producer thread): 你把record放到stream中,record放在checkpoint 202.
- 在 t2(消费者线程):随着服务器端的分片向前移动(因为您刚刚添加了数据)并且您从检查点 201 开始一直跟踪分片,您迭代新的检查点 202 并显示您的数据。