Divide Spark DataFrame rows into multiple rows depending on the size of a Map type column

Spark DataFrame schema:

root
 |-- partition_key: string (nullable = true)
 |-- row_key: string (nullable = true)
 |-- attributes: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- data_as_of_date: string (nullable = true)

The attributes column is of map type. It can contain more than 65535 key-value pairs.

I am writing this same DataFrame to a Cassandra table. The table DDL is as follows:

create table <keyspace>.<table> (
  partition_key text,
  row_key text,
  attributes map<text,text>,
  data_as_of_date text,
  PRIMARY KEY (partition_key, row_key)
  );

Problem:
In Cassandra, the attributes column of type map<text, text> cannot contain more than 65535 key-value pairs.

Question:
Can someone help me with a Scala/Spark snippet that splits a DataFrame row into multiple rows when the map type column has more than 65535 key-value pairs?

For example, if a DataFrame row has 163838 key-value pairs in the map type column, that row should be split into 3 rows. The row_key should get "_" + a sequence number appended, so that each resulting row has a unique Cassandra composite primary key after the split. If the map type column has <= 65535 key-value pairs, the row should remain unchanged.

The values would look like this:

<partition_key>, <row_key>_1, <65535 key value pairs from the map>, <data_as_of_date>
<partition_key>, <row_key>_2, <65535 key value pairs from the map>, <data_as_of_date>
<partition_key>, <row_key>_3, <32768 key value pairs from the map>, <data_as_of_date>
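
To make the arithmetic explicit: 163838 pairs split into chunks of at most 65535 gives 3 rows (65535 + 65535 + 32768). A minimal illustrative sketch (the limit and variable names here are just for illustration, not part of the actual job):

// Illustrative only: chunk sizes for a row with 163838 entries and a 65535-entry limit
val limit = 65535
val totalPairs = 163838

// Number of resulting rows: ceil(163838 / 65535) = 3
val numRows = math.ceil(totalPairs.toDouble / limit).toInt

// Sizes of the resulting chunks: List(65535, 65535, 32768)
val chunkSizes = (1 to totalPairs).grouped(limit).map(_.size).toList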

Please use the following sample DataFrame/code. Any row whose map has more than 2 entries should be split into multiple rows.

val data = Seq(("123", "123001", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
      ("123", "123002", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
      ("123", "123003", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
      ("456", "456001", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
      ("456", "456002", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
      ("456", "456003", Map("key1" -> "value1", "key2" -> "value2"), "20210725")
    )

    val df = spark.createDataFrame(data).toDF("partition_key", "row_key", "attributes", "data_as_of_date")

The output DataFrame should look like this:

"123", "123001_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"123", "123001_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"123", "123001_3", Map("key5" -> "value5"), "20210725"
"123", "123002_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"123", "123002_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"123", "123002_3", Map("key5" -> "value5"), "20210725"
"123", "123003_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"123", "123003_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"123", "123003_3", Map("key5" -> "value5"), "20210725"
"456", "456001_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"456", "456001_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"456", "456001_3", Map("key5" -> "value5"), "20210725"
"456", "456002_1", Map("key1" -> "value1", "key2" -> "value2"), "20210725"
"456", "456002_2", Map("key3" -> "value3", "key4" -> "value4"), "20210725"
"456", "456002_3", Map("key5" -> "value5"), "20210725"
"456", "456003", Map("key1" -> "value1", "key2" -> "value2"), "20210725"

Here is a solution that consists of:

  1. Exploding the map column with its position (posexplode)
  2. Integer-dividing the position column to derive a group column
  3. Grouping by that group column and merging the entries back into a map
  4. Concatenating row_key with the group column

So I think this should give the expected result:

val data = Seq(("123", "123001", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
      (123", "123002", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
      ("123", "123003", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
      ("456", "456001", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
      ("456", "456002", Map("key1" -> "value1", "key2" -> "value2", "key3" -> "value3", "key4" -> "value4", "key5" -> "value5"), "20210725"),
      ("456", "456003", Map("key1" -> "value1", "key2" -> "value2"), "20210725")
    )

val df = data.toDF("partition_key", "row_key", "attributes", "data_as_of_date")

val maxItem = 2

df.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
  .withColumn("group", $"pos".divide(maxItem).cast("int"))
  .groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
  .agg(collect_list(map($"key", $"value")).as("attributes"))
  .withColumn("row_key", concat($"row_key", lit("_"), $"group"))
  .select($"partition_key", $"row_key", $"attributes", $"data_as_of_date")
  .show
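
Note that this always appends the group suffix, while the expected output keeps row_key unchanged for rows whose map fits into a single group (the "456003" row). One possible extension, sketched here and not part of the solution above, is to count the groups per original row with a window function and only append the suffix when the row was actually split:

import org.apache.spark.sql.expressions.Window

df.select($"partition_key", $"row_key", $"data_as_of_date", posexplode($"attributes"))
  .withColumn("group", ($"pos" / maxItem).cast("int") + 1)
  .groupBy($"partition_key", $"row_key", $"data_as_of_date", $"group")
  .agg(map_from_entries(collect_list(struct($"key", $"value"))).as("attributes"))
  // how many groups each original row was split into
  .withColumn("n_groups", max($"group").over(Window.partitionBy($"partition_key", $"row_key")))
  // only append the sequence suffix when the row produced more than one group
  .withColumn("row_key",
    when($"n_groups" > 1, concat($"row_key", lit("_"), $"group")).otherwise($"row_key"))
  .select($"partition_key", $"row_key", $"attributes", $"data_as_of_date")
  .show(false)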