将镶木地板从 AWS Kinesis firehose 写入 AWS S3
Write parquet from AWS Kinesis firehose to AWS S3
我想从格式化为镶木地板的 kinesis firehose 中将数据提取到 s3 中。到目前为止,我刚刚找到了一个意味着创建 EMR 的解决方案,但我正在寻找更便宜和更快的东西,比如直接从 firehose 将接收到的 json 存储为镶木地板或使用 Lambda 函数。
非常感谢,
贾维。
Amazon Kinesis Firehose 接收流记录并可以将它们存储在 Amazon S3(或 Amazon Redshift 或 Amazon Elasticsearch Service)中。
每条记录最大可达 1000KB。
但是,记录会一起附加到一个文本文件中,并根据时间或大小进行批处理。传统上,记录是 JSON 格式。
您将无法发送 parquet 文件,因为它不符合此文件格式。
可以触发Lambda数据转换函数,但是也不能输出parquet文件。
事实上,考虑到 parquet 文件的性质,您不太可能一次构建一个记录。作为一种列式存储格式,我怀疑它们确实需要批量创建而不是附加数据 per-record.
底线:不。
在处理了 AWS 支持服务和上百种不同的实施之后,我想解释一下我所取得的成就。
最后,我创建了一个 Lambda 函数来处理 Kinesis Firehose 生成的每个文件,根据有效负载对我的事件进行分类,并将结果存储在 S3 中的 Parquet 文件中。
做到这一点并不容易:
首先,您应该创建一个 Python 虚拟环境,包括所有必需的库(在我的例子中是 Pandas、NumPy、Fastparquet 等)。
由于结果文件(包含所有库和我的 Lambda 函数很重,因此有必要启动一个 EC2 实例,我使用了免费套餐中包含的那个)。要创建虚拟环境,请按照以下步骤操作:
- 登录 EC2
- 创建一个名为 lambda(或任何其他名称)的文件夹
- 须藤 yum -y 更新
- sudo yum -y upgrade
- sudo yum -y groupinstall "Development Tools"
- sudo yum -y 安装 blas
- sudo yum -y 安装 lapack
- sudo yum -y 安装 atlas-sse3-devel
- sudo yum install python27-devel python27-pip gcc
- 虚拟环境
- 来源env/bin/activate
- pip 安装 boto3
- pip 安装 fastparquet
- pip 安装pandas
- pip 安装 thriftpy
- pip 安装 s3fs
- pip 安装(任何其他所需的库)
- 找到 ~/lambda/env/lib*/python2.7/site-packages/ -name "*.so" | xargs 条带
- pushd env/lib/python2.7/站点包/
- zip -r -9 -q ~/lambda.zip *
- 弹出
- pushd env/lib64/python2.7/站点包/
- zip -r -9 -q ~/lambda.zip *
- 弹出
正确创建 lambda_function:
import json
import boto3
import datetime as dt
import urllib
import zlib
import s3fs
from fastparquet import write
import pandas as pd
import numpy as np
import time
def _send_to_s3_parquet(df):
s3_fs = s3fs.S3FileSystem()
s3_fs_open = s3_fs.open
# FIXME add something else to the key or it will overwrite the file
key = 'mybeautifullfile.parquet.gzip'
# Include partitions! key1 and key2
write( 'ExampleS3Bucket'+ '/key1=value/key2=othervalue/' + key, df,
compression='GZIP',open_with=s3_fs_open)
def lambda_handler(event, context):
# Get the object from the event and show its content type
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'])
try:
s3 = boto3.client('s3')
response = s3.get_object(Bucket=bucket, Key=key)
data = response['Body'].read()
decoded = data.decode('utf-8')
lines = decoded.split('\n')
# Do anything you like with the dataframe (Here what I do is to classify them
# and write to different folders in S3 according to the values of
# the columns that I want
df = pd.DataFrame(lines)
_send_to_s3_parquet(df)
except Exception as e:
print('Error getting object {} from bucket {}.'.format(key, bucket))
raise e
将 lambda 函数复制到 lambda.zip 并部署 lambda_function:
- 回到你的EC2实例,将需要的lambda函数添加到zip:zip -9 lambda.zip lambda_function.py(lambda_function.py是第2步生成的文件)
- 将生成的zip文件复制到S3,如果不通过S3部署的话会很重。 aws s3 cp lambda.zip s3://support-bucket/lambda_packages/
- 部署lambda函数:aws lambda update-function-code --function-name --s3-bucket support-bucket --s3-key lambda_packages/lambda.zip
在您喜欢的时候触发执行,例如,每次在 S3 中创建一个新文件时,甚至您可以将 lambda 函数关联到 Firehose。 (我没有选择这个选项,因为 'lambda' 限制低于 Firehose 限制,你可以配置 Firehose 每 128Mb 或 15 分钟写入一个文件,但是如果你将这个 lambda 函数关联到 Firehose,lambda 函数将每 3 分钟或 5MB 执行一次,在我的例子中,我遇到了生成大量小镶木地板文件的问题,因为每次启动 lambda 函数时,我至少生成 10 个文件)。
好消息,今天发布了这个功能!
Amazon Kinesis Data Firehose can convert the format of your input data
from JSON to Apache Parquet or Apache ORC before storing the data in
Amazon S3. Parquet and ORC are columnar data formats that save space
and enable faster queries
要启用,请转到您的 Firehose 流并单击 编辑。您应该看到 记录格式转换 部分,如下面的屏幕截图所示:
详见文档:https://docs.aws.amazon.com/firehose/latest/dev/record-format-conversion.html
我想从格式化为镶木地板的 kinesis firehose 中将数据提取到 s3 中。到目前为止,我刚刚找到了一个意味着创建 EMR 的解决方案,但我正在寻找更便宜和更快的东西,比如直接从 firehose 将接收到的 json 存储为镶木地板或使用 Lambda 函数。
非常感谢, 贾维。
Amazon Kinesis Firehose 接收流记录并可以将它们存储在 Amazon S3(或 Amazon Redshift 或 Amazon Elasticsearch Service)中。
每条记录最大可达 1000KB。
但是,记录会一起附加到一个文本文件中,并根据时间或大小进行批处理。传统上,记录是 JSON 格式。
您将无法发送 parquet 文件,因为它不符合此文件格式。
可以触发Lambda数据转换函数,但是也不能输出parquet文件。
事实上,考虑到 parquet 文件的性质,您不太可能一次构建一个记录。作为一种列式存储格式,我怀疑它们确实需要批量创建而不是附加数据 per-record.
底线:不。
在处理了 AWS 支持服务和上百种不同的实施之后,我想解释一下我所取得的成就。
最后,我创建了一个 Lambda 函数来处理 Kinesis Firehose 生成的每个文件,根据有效负载对我的事件进行分类,并将结果存储在 S3 中的 Parquet 文件中。
做到这一点并不容易:
首先,您应该创建一个 Python 虚拟环境,包括所有必需的库(在我的例子中是 Pandas、NumPy、Fastparquet 等)。 由于结果文件(包含所有库和我的 Lambda 函数很重,因此有必要启动一个 EC2 实例,我使用了免费套餐中包含的那个)。要创建虚拟环境,请按照以下步骤操作:
- 登录 EC2
- 创建一个名为 lambda(或任何其他名称)的文件夹
- 须藤 yum -y 更新
- sudo yum -y upgrade
- sudo yum -y groupinstall "Development Tools"
- sudo yum -y 安装 blas
- sudo yum -y 安装 lapack
- sudo yum -y 安装 atlas-sse3-devel
- sudo yum install python27-devel python27-pip gcc
- 虚拟环境
- 来源env/bin/activate
- pip 安装 boto3
- pip 安装 fastparquet
- pip 安装pandas
- pip 安装 thriftpy
- pip 安装 s3fs
- pip 安装(任何其他所需的库)
- 找到 ~/lambda/env/lib*/python2.7/site-packages/ -name "*.so" | xargs 条带
- pushd env/lib/python2.7/站点包/
- zip -r -9 -q ~/lambda.zip *
- 弹出
- pushd env/lib64/python2.7/站点包/
- zip -r -9 -q ~/lambda.zip *
- 弹出
正确创建 lambda_function:
import json import boto3 import datetime as dt import urllib import zlib import s3fs from fastparquet import write import pandas as pd import numpy as np import time def _send_to_s3_parquet(df): s3_fs = s3fs.S3FileSystem() s3_fs_open = s3_fs.open # FIXME add something else to the key or it will overwrite the file key = 'mybeautifullfile.parquet.gzip' # Include partitions! key1 and key2 write( 'ExampleS3Bucket'+ '/key1=value/key2=othervalue/' + key, df, compression='GZIP',open_with=s3_fs_open) def lambda_handler(event, context): # Get the object from the event and show its content type bucket = event['Records'][0]['s3']['bucket']['name'] key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key']) try: s3 = boto3.client('s3') response = s3.get_object(Bucket=bucket, Key=key) data = response['Body'].read() decoded = data.decode('utf-8') lines = decoded.split('\n') # Do anything you like with the dataframe (Here what I do is to classify them # and write to different folders in S3 according to the values of # the columns that I want df = pd.DataFrame(lines) _send_to_s3_parquet(df) except Exception as e: print('Error getting object {} from bucket {}.'.format(key, bucket)) raise e
将 lambda 函数复制到 lambda.zip 并部署 lambda_function:
- 回到你的EC2实例,将需要的lambda函数添加到zip:zip -9 lambda.zip lambda_function.py(lambda_function.py是第2步生成的文件)
- 将生成的zip文件复制到S3,如果不通过S3部署的话会很重。 aws s3 cp lambda.zip s3://support-bucket/lambda_packages/
- 部署lambda函数:aws lambda update-function-code --function-name --s3-bucket support-bucket --s3-key lambda_packages/lambda.zip
在您喜欢的时候触发执行,例如,每次在 S3 中创建一个新文件时,甚至您可以将 lambda 函数关联到 Firehose。 (我没有选择这个选项,因为 'lambda' 限制低于 Firehose 限制,你可以配置 Firehose 每 128Mb 或 15 分钟写入一个文件,但是如果你将这个 lambda 函数关联到 Firehose,lambda 函数将每 3 分钟或 5MB 执行一次,在我的例子中,我遇到了生成大量小镶木地板文件的问题,因为每次启动 lambda 函数时,我至少生成 10 个文件)。
好消息,今天发布了这个功能!
Amazon Kinesis Data Firehose can convert the format of your input data from JSON to Apache Parquet or Apache ORC before storing the data in Amazon S3. Parquet and ORC are columnar data formats that save space and enable faster queries
要启用,请转到您的 Firehose 流并单击 编辑。您应该看到 记录格式转换 部分,如下面的屏幕截图所示:
详见文档:https://docs.aws.amazon.com/firehose/latest/dev/record-format-conversion.html