附加唯一 ID 的 Spark 数据集
Spark Dataset appending unique ID
我正在查看 spark 数据集上是否存在 "already implemented alternative" 到 append
的唯一 ID。
我的场景:
我有一个增量作业,每天运行处理一批信息。在此作业中,我创建了 something
的维度 table,并使用 monotonically_increasing_id()
为每一行分配唯一 ID。第二天,我想向 something
table 添加一些行,并希望为这些行生成唯一的 ID。
示例:
第 1 天:
something_table
uniqueID name
100001 A
100002 B
第 2 天:
something_table
uniqueId name
100001 A
100002 B
100003 C -- new data that must be created on day 2
第 1 天的截取代码:
case class BasicSomething(name: String)
case class SomethingTable(id: Long, name: String)
val ds: Dataset[BasicSomething] = spark.createDataset(Seq(BasicSomething("A"), BasicSomething("B")))
ds.withColumn("uniqueId", monotonically_increasing_id())
.as[SomethingTable]
.write.csv("something")
我不知道如何以一种方式保持 monotonically_increasing_id()
的状态,以便在第二天它将知道来自 something_table
唯一 ID 的现有 ID。
您始终可以获得您创建的 数据集 的 last uniqueId。因此,您可以 将 uniqueId 与 monotically_increasing_id() 一起使用并创建新的 uniqueIds。
ds.withColumn("uniqueId", monotonically_increasing_id()+last uniqueId of previous dataframe)
我正在查看 spark 数据集上是否存在 "already implemented alternative" 到 append
的唯一 ID。
我的场景:
我有一个增量作业,每天运行处理一批信息。在此作业中,我创建了 something
的维度 table,并使用 monotonically_increasing_id()
为每一行分配唯一 ID。第二天,我想向 something
table 添加一些行,并希望为这些行生成唯一的 ID。
示例:
第 1 天:
something_table
uniqueID name
100001 A
100002 B
第 2 天:
something_table
uniqueId name
100001 A
100002 B
100003 C -- new data that must be created on day 2
第 1 天的截取代码:
case class BasicSomething(name: String)
case class SomethingTable(id: Long, name: String)
val ds: Dataset[BasicSomething] = spark.createDataset(Seq(BasicSomething("A"), BasicSomething("B")))
ds.withColumn("uniqueId", monotonically_increasing_id())
.as[SomethingTable]
.write.csv("something")
我不知道如何以一种方式保持 monotonically_increasing_id()
的状态,以便在第二天它将知道来自 something_table
唯一 ID 的现有 ID。
您始终可以获得您创建的 数据集 的 last uniqueId。因此,您可以 将 uniqueId 与 monotically_increasing_id() 一起使用并创建新的 uniqueIds。
ds.withColumn("uniqueId", monotonically_increasing_id()+last uniqueId of previous dataframe)