使用 pyspark 按日期元素读取和分组 json 文件
Read and group json files by date element using pyspark
我在 S3 存储桶上有多个 JSON 文件(10 TB ~),我需要按每个 json 文档中存在的日期元素组织这些文件。
我认为我的代码需要做什么
- 读取 s3 存储桶中的所有 json 个文件。
- 保留所有在 2022-01-01 和 2022-04-01 之间具有元素“creation_date”的文档
- 以镶木地板格式将它们保存在另一个存储桶中。
考虑到我要处理的规模,我不确定这样做是否正确。
这是一个 json 文档的示例。每个文件都有多个这样的文档。
{
"id": 123456,
"creation_date": "2022-01-01T23:35:16",
"params": {
"doc_info": "AXBD",
"return_date": "20/05/2021",
"user_name": "XXXXXXXX",
"value": "40,00"
},
"user_id": "1234567",
"type": "TEST"
}
]
这是我已经在DB笔记本上尝试过的,但实际上,我不能直接在笔记本上使用代码。我需要在 airflow dag 上编写一个 spark 代码和 运行,因为我没有直接从笔记本上使用的存储桶的写权限。
# Trying to read all the json files
df_test = spark.read.json("s3://my-bucket/**/**" + "/*.json")
# Filtering all documents that has the creation_date period that I want
df_test_filter = df_test.filter(F.col("creation_date").between('2022-01-01','2022-04-01'))
# Write parquet on another bucket
# In this test, I'm saving on a local bucket that I have write access.
df_test_filter.write.mode('overwrite').parquet("s3://my-local-test-bucket/")
这似乎在我用来测试的单个 json 文件上运行良好,但我的问题是:
- 如果没有 databricks notebook,如何使用带有 pyspark 的 airflow dag 来做到这一点?
- 考虑到性能问题,有更好的办法吗?
对于 运行 AWS 上的 PySpark 作业,我建议使用 AWS Glue 或 EMR。
EMR 比 运行 便宜,但 AWS Glue 更容易配置。
这里是 example Glue 作业的样子。
Airflow 有一个 Glue job operator 可以从 Airflow DAG 触发 Glue 作业。
关于性能优化,您的代码看起来相当优化,不太可能让它运行得更快。
加快日期范围选择的一种方法是根据 creation_date
.
将 JSON 存储在不同的文件夹中
您可以将数据存储在以下文件夹中:
s3://my-bucket/creation-date=2022-01-01/
s3://my-bucket/creation-date=2022-01-02/
如果执行此操作,则在按日期范围过滤时不需要读取所有 JSON。
好的@fahabashev 做得很好但错过了一些关键点。
To run PySpark jobs on AWS I recommend to use either AWS Glue or EMR.
The EMR is cheaper to run but AWS Glue is easier to configure.
Here is one example of how Glue job might look like.
Airflow has a Glue job operator that can trigger a Glue job from an
Airflow DAG.
听起来不错。您不想将文件写入 S3 中的目录。 S3 中的文件查找非常昂贵。 (避免使用“*”)为了获得最佳性能,请在镶木地板中写入一个大文件并让 CPU 进行过滤。使用 zippable/splittable 格式(Parquet)的绝佳选择。不要移动 JSON 文件,尽快将它们吸入 Parquet。 JSON 解析并不昂贵,读取容纳 JSON 所需的所有字符是昂贵的。我已经看到从 JSON 和使用 Parquet/ORC 开始的性能提高了 10000%。我认为您应该尽快开始迁移,因为多个文件查找会浪费很多时间。
你想 运行 只做一次工作还是想定期做?
一个运行
你所拥有的应该很好用
# Trying to read all the json files
sdf = spark.read.json("s3://my-bucket/**/**/*.json")
我唯一要添加的是按日期对输出进行分区以加快查询速度:
(
# Filtering all documents that has the creation_date period that I want
sdf.filter(F.col("creation_date").between('2022-01-01','2022-04-01'))
# Partition by creation date so that's easier to query
.partitionBy("creation_date")
# Export the data
.write.mode('append')
.parquet("s3://my-local-test-bucket/")
)
运行 定期
这里我想知道文件结构是什么。按某些日期对数据进行分区是一个好主意,在这种情况下,您可能会将输入数据按另一个日期进行分区(可能 insert_date
?)。
假设是这种情况,我建议您每天读取该数据,然后将其写入按您想要的日期分区的镶木地板。
这将由以下人员完成:
# Trying to read all the json files
sdf = spark.read.json(f"s3://my-bucket/insert_date={today:%Y-%m-%d}/*/")
sdf.partitionBy("creation_date").write.mode('append').parquet("s3://my-local-test-bucket/")
稍后您可以简单地检索您需要的数据:
sdf = (
spark.read.json(f"s3://my-bucket/")
.where(F.col("creation_date").between('2022-01-01','2022-04-01'))
)
'#嘿,这是答案:
正在尝试读取所有文件
sdf = spark.read.json("s3://my-bucket///*.json")'
我在 S3 存储桶上有多个 JSON 文件(10 TB ~),我需要按每个 json 文档中存在的日期元素组织这些文件。
我认为我的代码需要做什么
- 读取 s3 存储桶中的所有 json 个文件。
- 保留所有在 2022-01-01 和 2022-04-01 之间具有元素“creation_date”的文档
- 以镶木地板格式将它们保存在另一个存储桶中。
考虑到我要处理的规模,我不确定这样做是否正确。
这是一个 json 文档的示例。每个文件都有多个这样的文档。
{
"id": 123456,
"creation_date": "2022-01-01T23:35:16",
"params": {
"doc_info": "AXBD",
"return_date": "20/05/2021",
"user_name": "XXXXXXXX",
"value": "40,00"
},
"user_id": "1234567",
"type": "TEST"
}
]
这是我已经在DB笔记本上尝试过的,但实际上,我不能直接在笔记本上使用代码。我需要在 airflow dag 上编写一个 spark 代码和 运行,因为我没有直接从笔记本上使用的存储桶的写权限。
# Trying to read all the json files
df_test = spark.read.json("s3://my-bucket/**/**" + "/*.json")
# Filtering all documents that has the creation_date period that I want
df_test_filter = df_test.filter(F.col("creation_date").between('2022-01-01','2022-04-01'))
# Write parquet on another bucket
# In this test, I'm saving on a local bucket that I have write access.
df_test_filter.write.mode('overwrite').parquet("s3://my-local-test-bucket/")
这似乎在我用来测试的单个 json 文件上运行良好,但我的问题是:
- 如果没有 databricks notebook,如何使用带有 pyspark 的 airflow dag 来做到这一点?
- 考虑到性能问题,有更好的办法吗?
对于 运行 AWS 上的 PySpark 作业,我建议使用 AWS Glue 或 EMR。
EMR 比 运行 便宜,但 AWS Glue 更容易配置。
这里是 example Glue 作业的样子。
Airflow 有一个 Glue job operator 可以从 Airflow DAG 触发 Glue 作业。
关于性能优化,您的代码看起来相当优化,不太可能让它运行得更快。
加快日期范围选择的一种方法是根据 creation_date
.
您可以将数据存储在以下文件夹中:
s3://my-bucket/creation-date=2022-01-01/
s3://my-bucket/creation-date=2022-01-02/
如果执行此操作,则在按日期范围过滤时不需要读取所有 JSON。
好的@fahabashev 做得很好但错过了一些关键点。
To run PySpark jobs on AWS I recommend to use either AWS Glue or EMR.
The EMR is cheaper to run but AWS Glue is easier to configure.
Here is one example of how Glue job might look like.
Airflow has a Glue job operator that can trigger a Glue job from an Airflow DAG.
听起来不错。您不想将文件写入 S3 中的目录。 S3 中的文件查找非常昂贵。 (避免使用“*”)为了获得最佳性能,请在镶木地板中写入一个大文件并让 CPU 进行过滤。使用 zippable/splittable 格式(Parquet)的绝佳选择。不要移动 JSON 文件,尽快将它们吸入 Parquet。 JSON 解析并不昂贵,读取容纳 JSON 所需的所有字符是昂贵的。我已经看到从 JSON 和使用 Parquet/ORC 开始的性能提高了 10000%。我认为您应该尽快开始迁移,因为多个文件查找会浪费很多时间。
你想 运行 只做一次工作还是想定期做?
一个运行
你所拥有的应该很好用
# Trying to read all the json files
sdf = spark.read.json("s3://my-bucket/**/**/*.json")
我唯一要添加的是按日期对输出进行分区以加快查询速度:
(
# Filtering all documents that has the creation_date period that I want
sdf.filter(F.col("creation_date").between('2022-01-01','2022-04-01'))
# Partition by creation date so that's easier to query
.partitionBy("creation_date")
# Export the data
.write.mode('append')
.parquet("s3://my-local-test-bucket/")
)
运行 定期
这里我想知道文件结构是什么。按某些日期对数据进行分区是一个好主意,在这种情况下,您可能会将输入数据按另一个日期进行分区(可能 insert_date
?)。
假设是这种情况,我建议您每天读取该数据,然后将其写入按您想要的日期分区的镶木地板。
这将由以下人员完成:
# Trying to read all the json files
sdf = spark.read.json(f"s3://my-bucket/insert_date={today:%Y-%m-%d}/*/")
sdf.partitionBy("creation_date").write.mode('append').parquet("s3://my-local-test-bucket/")
稍后您可以简单地检索您需要的数据:
sdf = (
spark.read.json(f"s3://my-bucket/")
.where(F.col("creation_date").between('2022-01-01','2022-04-01'))
)
'#嘿,这是答案:
正在尝试读取所有文件
sdf = spark.read.json("s3://my-bucket///*.json")'