在 Amazon EMR 中将 JSON 转换为 Parquet
Converting JSON to Parquet in Amazon EMR
我需要实现以下目标,但由于我对 Spark 缺乏经验,所以很难想出实现它的方法:
- 从存储在 S3 中的 .json.gz 个文件中读取数据。
- 每个文件都包含 Google 部分一天的分析数据,其架构如 https://support.google.com/analytics/answer/3437719?hl=en 中指定。
- 文件名采用 ga_sessions_20170101_Part000000000000_TX.json.gz 模式,其中 20170101 是 YYYYMMDD 日期规范,000000000000 是一天有多个文件时的增量计数器(通常是这种情况).
- 因此,一整天的数据由多个文件组成,增量为"part numbers"。
- 一般每天3到5个文件。
- JSON 文件中的所有字段都使用 qoute (") 分隔符存储,无论上述模式文档中指定的数据类型如何。读取文件产生的数据框(通过 sqlContext.read.json) 因此将每个字段都键入为字符串,即使有些字段实际上是整数、布尔值或其他数据类型。
- 根据架构规范将全字符串数据框转换为正确类型的数据框。
- 我的目标是正确输入数据框,以便在以 Parquet 格式保存时数据类型正确。
- 并非模式规范中的所有字段都存在于每个输入文件中,甚至不是每天的输入文件中(模式可能随时间发生变化)。因此,转换需要是动态的,仅转换数据框中实际存在的字段的数据类型。
- 将正确类型的数据框中的数据以 Parquet 格式写回 S3。
- 数据应按天分区,每个分区存储在名为 "partition_date=YYYYMMDD" 的单独文件夹中,其中 "YYYYMMDD" 是与数据关联的实际日期(来自原始输入文件名)。
- 我认为每天的文件数量无关紧要。目标只是将 Parquet 格式数据分区,我可以将 Spectrum 指向该数据。
我已经能够成功读取和写入数据,但在整个任务的几个方面都没有成功:
- 我不知道如何解决这个问题以确保我有效地利用 AWS EMR 集群来充分发挥 parallel/distributed 处理的潜力,无论是读取、转换还是写入数据。我想根据需要调整集群大小,以便在我选择的任何时间范围内(在合理范围内)完成任务。
- 我不知道如何最好地完成数据类型转换。不知道哪些字段会或不会出现在任何特定批次的输入文件中需要动态代码来重新输入数据框。我还想确保此任务得到有效分配并且不会低效完成(我担心在重新输入每个字段时创建一个新的数据框)。
- 我不明白如何适当地管理数据分区。
如果您能通过整体方法提供任何帮助,我们将不胜感激!
如果您的输入 JSON 具有固定架构,您可以手动指定 DF 架构,将字段声明为可选。参考官方guide。
如果所有值都在 "" 中,则可以将它们作为字符串读取,然后转换为所需的类型。
I don't know how to approach the problem to ensure that I'm effectively...
使用 Dataframe API 读取输入,默认值很可能适合此任务。如果遇到性能问题,请附加 Spark 作业时间线。
I don't know how to best accomplish the data type conversion...
使用 cast column.cast(DataType)
方法。
例如,您有 2 个 JSON:
{"foo":"firstVal"}{"foo":"val","bar" : "1"}
并且您想将 'foo' 读取为字符串,将 bar 读取为整数,您可以这样写:
val schema = StructType(
StructField("foo", StringType, true) ::
StructField("bar", StringType, true) :: Nil
)
val df = session.read
.format("json").option("path", s"${yourPath}")
.schema(schema)
.load()
val withCast = df.select('foo, 'bar cast IntegerType)
withCast.show()
withCast.write.format("parquet").save(s"${outputPath}")
我需要实现以下目标,但由于我对 Spark 缺乏经验,所以很难想出实现它的方法:
- 从存储在 S3 中的 .json.gz 个文件中读取数据。
- 每个文件都包含 Google 部分一天的分析数据,其架构如 https://support.google.com/analytics/answer/3437719?hl=en 中指定。
- 文件名采用 ga_sessions_20170101_Part000000000000_TX.json.gz 模式,其中 20170101 是 YYYYMMDD 日期规范,000000000000 是一天有多个文件时的增量计数器(通常是这种情况).
- 因此,一整天的数据由多个文件组成,增量为"part numbers"。
- 一般每天3到5个文件。
- JSON 文件中的所有字段都使用 qoute (") 分隔符存储,无论上述模式文档中指定的数据类型如何。读取文件产生的数据框(通过 sqlContext.read.json) 因此将每个字段都键入为字符串,即使有些字段实际上是整数、布尔值或其他数据类型。
- 根据架构规范将全字符串数据框转换为正确类型的数据框。
- 我的目标是正确输入数据框,以便在以 Parquet 格式保存时数据类型正确。
- 并非模式规范中的所有字段都存在于每个输入文件中,甚至不是每天的输入文件中(模式可能随时间发生变化)。因此,转换需要是动态的,仅转换数据框中实际存在的字段的数据类型。
- 将正确类型的数据框中的数据以 Parquet 格式写回 S3。
- 数据应按天分区,每个分区存储在名为 "partition_date=YYYYMMDD" 的单独文件夹中,其中 "YYYYMMDD" 是与数据关联的实际日期(来自原始输入文件名)。
- 我认为每天的文件数量无关紧要。目标只是将 Parquet 格式数据分区,我可以将 Spectrum 指向该数据。
我已经能够成功读取和写入数据,但在整个任务的几个方面都没有成功:
- 我不知道如何解决这个问题以确保我有效地利用 AWS EMR 集群来充分发挥 parallel/distributed 处理的潜力,无论是读取、转换还是写入数据。我想根据需要调整集群大小,以便在我选择的任何时间范围内(在合理范围内)完成任务。
- 我不知道如何最好地完成数据类型转换。不知道哪些字段会或不会出现在任何特定批次的输入文件中需要动态代码来重新输入数据框。我还想确保此任务得到有效分配并且不会低效完成(我担心在重新输入每个字段时创建一个新的数据框)。
- 我不明白如何适当地管理数据分区。
如果您能通过整体方法提供任何帮助,我们将不胜感激!
如果您的输入 JSON 具有固定架构,您可以手动指定 DF 架构,将字段声明为可选。参考官方guide。 如果所有值都在 "" 中,则可以将它们作为字符串读取,然后转换为所需的类型。
I don't know how to approach the problem to ensure that I'm effectively...
使用 Dataframe API 读取输入,默认值很可能适合此任务。如果遇到性能问题,请附加 Spark 作业时间线。
I don't know how to best accomplish the data type conversion...
使用 cast column.cast(DataType)
方法。
例如,您有 2 个 JSON:
{"foo":"firstVal"}{"foo":"val","bar" : "1"}
并且您想将 'foo' 读取为字符串,将 bar 读取为整数,您可以这样写:
val schema = StructType(
StructField("foo", StringType, true) ::
StructField("bar", StringType, true) :: Nil
)
val df = session.read
.format("json").option("path", s"${yourPath}")
.schema(schema)
.load()
val withCast = df.select('foo, 'bar cast IntegerType)
withCast.show()
withCast.write.format("parquet").save(s"${outputPath}")