Divide Spark DataFrame rows into multiple rows depending on the size of a Map type column
Spark DataFrame schema:
root
|-- partition_key: string (nullable = true)
|-- row_key: string (nullable = true)
|-- attributes: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- data_as_of_date: string (nullable = true)
The attributes column is of type map. It can contain more than 65535 key-value pairs.
I am writing this same DataFrame to a Cassandra table. The table DDL is as follows:
create table <keyspace>.<table> (
partition_key text,
row_key text,
attributes map<text,text>,
data_as_of_date text,
PRIMARY KEY (partition_key, row_key)
);
Issue:
In Cassandra, the attributes column of type map<text, text> cannot contain more than 65535 key-value pairs.
Question:
Can someone help me with a Scala/Spark snippet that splits a DataFrame row into multiple rows when the map type column holds more than 65535 key-value pairs?
For example, if a DataFrame row has 163838 key-value pairs in the map type column, that row should be split into 3 rows. A _ plus a sequence number is appended to row_key so that each split row still has a unique Cassandra composite primary key. Rows whose map has <= 65535 key-value pairs should remain unchanged.
The resulting values would look like this:
<partition_key>, <row_key>_1, <65535 key value pairs from the map>, <data_as_of_date>
<partition_key>, <row_key>_2, <65535 key value pairs from the map>, <data_as_of_date>
<partition_key>, <row_key>_3, <32768 key value pairs from the map>, <data_as_of_date>
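For reference, the number of output rows is just the ceiling division of the map size by the 65535 cap; a minimal sketch (the helper name chunkCount is hypothetical, for illustration only):
// How many rows a map with n entries splits into under a per-row cap.
// For n = 163838 and the default cap: (163838 + 65534) / 65535 = 3,
// i.e. two full chunks of 65535 entries plus one chunk of 32768.
def chunkCount(n: Long, maxPairs: Int = 65535): Long =
  (n + maxPairs - 1) / maxPairs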
Please use the sample DataFrame/code below. For this example, any row whose map has more than 2 entries should be split into multiple rows.
val data = Seq(("123", "123001", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
("123", "123002", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
("123", "123003", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
("456", "456001", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
("456", "456002", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
("456", "456003", Map("key1" -> "value1", "key2" -> "value2"), "20210725")
)
val df = spark.createDataFrame(data)
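Note that spark.createDataFrame(data) on a Seq of tuples produces the default column names _1 to _4; to get the schema shown above, the names can be attached explicitly (a minimal sketch, assuming the tuple order matches the schema):
val df = spark.createDataFrame(data)
  .toDF("partition_key", "row_key", "attributes", "data_as_of_date")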
The output DataFrame should look like this:
"123", "123001_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"123", "123001_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"123", "123001_3", Map("key5" -> "value5"), "20210725"
"123", "123002_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"123", "123002_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"123", "123002_3", Map("key5" -> "value5"), "20210725"
"123", "123003_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"123", "123003_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"123", "123003_3", Map("key5" -> "value5"), "20210725"
"456", "456001_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"456", "456001_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"456", "456001_3", Map("key5" -> "value5"), "20210725"
"456", "456002_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"456", "456002_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"456", "456002_3", Map("key5" -> "value5"), "20210725"
"456", "456003", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
Here is a solution that:
- posexplodes the map column
- integer-divides the position column to derive a group column
- groups by the group column to merge the entries of each group back into a map
- concatenates row_key with the group column
So I think this should give the expected result:
import org.apache.spark.sql.functions._
import spark.implicits._

val data = Seq(("123", "123001", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
("123", "123002", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
("123", "123003", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
("456", "456001", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
("456", "456002", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
("456", "456003", Map("key1" -> "value1", "key2" -> "value2"), "20210725")
)
val df = data.toDF("partition_key", "row_key", "attributes", "data_as_of_date")
val maxItem = 2
df.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
  // integer-divide the entry position to assign each key-value pair to a group
  .withColumn("group", $"pos".divide(maxItem).cast("int"))
  .groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
  // rebuild one map per group; collect_list(map(key, value)) would yield an
  // array of single-entry maps, so use map_from_entries (Spark 2.4+) instead
  .agg(map_from_entries(collect_list(struct($"key", $"value"))).as("attributes"))
  .withColumn("row_key", concat($"row_key", lit("_"), $"group"))
  .select($"partition_key", $"row_key", $"attributes", $"data_as_of_date")
  .show
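As written, this numbers the groups from 0 and appends a suffix to every row, so 456003 comes out as 456003_0 instead of staying unchanged. A hedged variant that matches the expected output exactly, suffixing only rows that were actually split (the window count step and the n_groups name are my additions for illustration):
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val w = Window.partitionBy($"partition_key", $"row_key")
df.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
  .withColumn("group", $"pos".divide(maxItem).cast("int"))
  .groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
  .agg(map_from_entries(collect_list(struct($"key", $"value"))).as("attributes"))
  // count how many groups each original row produced; only rows with more
  // than one group need the _<seq> suffix, numbered from 1
  .withColumn("n_groups", count(lit(1)).over(w))
  .withColumn("row_key",
    when($"n_groups" > 1, concat($"row_key", lit("_"), $"group" + 1))
      .otherwise($"row_key"))
  .select($"partition_key", $"row_key", $"attributes", $"data_as_of_date")
  .show(false)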