为什么在使用 Lambda 处理程序时 "GetRecords.IteratorAgeMilliseconds" 指标中未显示 Kinesis 记录处理延迟
Why isn't the Kinesis record processing delay shown in the "GetRecords.IteratorAgeMilliseconds" metric when using Lambda handler
我正在尝试使用 Kinesis 和 Lambda。
我看不到 Kinesis "GetRecords.IteratorAge" 指标的延迟,即使它明显延迟。
实验环境如下
- Kinesis 数据流:1 个流由 1 个分片组成,没有增强的扇出。
- 生产者:它在本地 PC 上运行以下
producer.rb
。它每秒放一条记录。
- 消费者:以下
lambda_handler.rb
在 Lambda 中执行。它只是将带有时间戳的记录放入 DynamoDB table,并在每条记录上休眠 3 秒。
- 触发设置:
- 批量大小:50
- 批次window:None
- 每个分片的并发批次:1
- 上次处理结果:无记录处理
- 记录的最大年龄:604800
- 重试次数:10000
- 错误拆分批次:否
producer.rb
require 'aws-sdk'
kinesis = Aws::Kinesis::Client.new(region: 'ap-northeast-1')
COUNT = 300
STREAM_NAME = 'test_stream'
PKEY = 'client-001'
COUNT.times do |i|
kinesis.put_record(
stream_name: STREAM_NAME,
data: (i+1).to_s,
partition_key: PKEY
)
sleep 1
end
lambda_handler.rb
require 'json'
require 'aws-sdk'
require 'base64'
def lambda_handler(event:, context:)
dynamoDB = Aws::DynamoDB::Resource.new(region: 'ap-northeast-1')
table = dynamoDB.table(ENV['DYNAMODB_TABLE'])
item = {
'aws_request_id' => context.aws_request_id,
'start' => Time.now.to_s
}
event['Records'].each do { sleep 3 }
item['end'] = Time.now.to_s
table.put_item({item: item})
{ statusCode: 200 }
end
DynamoDB 中的结果如下所示,Cloudwatch 中的指标如下所示:
它处理了 04:09:03 和 04:24:04 之间的记录。
为什么即使记录处理没有进展,"GetRecords.IteratorAge"也不会增加?
这个问题是自我解决的。
本视频详细讲解了Lambda流源处理的内部结构。
"Poller" 订阅分片并通过 GetRecords 从分片迭代器中获取记录,然后 "Poller" 调用前端函数并传递其记录。
因此,即使 Lambda 函数延迟,GetRecords 也没有延迟。
我正在尝试使用 Kinesis 和 Lambda。
我看不到 Kinesis "GetRecords.IteratorAge" 指标的延迟,即使它明显延迟。
实验环境如下
- Kinesis 数据流:1 个流由 1 个分片组成,没有增强的扇出。
- 生产者:它在本地 PC 上运行以下
producer.rb
。它每秒放一条记录。 - 消费者:以下
lambda_handler.rb
在 Lambda 中执行。它只是将带有时间戳的记录放入 DynamoDB table,并在每条记录上休眠 3 秒。 - 触发设置:
- 批量大小:50
- 批次window:None
- 每个分片的并发批次:1
- 上次处理结果:无记录处理
- 记录的最大年龄:604800
- 重试次数:10000
- 错误拆分批次:否
producer.rb
require 'aws-sdk'
kinesis = Aws::Kinesis::Client.new(region: 'ap-northeast-1')
COUNT = 300
STREAM_NAME = 'test_stream'
PKEY = 'client-001'
COUNT.times do |i|
kinesis.put_record(
stream_name: STREAM_NAME,
data: (i+1).to_s,
partition_key: PKEY
)
sleep 1
end
lambda_handler.rb
require 'json'
require 'aws-sdk'
require 'base64'
def lambda_handler(event:, context:)
dynamoDB = Aws::DynamoDB::Resource.new(region: 'ap-northeast-1')
table = dynamoDB.table(ENV['DYNAMODB_TABLE'])
item = {
'aws_request_id' => context.aws_request_id,
'start' => Time.now.to_s
}
event['Records'].each do { sleep 3 }
item['end'] = Time.now.to_s
table.put_item({item: item})
{ statusCode: 200 }
end
DynamoDB 中的结果如下所示,Cloudwatch 中的指标如下所示:
它处理了 04:09:03 和 04:24:04 之间的记录。 为什么即使记录处理没有进展,"GetRecords.IteratorAge"也不会增加?
这个问题是自我解决的。
本视频详细讲解了Lambda流源处理的内部结构。
"Poller" 订阅分片并通过 GetRecords 从分片迭代器中获取记录,然后 "Poller" 调用前端函数并传递其记录。 因此,即使 Lambda 函数延迟,GetRecords 也没有延迟。