在进行 upsert 时,spark Hudi Job 中的记录键中有超过 1 列

More than 1 column in record key in spark Hudi Job while making an upsert

我目前正在 deltalake 上做 POC,在那里我遇到了这个名为 Apache Hudi 的框架。以下是我尝试使用 apache spark 框架编写的数据。

 private val INITIAL_ALBUM_DATA = Seq(
Album(800,810, "6 String Theory", Array("Lay it down", "Am I Wrong", "68"), dateToLong("2019-12-01")),
Album(801,811, "Hail to the Thief", Array("2+2=5", "Backdrifts"), dateToLong("2019-12-01")),
Album(801,811, "Hail to the Thief", Array("2+2=5", "Backdrifts", "Go to sleep"), dateToLong("2019-12-03"))
)

The class : 
case class Album(albumId: Long,trackId: Long, title: String, tracks: Array[String], updateDate: Long)

所以我想使用记录键作为 albumId 和 trackId 进行更新插入。所以我尝试使用下面的代码进行初始插入(albumDf 是从上面创建的数据框 INITIAL_ALBUM_DATA):

albumDf.write
.format("hudi")
.option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY, DataSourceWriteOptions.COW_TABLE_TYPE_OPT_VAL)
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "albumId, trackId")
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, combineKey)
.option(HoodieWriteConfig.TABLE_NAME, tableName)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option("hoodie.upsert.shuffle.parallelism", "2")
.mode(SaveMode.Append)
.save(s"$basePath/$tableName/")

不过好像不是用多键写的。我在上面 运行 时得到的错误是:

... 5 more
Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "albumId, 
trackId" cannot be null or empty.
at org.apache.hudi.keygen.SimpleKeyGenerator.getKe

有人试过多键吗?当我尝试使用单键 trackId 或 albumId 时,它可以正常工作,但使用 2 个键会失败。目前我使用的是Hudi的0.5.3和scala的2.11版本,spark为2.4.x。我也试过 Hudi 的 0.5.2-incubating/0.6.0.

这可以使用 ComplexKeyGenerator 而不是 SimplekeyGenerator 来解决。

您可以使用 ComplexKeyGenerator 或 CustomKeyGenerator。