Lambda(与 Kinesis Data Stream 连接)函数连续生成 cloudwatch 日志
Lambda (connected with Kinesis Data Stream) function generating cloudwatch logs continuously
在我当前的项目中,我的目标是从帧流中检测不同的对象。视频帧是使用与 Raspberry Pi 连接的摄像头捕获的。
下面是一个粗略的架构。
架构设计如下:
video_cap.py
代码在 Raspberry Pi 上运行。此代码将图像流发送到 AWS 中的 Kinesis Data Stream(称为 FrameStream
)。
FrameStream
数据流接收这些帧并触发 Lambda 函数(名为 lambda_function.py
)。 lambda 函数是使用 Python 3.7
编写的。
此 lambda 函数接收图像流,触发 AWS Rekognition 并发送电子邮件通知。
我的问题是:即使我(按 Ctrl + C)停止了在 Raspberry Pi 上运行的 video_cap.py,Lambda 函数仍然不断将日志(报告先前接收到的旧帧)写入 CloudWatch。
请帮助我 - 我该如何解决这个问题?如果您需要任何其他信息,请告诉我。
video_cap.py
文件代码
# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Licensed under the Amazon Software License (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
# http://aws.amazon.com/asl/
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and limitations under the License.
import sys
import cPickle
import datetime
import cv2
import boto3
import time
import cPickle
from multiprocessing import Pool
import pytz
# AWS clients shared by the functions in this script.
kinesis_client = boto3.client("kinesis")
rekog_client = boto3.client("rekognition")
camera_index = 0 # 0 is usually the built-in webcam
capture_rate = 30 # Frame capture rate.. every X frames. Positive integer.
# Passed as MaxLabels / MinConfidence to Rekognition detect_labels.
rekog_max_labels = 123
rekog_min_conf = 50.0
#Send frame to Kinesis stream
def encode_and_send_frame(frame, frame_count, enable_kinesis=True, enable_rekog=False, write_file=False):
    """Encode one captured frame as JPEG and send it to the enabled sinks.

    Args:
        frame: image to encode; handed unchanged to ``cv2.imencode``.
        frame_count: running index of the frame. Stored in the package and
            used in the optional output file name.
        enable_kinesis: put the pickled frame package on the "FrameStream"
            Kinesis stream.
        enable_rekog: call Rekognition ``detect_labels`` directly on the JPEG.
        write_file: also write the JPEG to ``img_<frame_count>.jpg``.

    Returns:
        None. Failures are printed instead of raised so that one bad frame
        does not abort the caller.
    """
    try:
        # Convert the OpenCV image to JPEG; bail out if the encoder refuses it.
        retval, buff = cv2.imencode(".jpg", frame)
        if not retval:
            print("Could not encode frame {} as JPEG".format(frame_count))
            return
        img_bytes = bytearray(buff)

        # Seconds since the Unix epoch. time.time() is timezone independent,
        # unlike localizing the naive local datetime.now() as UTC.
        now_ts_utc = time.time()

        frame_package = {
            'ApproximateCaptureTime': now_ts_utc,
            'FrameCount': frame_count,
            'ImageBytes': img_bytes
        }

        if write_file:
            print("Writing file img_{}.jpg".format(frame_count))
            # JPEG data is binary: open in 'wb' and let `with` close the file.
            with open("img_{}.jpg".format(frame_count), 'wb') as target:
                target.write(img_bytes)

        # Put the encoded image on the Kinesis stream.
        if enable_kinesis:
            print("Sending image to Kinesis")
            response = kinesis_client.put_record(
                StreamName="FrameStream",
                Data=cPickle.dumps(frame_package),
                PartitionKey="partitionkey"
            )
            print(response)

        if enable_rekog:
            response = rekog_client.detect_labels(
                Image={
                    'Bytes': img_bytes
                },
                MaxLabels=rekog_max_labels,
                MinConfidence=rekog_min_conf
            )
            print(response)
    except Exception as e:
        # Best effort by design: report the failure and carry on.
        print("Failed to process frame {}: {}".format(frame_count, e))
