当 运行 使用先前的序列号或时间戳时,从 Kinesis 读取给出空记录
Read from Kinesis is giving empty records when run using previous sequence number or timestamp
我正在尝试在
的帮助下阅读推送到 Kinesis 流的消息
get_records() and get_shard_iterator() APIs.
我的生产者在处理结束时不断推送记录,消费者也每 30 分钟保持 运行ning 作为 cron。因此,我尝试将当前读取的消息的序列号存储在我的数据库中,并使用 AFTER_SEQUENCE_NUMBER 分片迭代器和上次读取的序列号。但是,在新消息推送后,第二次(第一次成功读取流中的所有消息)将无法正常工作。
我还尝试使用 AT_TIMESTAMP 以及生产者作为消息的一部分推送到流中的消息时间戳,并存储该消息以供进一步使用。同样,第一个 运行 处理所有消息,第二个 运行 我得到空记录。
我真的不知道我哪里错了。如果有人可以帮助我,我将不胜感激。
使用时间戳提供下面的代码,但对序列号方法也做同样的事情。
def listen_to_kinesis_stream():
kinesis_client = boto3.client('kinesis', region_name=SETTINGS['region_name'])
stream_response = kinesis_client.describe_stream(StreamName=SETTINGS['kinesis_stream'])
for shard_info in stream_response['StreamDescription']['Shards']:
kinesis_stream_status = mongo_coll.find_one({'_id': "DOC_ID"})
last_read_ts = kinesis_stream_status.get('state', {}).get(
shard_info['ShardId'], datetime.datetime.strftime(datetime.date(1970, 01, 01), "%Y-%m-%dT%H:%M:%S.%f"))
shard_iterator = kinesis_client.get_shard_iterator(
StreamName=SETTINGS['kinesis_stream'],
ShardId=shard_info['ShardId'],
ShardIteratorType='AT_TIMESTAMP',
Timestamp=last_read_ts)
get_response = kinesis_client.get_records(ShardIterator=shard_iterator['ShardIterator'], Limit=1)
if len(get_response['Records']) == 0:
continue
message = json.loads(get_response['Records'][0]['Data'])
process_resp = process_message(message)
if process_resp['success'] is False:
print process_resp
generic_config_coll.update({'_id': "DOC_ID"}, {'$set': {'state.{0}'.format(shard_info['ShardId']): message['ts']}})
print "Processed {0}".format(message)
while 'NextShardIterator' in get_response:
get_response = kinesis_client.get_records(ShardIterator=get_response['NextShardIterator'], Limit=1)
if len(get_response['Records']) == 0:
break
message = json.loads(get_response['Records'][0]['Data'])
process_resp = process_message(message)
if process_resp['success'] is False:
print process_resp
mongo_coll.update({'_id': "DOC_ID"}, {'$set': {'state.{0}'.format(shard_info['ShardId']): message['ts']}})
print "Processed {0}".format(message)
logger.debug("Processed all messages from Kinesis stream")
print "Processed all messages from Kinesis stream"
根据我与 AWS 技术支持人员的讨论,可能会有几条消息的记录为空,因此在 len(get_response['Records']) 时中断不是一个好主意== 0.
建议的更好方法是 - 我们可以有一个计数器,指示您在 运行 中阅读的最大消息数,并在阅读尽可能多的消息后退出循环。
我正在尝试在
的帮助下阅读推送到 Kinesis 流的消息get_records() and get_shard_iterator() APIs.
我的生产者在处理结束时不断推送记录,消费者也每 30 分钟保持 运行ning 作为 cron。因此,我尝试将当前读取的消息的序列号存储在我的数据库中,并使用 AFTER_SEQUENCE_NUMBER 分片迭代器和上次读取的序列号。但是,在新消息推送后,第二次(第一次成功读取流中的所有消息)将无法正常工作。
我还尝试使用 AT_TIMESTAMP 以及生产者作为消息的一部分推送到流中的消息时间戳,并存储该消息以供进一步使用。同样,第一个 运行 处理所有消息,第二个 运行 我得到空记录。
我真的不知道我哪里错了。如果有人可以帮助我,我将不胜感激。
使用时间戳提供下面的代码,但对序列号方法也做同样的事情。
def listen_to_kinesis_stream():
kinesis_client = boto3.client('kinesis', region_name=SETTINGS['region_name'])
stream_response = kinesis_client.describe_stream(StreamName=SETTINGS['kinesis_stream'])
for shard_info in stream_response['StreamDescription']['Shards']:
kinesis_stream_status = mongo_coll.find_one({'_id': "DOC_ID"})
last_read_ts = kinesis_stream_status.get('state', {}).get(
shard_info['ShardId'], datetime.datetime.strftime(datetime.date(1970, 01, 01), "%Y-%m-%dT%H:%M:%S.%f"))
shard_iterator = kinesis_client.get_shard_iterator(
StreamName=SETTINGS['kinesis_stream'],
ShardId=shard_info['ShardId'],
ShardIteratorType='AT_TIMESTAMP',
Timestamp=last_read_ts)
get_response = kinesis_client.get_records(ShardIterator=shard_iterator['ShardIterator'], Limit=1)
if len(get_response['Records']) == 0:
continue
message = json.loads(get_response['Records'][0]['Data'])
process_resp = process_message(message)
if process_resp['success'] is False:
print process_resp
generic_config_coll.update({'_id': "DOC_ID"}, {'$set': {'state.{0}'.format(shard_info['ShardId']): message['ts']}})
print "Processed {0}".format(message)
while 'NextShardIterator' in get_response:
get_response = kinesis_client.get_records(ShardIterator=get_response['NextShardIterator'], Limit=1)
if len(get_response['Records']) == 0:
break
message = json.loads(get_response['Records'][0]['Data'])
process_resp = process_message(message)
if process_resp['success'] is False:
print process_resp
mongo_coll.update({'_id': "DOC_ID"}, {'$set': {'state.{0}'.format(shard_info['ShardId']): message['ts']}})
print "Processed {0}".format(message)
logger.debug("Processed all messages from Kinesis stream")
print "Processed all messages from Kinesis stream"
根据我与 AWS 技术支持人员的讨论,可能会有几条消息的记录为空,因此在 len(get_response['Records']) 时中断不是一个好主意== 0.
建议的更好方法是 - 我们可以有一个计数器,指示您在 运行 中阅读的最大消息数,并在阅读尽可能多的消息后退出循环。