Partitioning by column in Apache Spark to S3

We have a use case where we want to read files containing JSON from S3. Then, based on a particular JSON node value, we want to group the data and write it back to S3.

I am able to read the data, but I can't find a good example of how to partition the data based on a JSON key and then upload it to S3. Can anyone provide an example, or point me to a tutorial that covers this use case?

After creating the data frame, this is my data schema:

root
 |-- customer: struct (nullable = true)
 |    |-- customerId: string (nullable = true)
 |-- experiment: string (nullable = true)
 |-- expiryTime: long (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- programId: string (nullable = true)
 |-- score: double (nullable = true)
 |-- startTime: long (nullable = true)
 |-- targetSets: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- featured: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- data: struct (nullable = true)
 |    |    |    |    |    |-- asinId: string (nullable = true)
 |    |    |    |    |-- pk: string (nullable = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |-- reason: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- recommended: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)

I want to partition the data based on a random hash of the customerId column. But when I do:

df.write.partitionBy("customerId").save("s3/bucket/location/to/save");

it gives the error:

org.apache.spark.sql.AnalysisException: Partition column customerId not found in schema StructType(StructField(customer,StructType(StructField(customerId,StringType,true)),true), StructField(experiment,StringType,true), StructField(expiryTime,LongType,true), StructField(partitionKey,StringType,true), StructField(programId,StringType,true), StructField(score,DoubleType,true), StructField(startTime,LongType,true), StructField(targetSets,ArrayType(StructType(StructField(featured,ArrayType(StructType(StructField(data,StructType(StructField(asinId,StringType,true)),true), StructField(pk,StringType,true), StructField(type,StringType,true)),true),true), StructField(reason,ArrayType(StringType,true),true), StructField(recommended,ArrayType(StringType,true),true)),true),true));

Please tell me how I can access the customerId column.

Let's take the dataset sample.json as an example:

{"CUST_ID":"115734","CITY":"San Jose","STATE":"CA","ZIP":"95106"}
{"CUST_ID":"115728","CITY":"Allentown","STATE":"PA","ZIP":"18101"}
{"CUST_ID":"115730","CITY":"Allentown","STATE":"PA","ZIP":"18101"}
{"CUST_ID":"114728","CITY":"San Mateo","STATE":"CA","ZIP":"94401"}
{"CUST_ID":"114726","CITY":"Somerset","STATE":"NJ","ZIP":"8873"}

Now let's work on it with Spark:

val jsonDf = spark.read
  .format("json")
  .load("path/of/sample.json")

jsonDf.show()

+---------+-------+-----+-----+
|     CITY|CUST_ID|STATE|  ZIP|
+---------+-------+-----+-----+
| San Jose| 115734|   CA|95106|
|Allentown| 115728|   PA|18101|
|Allentown| 115730|   PA|18101|
|San Mateo| 114728|   CA|94401|
| Somerset| 114726|   NJ| 8873|
+---------+-------+-----+-----+

Then partition the dataset by the column "ZIP" and write it to S3:

jsonDf.write
  .partitionBy("ZIP")
  .save("s3/bucket/location/to/save")
  // one-liner authentication to S3:
  //.save(s"s3n://$accessKey:$secretKey@$bucketName/location/to/save")

Note: In order for this code to run successfully, the S3 access and secret keys have to be configured properly. Check this answer for Spark/Hadoop integration with S3.
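As a hedged sketch, the credentials can be supplied through Spark's Hadoop configuration, for example in `spark-defaults.conf`. The key names below assume the `s3a` filesystem connector (adjust accordingly if you use `s3n`), and the placeholder values are of course assumptions:

```properties
# Hadoop S3A credentials passed through Spark (assumes the s3a connector)
spark.hadoop.fs.s3a.access.key   <your-access-key>
spark.hadoop.fs.s3a.secret.key   <your-secret-key>
```

Equivalently, the same `fs.s3a.*` keys can be set at runtime on `spark.sparkContext.hadoopConfiguration` before reading or writing.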

Edit: Solution for "Partition column customerId not found in schema" (per the comments):

customerId lives inside the customer struct, so extract customerId into a top-level column first, then partition on it:

df.withColumn("customerId", $"customer.customerId")
  .drop("customer")
  .write.partitionBy("customerId")
  .save("s3/bucket/location/to/save")
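The question also asked for partitioning by a random hash of customerId rather than by its raw value (which avoids one output directory per customer). A hedged sketch under that reading, using Spark's built-in `hash` and `pmod` functions; the bucket count of 16 and the column name `bucket` are illustrative assumptions, not from the original:

```scala
import org.apache.spark.sql.functions.{col, hash, lit, pmod}

// Assumption: 16 buckets; tune this to your data volume.
val bucketed = df
  .withColumn("customerId", col("customer.customerId"))
  // pmod keeps the bucket number non-negative even when hash() is negative
  .withColumn("bucket", pmod(hash(col("customerId")), lit(16)))

bucketed.write
  .partitionBy("bucket")
  .save("s3/bucket/location/to/save")
```

This yields at most 16 output directories (`bucket=0` … `bucket=15`), with each customer's rows landing deterministically in one of them.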