def main():
    """Capture frames from the camera and send every Nth one to Kinesis.

    N defaults to the module-level ``capture_rate`` and can be overridden by
    passing a positive integer as the first command-line argument. Frames are
    shown in a preview window; press 'q' in that window to stop.
    """
    # Copy the module default into a local name. Assigning to `capture_rate`
    # here would make it local to main() and raise UnboundLocalError whenever
    # no argument is supplied.
    rate = capture_rate
    if len(sys.argv) > 1 and sys.argv[1].isdigit():
        requested = int(sys.argv[1])
        if requested > 0:  # 0 would make the modulo below divide by zero
            rate = requested

    cap = cv2.VideoCapture(camera_index)  # Use 0 for built-in camera. Use 1, 2, etc. for attached cameras.
    pool = Pool(processes=3)
    frame_count = 0
    finished = False

    try:
        while True:
            # Capture frame-by-frame
            ret, frame = cap.read()
            if ret is False:
                break

            if frame_count % rate == 0:
                pool.apply_async(encode_and_send_frame, (frame, frame_count, True, False, False,))
            frame_count += 1

            # Display the resulting frame
            cv2.imshow('frame', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        finished = True
    finally:
        if finished:
            # Normal exit: let frames that are already queued be sent.
            pool.close()
        else:
            # Ctrl+C or an error: drop queued work instead of waiting on it.
            pool.terminate()
        pool.join()
        # When everything is done, release the capture
        cap.release()
        cv2.destroyAllWindows()


if __name__ == '__main__':
    main()
Lambda 函数(lambda_function.py
)
from __future__ import print_function
import base64
import json
import logging
import _pickle as cPickle
#import time
from datetime import datetime
import decimal
import uuid
import boto3
from copy import deepcopy
# Root logger; INFO and above are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Rekognition client, created once at import time.
rekog_client = boto3.client('rekognition')
# S3 Configuration
s3_client = boto3.client('s3')
s3_bucket = "bucket-name-XXXXXXXXXXXXX"
s3_key_frames_root = "frames/"
# SNS Configuration
sns_client = boto3.client('sns')
label_watch_sns_topic_arn = "SNS-ARN-XXXXXXXXXXXXXXXX"
# Module-level containers for Rekognition label names.
# NOTE(review): being module-level, anything stored in these is shared by every
# call made in the same process - confirm that is intended.
labels_on_watch_list = []
labels_on_watch_list_set = []
text_list_set = []
# List for detected text
text_list = []
def process_image(event, context):
    """Run Rekognition on every frame in a Kinesis event, then notify and archive it.

    For each record the frame is unpickled, labels and text are detected,
    an SNS message is published when at least one label was found, and the
    JPEG is stored in S3 under ``frames/<year>/<month>/<day>/<hour>/<uuid>.jpg``.

    Args:
        event: Lambda event. Each ``event['Records'][i]['kinesis']['data']``
            must be a base64-encoded pickle of a dict containing at least
            the key "ImageBytes".
        context: Lambda context object (unused).

    Returns:
        dict with ``statusCode`` 200 and a JSON-encoded confirmation message.
    """
    for record in event['Records']:
        # SECURITY: unpickling can execute arbitrary code. This is only
        # acceptable while every producer writing to the stream is trusted.
        frame_package = cPickle.loads(base64.b64decode(record['kinesis']['data']))
        img_bytes = frame_package["ImageBytes"]

        now_ts = datetime.now()
        frame_id = str(uuid.uuid4())

        # === Object detection from the image ===
        rekog_response = rekog_client.detect_labels(
            Image={
                'Bytes': img_bytes
            },
            MaxLabels=10,
            MinConfidence=90.0
        )
        logger.info("Rekognition Response" + str(rekog_response))

        # Build the results per frame. Local sets are always defined (even
        # when nothing is detected) and never carry data over from earlier
        # frames or earlier calls.
        labels = rekog_response['Labels']
        frame_labels = {label['Name'] for label in labels}
        logger.info("Labels on watch list ==>" + str(frame_labels))

        # === Text detection on the same image ===
        text_response = rekog_client.detect_text(Image={'Bytes': img_bytes})
        frame_texts = {text['DetectedText'] for text in text_response['TextDetections']}
        logger.info("Text Detected ==>" + str(frame_texts))

        # SNS notification, only when this frame produced at least one label.
        if frame_labels:
            logger.info("I am in SNS Now......")
            # Report the confidence of the last label returned for this frame.
            notification_txt = 'On {} Vehicle was spotted with {}% confidence'.format(
                now_ts.strftime('%x, %-I:%M %p %Z'), round(labels[-1]['Confidence'], 2))
            sns_client.publish(
                TopicArn=label_watch_sns_topic_arn,
                Message=json.dumps(
                    {
                        "message": notification_txt + " Detected Object Categories " + str(frame_labels) + " " + " Detect text on the Object " + " " + str(frame_texts)
                    }
                ))

        # Store the frame image in S3, partitioned by processing date and hour.
        s3_key = (s3_key_frames_root + '{}/{}/{}/{}/{}.jpg').format(
            now_ts.strftime("%Y"), now_ts.strftime("%m"),
            now_ts.strftime("%d"), now_ts.strftime("%H"), frame_id)
        s3_client.put_object(
            Bucket=s3_bucket,
            Key=s3_key,
            Body=img_bytes
        )

    print("Successfully processed records.")
    return {
        'statusCode': 200,
        'body': json.dumps('Successfully processed records.')
    }
def lambda_handler(event, context):
    """Lambda entry point: log the incoming Kinesis event and process its frames.

    Args:
        event: Kinesis event dict delivered by Lambda.
        context: Lambda context object, passed through unchanged.

    Returns:
        Whatever ``process_image`` returns.
    """
    logger.info("Received event from Kinesis ......")
    # Every record carries a whole base64-encoded frame, so dumping the event
    # at INFO would copy each image into the log. Log the batch size instead
    # and keep the full payload for DEBUG.
    logger.info("Received event with %d record(s)", len(event.get('Records', [])))
    logger.debug("Received event ===>%s", event)
    return process_image(event, context)
Lambda 权限
以下是 Lambda 角色附加的 IAM 策略。
以下是 Kinesis Data Stream 日志(截取于 2019 年 8 月 17 日 1:54 PM IST;最后一次通过 Raspberry Pi 摄取数据是在 2019 年 8 月 16 日 6:45 PM)。
看起来流中积压了大约 117K 条记录,而 Lambda 每次只处理 1 条记录,所以消化得很慢。Lambda 处理一条记录需要多长时间?我会先测出 Lambda 每次运行的耗时,然后修改 Python 采集脚本,让它在两次发送之间休眠的时间略长于 Lambda 的运行时间(一开始可以长 20%),再清空队列重新启动,并实时观察统计数据。
在我当前的项目中,我的目标是从帧流中检测不同的对象。视频帧是使用与 Raspberry Pi 连接的摄像头捕获的。
下面是一个粗略的架构。
架构设计如下:
video_cap.py
代码在 Raspberry Pi 上运行。此代码将图像流发送到 AWS 中的 Kinesis Data Stream(称为 FrameStream
)。FrameStream
数据流接收这些帧并触发 Lambda 函数(名为 lambda_function.py
)。 lambda 函数是使用Python 3.7
编写的。
此 lambda 函数接收图像流,触发 AWS Rekognition 并发送电子邮件通知。
我的问题是:即使我(按 Ctrl + C)停止了在 Raspberry Pi 上运行的 video_cap.py,Lambda 函数仍然不断将日志(报告先前接收到的旧帧)写入 CloudWatch。
请帮助我 - 我该如何解决这个问题?如果您需要任何其他信息,请告诉我。
video_cap.py
文件代码
# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Licensed under the Amazon Software License (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
# http://aws.amazon.com/asl/
# or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and limitations under the License.
import sys
import cPickle
import datetime
import cv2
import boto3
import time
import cPickle
from multiprocessing import Pool
import pytz
# AWS clients shared by the functions in this script.
kinesis_client = boto3.client("kinesis")
rekog_client = boto3.client("rekognition")
camera_index = 0 # 0 is usually the built-in webcam
capture_rate = 30 # Frame capture rate.. every X frames. Positive integer.
# Passed as MaxLabels / MinConfidence to Rekognition detect_labels.
rekog_max_labels = 123
rekog_min_conf = 50.0
#Send frame to Kinesis stream
def encode_and_send_frame(frame, frame_count, enable_kinesis=True, enable_rekog=False, write_file=False):
    """Encode one captured frame as JPEG and send it to the enabled sinks.

    Args:
        frame: image to encode; handed unchanged to ``cv2.imencode``.
        frame_count: running index of the frame. Stored in the package and
            used in the optional output file name.
        enable_kinesis: put the pickled frame package on the "FrameStream"
            Kinesis stream.
        enable_rekog: call Rekognition ``detect_labels`` directly on the JPEG.
        write_file: also write the JPEG to ``img_<frame_count>.jpg``.

    Returns:
        None. Failures are printed instead of raised so that one bad frame
        does not abort the caller.
    """
    try:
        # Convert the OpenCV image to JPEG; bail out if the encoder refuses it.
        retval, buff = cv2.imencode(".jpg", frame)
        if not retval:
            print("Could not encode frame {} as JPEG".format(frame_count))
            return
        img_bytes = bytearray(buff)

        # Seconds since the Unix epoch. time.time() is timezone independent,
        # unlike localizing the naive local datetime.now() as UTC.
        now_ts_utc = time.time()

        frame_package = {
            'ApproximateCaptureTime': now_ts_utc,
            'FrameCount': frame_count,
            'ImageBytes': img_bytes
        }

        if write_file:
            print("Writing file img_{}.jpg".format(frame_count))
            # JPEG data is binary: open in 'wb' and let `with` close the file.
            with open("img_{}.jpg".format(frame_count), 'wb') as target:
                target.write(img_bytes)

        # Put the encoded image on the Kinesis stream.
        if enable_kinesis:
            print("Sending image to Kinesis")
            response = kinesis_client.put_record(
                StreamName="FrameStream",
                Data=cPickle.dumps(frame_package),
                PartitionKey="partitionkey"
            )
            print(response)

        if enable_rekog:
            response = rekog_client.detect_labels(
                Image={
                    'Bytes': img_bytes
                },
                MaxLabels=rekog_max_labels,
                MinConfidence=rekog_min_conf
            )
            print(response)
    except Exception as e:
        # Best effort by design: report the failure and carry on.
        print("Failed to process frame {}: {}".format(frame_count, e))
def main():
    """Capture frames from the camera and send every Nth one to Kinesis.

    N defaults to the module-level ``capture_rate`` and can be overridden by
    passing a positive integer as the first command-line argument. Frames are
    shown in a preview window; press 'q' in that window to stop.
    """
    # Copy the module default into a local name. Assigning to `capture_rate`
    # here would make it local to main() and raise UnboundLocalError whenever
    # no argument is supplied.
    rate = capture_rate
    if len(sys.argv) > 1 and sys.argv[1].isdigit():
        requested = int(sys.argv[1])
        if requested > 0:  # 0 would make the modulo below divide by zero
            rate = requested

    cap = cv2.VideoCapture(camera_index)  # Use 0 for built-in camera. Use 1, 2, etc. for attached cameras.
    pool = Pool(processes=3)
    frame_count = 0
    finished = False

    try:
        while True:
            # Capture frame-by-frame
            ret, frame = cap.read()
            if ret is False:
                break

            if frame_count % rate == 0:
                pool.apply_async(encode_and_send_frame, (frame, frame_count, True, False, False,))
            frame_count += 1

            # Display the resulting frame
            cv2.imshow('frame', frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
        finished = True
    finally:
        if finished:
            # Normal exit: let frames that are already queued be sent.
            pool.close()
        else:
            # Ctrl+C or an error: drop queued work instead of waiting on it.
            pool.terminate()
        pool.join()
        # When everything is done, release the capture
        cap.release()
        cv2.destroyAllWindows()


if __name__ == '__main__':
    main()
Lambda 函数(lambda_function.py
)
from __future__ import print_function
import base64
import json
import logging
import _pickle as cPickle
#import time
from datetime import datetime
import decimal
import uuid
import boto3
from copy import deepcopy
# Root logger; INFO and above are emitted.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# Rekognition client, created once at import time.
rekog_client = boto3.client('rekognition')
# S3 Configuration
s3_client = boto3.client('s3')
s3_bucket = "bucket-name-XXXXXXXXXXXXX"
s3_key_frames_root = "frames/"
# SNS Configuration
sns_client = boto3.client('sns')
label_watch_sns_topic_arn = "SNS-ARN-XXXXXXXXXXXXXXXX"
# Module-level containers for Rekognition label names.
# NOTE(review): being module-level, anything stored in these is shared by every
# call made in the same process - confirm that is intended.
labels_on_watch_list = []
labels_on_watch_list_set = []
text_list_set = []
# List for detected text
text_list = []
def process_image(event, context):
    """Run Rekognition on every frame in a Kinesis event, then notify and archive it.

    For each record the frame is unpickled, labels and text are detected,
    an SNS message is published when at least one label was found, and the
    JPEG is stored in S3 under ``frames/<year>/<month>/<day>/<hour>/<uuid>.jpg``.

    Args:
        event: Lambda event. Each ``event['Records'][i]['kinesis']['data']``
            must be a base64-encoded pickle of a dict containing at least
            the key "ImageBytes".
        context: Lambda context object (unused).

    Returns:
        dict with ``statusCode`` 200 and a JSON-encoded confirmation message.
    """
    for record in event['Records']:
        # SECURITY: unpickling can execute arbitrary code. This is only
        # acceptable while every producer writing to the stream is trusted.
        frame_package = cPickle.loads(base64.b64decode(record['kinesis']['data']))
        img_bytes = frame_package["ImageBytes"]

        now_ts = datetime.now()
        frame_id = str(uuid.uuid4())

        # === Object detection from the image ===
        rekog_response = rekog_client.detect_labels(
            Image={
                'Bytes': img_bytes
            },
            MaxLabels=10,
            MinConfidence=90.0
        )
        logger.info("Rekognition Response" + str(rekog_response))

        # Build the results per frame. Local sets are always defined (even
        # when nothing is detected) and never carry data over from earlier
        # frames or earlier calls.
        labels = rekog_response['Labels']
        frame_labels = {label['Name'] for label in labels}
        logger.info("Labels on watch list ==>" + str(frame_labels))

        # === Text detection on the same image ===
        text_response = rekog_client.detect_text(Image={'Bytes': img_bytes})
        frame_texts = {text['DetectedText'] for text in text_response['TextDetections']}
        logger.info("Text Detected ==>" + str(frame_texts))

        # SNS notification, only when this frame produced at least one label.
        if frame_labels:
            logger.info("I am in SNS Now......")
            # Report the confidence of the last label returned for this frame.
            notification_txt = 'On {} Vehicle was spotted with {}% confidence'.format(
                now_ts.strftime('%x, %-I:%M %p %Z'), round(labels[-1]['Confidence'], 2))
            sns_client.publish(
                TopicArn=label_watch_sns_topic_arn,
                Message=json.dumps(
                    {
                        "message": notification_txt + " Detected Object Categories " + str(frame_labels) + " " + " Detect text on the Object " + " " + str(frame_texts)
                    }
                ))

        # Store the frame image in S3, partitioned by processing date and hour.
        s3_key = (s3_key_frames_root + '{}/{}/{}/{}/{}.jpg').format(
            now_ts.strftime("%Y"), now_ts.strftime("%m"),
            now_ts.strftime("%d"), now_ts.strftime("%H"), frame_id)
        s3_client.put_object(
            Bucket=s3_bucket,
            Key=s3_key,
            Body=img_bytes
        )

    print("Successfully processed records.")
    return {
        'statusCode': 200,
        'body': json.dumps('Successfully processed records.')
    }
def lambda_handler(event, context):
    """Lambda entry point: log the incoming Kinesis event and process its frames.

    Args:
        event: Kinesis event dict delivered by Lambda.
        context: Lambda context object, passed through unchanged.

    Returns:
        Whatever ``process_image`` returns.
    """
    logger.info("Received event from Kinesis ......")
    # Every record carries a whole base64-encoded frame, so dumping the event
    # at INFO would copy each image into the log. Log the batch size instead
    # and keep the full payload for DEBUG.
    logger.info("Received event with %d record(s)", len(event.get('Records', [])))
    logger.debug("Received event ===>%s", event)
    return process_image(event, context)
Lambda 权限
以下是 Lambda 角色附加的 IAM 策略。
以下是 Kinesis Data Stream 日志(截取于 2019 年 8 月 17 日 1:54 PM IST;最后一次通过 Raspberry Pi 摄取数据是在 2019 年 8 月 16 日 6:45 PM)。
看起来流中积压了大约 117K 条记录,而 Lambda 每次只处理 1 条记录,所以消化得很慢。Lambda 处理一条记录需要多长时间?我会先测出 Lambda 每次运行的耗时,然后修改 Python 采集脚本,让它在两次发送之间休眠的时间略长于 Lambda 的运行时间(一开始可以长 20%),再清空队列重新启动,并实时观察统计数据。