AWS Kinesis NextShardIterator 永不为空
AWS Kinesis NextShardIterator Never Getting Null
上下文: 我正在尝试使用 API 引用从 Kinesis 流中获取记录。我正在使用 .Net Core(3.1 版本)。
我正在使用 API 将数据写入 Kinesis Stream。这个问题没有任何问题。但是我在读取数据时遇到了一些问题。我将 getRecord 方法放在一个 do-while 循环中。 while 的条件是 nextShardIterator 的值是否为 null?但是这个值永远不会为空。我无法打破循环。
一些答案包含以下短语:
"NextShardIterator 分片中的下一个位置,从该位置开始顺序读取数据记录。如果设置为 null,则分片已关闭并且请求的迭代器不再 return 任何数据。"
我只有 1 个流和 1 个分片。我放了2条记录。之后,我执行读取方法。它正在获取这些记录。但在那之后,即使记录被消耗,nextShardIterator 也永远不会变为 null。
- 列出流(我只有 1 个流)
- 描述流并获取分片(我只有1个分片)
- 获取 TRIM_HORIZON 类型的 ShardIterator
- 循环中的 GetRecord (while{if nextShardIterator != null})
- 找到记录后,我正在尝试使用 AFTER_SEQUENCE 分片迭代器
- 如果无法获取任何记录,我正在尝试使用最新的分片迭代器
putRecord代码是这样的:
public async Task<ResponseModel> PutRecord(string orderFlow, string documentId)
{
byte[] bytedata = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(orderFlow));
using MemoryStream memoryStream = new MemoryStream(bytedata);
var putRecordRequest = new PutRecordRequest();
putRecordRequest.StreamName = myStreamName;
putRecordRequest.PartitionKey = "partition" + documentId;
putRecordRequest.Data = memoryStream;
try
{
var putRecordResponse = await kinesisClient.PutRecordAsync(putRecordRequest);
return new ResponseModel
{
Data = putRecordResponse,
Status = ResponseStatus.Success,
Message = "Successfully put record!"
};
}
catch (Exception e)
{
return new ResponseModel
{
Data = null,
Status = ResponseStatus.Error,
Message = "PutRecord Error: " + e.Message
};
}
}
但是我在读取数据时遇到了一些问题。 GetRecord 代码是这样的:
public async Task<ResponseModel> GetRecords()
{
var listStreams = await ListStreams();
if (listStreams.StreamNames.Count == 0)
{
return new ResponseModel
{
Data = null,
Status = ResponseStatus.Warning,
Message = "Do not have any Stream!"
};
}
myStreamName = listStreams.StreamNames[0];
var describeStreams = await DescribeStream(listStreams.StreamNames[0]);
if (describeStreams.StreamDescription.StreamStatus != "ACTIVE")
{
return new ResponseModel
{
Data = null,
Status = ResponseStatus.Warning,
Message = "Stream status: " + describeStreams.StreamDescription.StreamStatus
};
}
var shards = describeStreams.StreamDescription.Shards;
if (shards.Count == 0)
{
return new ResponseModel
{
Data = null,
Status = ResponseStatus.Warning,
Message = "Do not have any Shard (or data)!"
};
}
var shardId = shards[0].ShardId;
var shardIterator = await GetShardIterator(shardId);
if (string.IsNullOrWhiteSpace(shardIterator.ShardIterator))
{
return new ResponseModel
{
Data = null,
Status = ResponseStatus.Warning,
Message = "ShardIterator is null or empty!"
};
}
var getRecords = await GetRecords(shardIterator.ShardIterator);
Console.WriteLine("First Iterator: " + shardIterator);
var dataList = new List<Record>();
do
{
if (getRecords.Records.Count == 0)
{
Console.WriteLine("Records are empty!");
var nextShardIterator = GetShardLatest(shardId).Result.ShardIterator;
getRecords = await GetRecords(nextShardIterator);
Console.WriteLine("Latest Iterator: " + nextShardIterator);
}
else
{
Console.WriteLine("We have records!");
foreach (var record in getRecords.Records)
{
dataList.Add(record);
}
var nextShardIterator = GetShardIteratorWithSequence(shardId, getRecords.Records[getRecords.Records.Count-1].SequenceNumber).Result.ShardIterator;
Console.WriteLine("AfterSequence Iterator: " + nextShardIterator);
getRecords = await GetRecords(nextShardIterator);
}
} while (getRecords.NextShardIterator != null);
return new ResponseModel
{
Data = dataList,
Status = ResponseStatus.Success,
Message = "Successfull"
};
}
Kinesis 流是一个潜在的无限记录序列,可以由多个生产者随时添加。因此,打开流的分片迭代器永远不会为空。
如果您想在到达流的“结尾”时跳出循环,请查看 GetRecords 响应中的 MillisBehindLatest
字段。引用文档:
A value of zero indicates that record processing is caught up, and there are no new records to process at this moment.
但是请注意,新记录可能随时添加。如果您确实跳出了循环,请务必保存从您处理的最后一条记录中返回的 SequenceNumber
,以便您可以从中断处继续。
上下文: 我正在尝试使用 API 引用从 Kinesis 流中获取记录。我正在使用 .Net Core(3.1 版本)。
我正在使用 API 将数据写入 Kinesis Stream。这个问题没有任何问题。但是我在读取数据时遇到了一些问题。我将 getRecord 方法放在一个 do-while 循环中。 while 的条件是 nextShardIterator 的值是否为 null?但是这个值永远不会为空。我无法打破循环。
一些答案包含以下短语: "NextShardIterator 分片中的下一个位置,从该位置开始顺序读取数据记录。如果设置为 null,则分片已关闭并且请求的迭代器不再 return 任何数据。"
我只有 1 个流和 1 个分片。我放了2条记录。之后,我执行读取方法。它正在获取这些记录。但在那之后,即使记录被消耗,nextShardIterator 也永远不会变为 null。
- 列出流(我只有 1 个流)
- 描述流并获取分片(我只有1个分片)
- 获取 TRIM_HORIZON 类型的 ShardIterator
- 循环中的 GetRecord (while{if nextShardIterator != null})
- 找到记录后,我正在尝试使用 AFTER_SEQUENCE 分片迭代器
- 如果无法获取任何记录,我正在尝试使用最新的分片迭代器
putRecord代码是这样的:
public async Task<ResponseModel> PutRecord(string orderFlow, string documentId)
{
byte[] bytedata = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(orderFlow));
using MemoryStream memoryStream = new MemoryStream(bytedata);
var putRecordRequest = new PutRecordRequest();
putRecordRequest.StreamName = myStreamName;
putRecordRequest.PartitionKey = "partition" + documentId;
putRecordRequest.Data = memoryStream;
try
{
var putRecordResponse = await kinesisClient.PutRecordAsync(putRecordRequest);
return new ResponseModel
{
Data = putRecordResponse,
Status = ResponseStatus.Success,
Message = "Successfully put record!"
};
}
catch (Exception e)
{
return new ResponseModel
{
Data = null,
Status = ResponseStatus.Error,
Message = "PutRecord Error: " + e.Message
};
}
}
但是我在读取数据时遇到了一些问题。 GetRecord 代码是这样的:
public async Task<ResponseModel> GetRecords()
{
var listStreams = await ListStreams();
if (listStreams.StreamNames.Count == 0)
{
return new ResponseModel
{
Data = null,
Status = ResponseStatus.Warning,
Message = "Do not have any Stream!"
};
}
myStreamName = listStreams.StreamNames[0];
var describeStreams = await DescribeStream(listStreams.StreamNames[0]);
if (describeStreams.StreamDescription.StreamStatus != "ACTIVE")
{
return new ResponseModel
{
Data = null,
Status = ResponseStatus.Warning,
Message = "Stream status: " + describeStreams.StreamDescription.StreamStatus
};
}
var shards = describeStreams.StreamDescription.Shards;
if (shards.Count == 0)
{
return new ResponseModel
{
Data = null,
Status = ResponseStatus.Warning,
Message = "Do not have any Shard (or data)!"
};
}
var shardId = shards[0].ShardId;
var shardIterator = await GetShardIterator(shardId);
if (string.IsNullOrWhiteSpace(shardIterator.ShardIterator))
{
return new ResponseModel
{
Data = null,
Status = ResponseStatus.Warning,
Message = "ShardIterator is null or empty!"
};
}
var getRecords = await GetRecords(shardIterator.ShardIterator);
Console.WriteLine("First Iterator: " + shardIterator);
var dataList = new List<Record>();
do
{
if (getRecords.Records.Count == 0)
{
Console.WriteLine("Records are empty!");
var nextShardIterator = GetShardLatest(shardId).Result.ShardIterator;
getRecords = await GetRecords(nextShardIterator);
Console.WriteLine("Latest Iterator: " + nextShardIterator);
}
else
{
Console.WriteLine("We have records!");
foreach (var record in getRecords.Records)
{
dataList.Add(record);
}
var nextShardIterator = GetShardIteratorWithSequence(shardId, getRecords.Records[getRecords.Records.Count-1].SequenceNumber).Result.ShardIterator;
Console.WriteLine("AfterSequence Iterator: " + nextShardIterator);
getRecords = await GetRecords(nextShardIterator);
}
} while (getRecords.NextShardIterator != null);
return new ResponseModel
{
Data = dataList,
Status = ResponseStatus.Success,
Message = "Successfull"
};
}
Kinesis 流是一个潜在的无限记录序列,可以由多个生产者随时添加。因此,打开流的分片迭代器永远不会为空。
如果您想在到达流的“结尾”时跳出循环,请查看 GetRecords 响应中的 MillisBehindLatest
字段。引用文档:
A value of zero indicates that record processing is caught up, and there are no new records to process at this moment.
但是请注意,新记录可能随时添加。如果您确实跳出了循环,请务必保存从您处理的最后一条记录中返回的 SequenceNumber
,以便您可以从中断处继续。