Spark Dataframe 随机 UUID 在每 transformation/action 后更改
Spark Dataframe Random UUID changes after every transformation/action
我有一个 Spark 数据框,其中有一列包含生成的 UUID。
但是,每次我对数据帧执行操作或转换时,它都会更改每个阶段的 UUID。
如何只生成一次 UUID 并使 UUID 保持静态。
重现我的问题的一些示例代码如下:
def process(spark: SparkSession): Unit = {
import spark.implicits._
val sc = spark.sparkContext
val sqlContext = spark.sqlContext
sc.setLogLevel("OFF")
// create dataframe
val df = spark.createDataset(Array(("a", "1"), ("b", "2"), ("c", "3"))).toDF("col1", "col2")
df.createOrReplaceTempView("df")
df.show(false)
// register an UDF that creates a random UUID
val generateUUID = udf(() => UUID.randomUUID().toString)
// generate UUID for new column
val dfWithUuid = df.withColumn("new_uuid", generateUUID())
dfWithUuid.show(false)
dfWithUuid.show(false) // uuid is different
// new transformations also change the uuid
val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2")+1)
dfWithUuidWithNewCol.show(false)
}
输出为:
+----+----+
|col1|col2|
+----+----+
|a |1 |
|b |2 |
|c |3 |
+----+----+
+----+----+------------------------------------+
|col1|col2|new_uuid |
+----+----+------------------------------------+
|a |1 |a414e73b-24b8-4f64-8d21-f0bc56d3d290|
|b |2 |f37935e5-0bfc-4863-b6dc-897662307e0a|
|c |3 |e3aaf655-5a48-45fb-8ab5-22f78cdeaf26|
+----+----+------------------------------------+
+----+----+------------------------------------+
|col1|col2|new_uuid |
+----+----+------------------------------------+
|a |1 |1c6597bf-f257-4e5f-be81-34a0efa0f6be|
|b |2 |6efe4453-29a8-4b7f-9fa1-7982d2670bd6|
|c |3 |2f7ddc1c-3e8c-4118-8e2c-8a6f526bee7e|
+----+----+------------------------------------+
+----+----+------------------------------------+----+
|col1|col2|new_uuid |col3|
+----+----+------------------------------------+----+
|a |1 |00b85af8-711e-4b59-82e1-8d8e59d4c512|2.0 |
|b |2 |94c3f2c6-9234-4fb3-b1c4-273a37171131|3.0 |
|c |3 |1059fff2-b8f9-4cec-907d-ea181d5003a2|4.0 |
+----+----+------------------------------------+----+
请注意,每一步的 UUID 都不同。
这是预期的行为。用户定义函数 have to be deterministic:
The user-defined functions must be deterministic. Due to optimization,
duplicate invocations may be eliminated or the function may even be
invoked more times than it is present in the query.
如果你想包含非确定性函数并保留输出,你应该将中间数据写入持久存储并读回。检查点或缓存可能在一些简单的情况下有效,但通常不会可靠。
如果上游进程是确定性的(对于初学者来说有随机播放),您可以尝试使用 rand
function with seed,转换为字节数组并传递给 UUID.nameUUIDFromBytes
。
另请参阅:
注意: SPARK-20586 引入了 deterministic
标志,可以禁用某些优化,但不清楚它在数据为 [=13 时的表现=] 并且执行者丢失。
试试这个:
df.withColumn("XXXID", lit(java.util.UUID.randomUUID().toString))
它的工作方式不同于:
val generateUUID = udf(() => java.util.UUID.randomUUID().toString)
df.withColumn("XXXCID", generateUUID() )
希望对您有所帮助。
帕维尔
这是一个非常古老的问题,但让人们知道什么对我有用。它可能会对某人有所帮助。
您可以使用如下所示的 expr 函数生成唯一的 GUID,该 GUID 在转换时不会改变。
import org.apache.spark.sql.functions._
// create dataframe
val df = spark.createDataset(Array(("a", "1"), ("b", "2"), ("c", "3"))).toDF("col1", "col2")
df.createOrReplaceTempView("df")
df.show(false)
// generate UUID for new column
val dfWithUuid = df.withColumn("new_uuid", expr("uuid()"))
dfWithUuid.show(false)
dfWithUuid.show(false)
// new transformations
val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2")+1)
dfWithUuidWithNewCol.show(false)
输出如下:
+----+----+
|col1|col2|
+----+----+
|a |1 |
|b |2 |
|c |3 |
+----+----+
+----+----+------------------------------------+
|col1|col2|new_uuid |
+----+----+------------------------------------+
|a |1 |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|
|b |2 |43882a79-8e7f-4002-9740-f22bc6b20db5|
|c |3 |64bc741a-0d7c-430d-bfe2-a4838f10acd0|
+----+----+------------------------------------+
+----+----+------------------------------------+
|col1|col2|new_uuid |
+----+----+------------------------------------+
|a |1 |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|
|b |2 |43882a79-8e7f-4002-9740-f22bc6b20db5|
|c |3 |64bc741a-0d7c-430d-bfe2-a4838f10acd0|
+----+----+------------------------------------+
+----+----+------------------------------------+----+
|col1|col2|new_uuid |col3|
+----+----+------------------------------------+----+
|a |1 |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|2.0 |
|b |2 |43882a79-8e7f-4002-9740-f22bc6b20db5|3.0 |
|c |3 |64bc741a-0d7c-430d-bfe2-a4838f10acd0|4.0 |
+----+----+------------------------------------+----+
我有一个pyspark版本:
from pyspark.sql import functions as f
pdataDF=dataDF.withColumn("uuid_column",f.expr("uuid()"))
display(pdataDF)
pdataDF.write.mode("overwrite").saveAsTable("tempUuidCheck")
我有一个 Spark 数据框,其中有一列包含生成的 UUID。 但是,每次我对数据帧执行操作或转换时,它都会更改每个阶段的 UUID。
如何只生成一次 UUID 并使 UUID 保持静态。
重现我的问题的一些示例代码如下:
def process(spark: SparkSession): Unit = {
import spark.implicits._
val sc = spark.sparkContext
val sqlContext = spark.sqlContext
sc.setLogLevel("OFF")
// create dataframe
val df = spark.createDataset(Array(("a", "1"), ("b", "2"), ("c", "3"))).toDF("col1", "col2")
df.createOrReplaceTempView("df")
df.show(false)
// register an UDF that creates a random UUID
val generateUUID = udf(() => UUID.randomUUID().toString)
// generate UUID for new column
val dfWithUuid = df.withColumn("new_uuid", generateUUID())
dfWithUuid.show(false)
dfWithUuid.show(false) // uuid is different
// new transformations also change the uuid
val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2")+1)
dfWithUuidWithNewCol.show(false)
}
输出为:
+----+----+
|col1|col2|
+----+----+
|a |1 |
|b |2 |
|c |3 |
+----+----+
+----+----+------------------------------------+
|col1|col2|new_uuid |
+----+----+------------------------------------+
|a |1 |a414e73b-24b8-4f64-8d21-f0bc56d3d290|
|b |2 |f37935e5-0bfc-4863-b6dc-897662307e0a|
|c |3 |e3aaf655-5a48-45fb-8ab5-22f78cdeaf26|
+----+----+------------------------------------+
+----+----+------------------------------------+
|col1|col2|new_uuid |
+----+----+------------------------------------+
|a |1 |1c6597bf-f257-4e5f-be81-34a0efa0f6be|
|b |2 |6efe4453-29a8-4b7f-9fa1-7982d2670bd6|
|c |3 |2f7ddc1c-3e8c-4118-8e2c-8a6f526bee7e|
+----+----+------------------------------------+
+----+----+------------------------------------+----+
|col1|col2|new_uuid |col3|
+----+----+------------------------------------+----+
|a |1 |00b85af8-711e-4b59-82e1-8d8e59d4c512|2.0 |
|b |2 |94c3f2c6-9234-4fb3-b1c4-273a37171131|3.0 |
|c |3 |1059fff2-b8f9-4cec-907d-ea181d5003a2|4.0 |
+----+----+------------------------------------+----+
请注意,每一步的 UUID 都不同。
这是预期的行为。用户定义函数 have to be deterministic:
The user-defined functions must be deterministic. Due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query.
如果你想包含非确定性函数并保留输出,你应该将中间数据写入持久存储并读回。检查点或缓存可能在一些简单的情况下有效,但通常不会可靠。
如果上游进程是确定性的(对于初学者来说有随机播放),您可以尝试使用 rand
function with seed,转换为字节数组并传递给 UUID.nameUUIDFromBytes
。
另请参阅:
注意: SPARK-20586 引入了 deterministic
标志,可以禁用某些优化,但不清楚它在数据为 [=13 时的表现=] 并且执行者丢失。
试试这个:
df.withColumn("XXXID", lit(java.util.UUID.randomUUID().toString))
它的工作方式不同于:
val generateUUID = udf(() => java.util.UUID.randomUUID().toString)
df.withColumn("XXXCID", generateUUID() )
希望对您有所帮助。
帕维尔
这是一个非常古老的问题,但让人们知道什么对我有用。它可能会对某人有所帮助。
您可以使用如下所示的 expr 函数生成唯一的 GUID,该 GUID 在转换时不会改变。
import org.apache.spark.sql.functions._
// create dataframe
val df = spark.createDataset(Array(("a", "1"), ("b", "2"), ("c", "3"))).toDF("col1", "col2")
df.createOrReplaceTempView("df")
df.show(false)
// generate UUID for new column
val dfWithUuid = df.withColumn("new_uuid", expr("uuid()"))
dfWithUuid.show(false)
dfWithUuid.show(false)
// new transformations
val dfWithUuidWithNewCol = dfWithUuid.withColumn("col3", df.col("col2")+1)
dfWithUuidWithNewCol.show(false)
输出如下:
+----+----+
|col1|col2|
+----+----+
|a |1 |
|b |2 |
|c |3 |
+----+----+
+----+----+------------------------------------+
|col1|col2|new_uuid |
+----+----+------------------------------------+
|a |1 |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|
|b |2 |43882a79-8e7f-4002-9740-f22bc6b20db5|
|c |3 |64bc741a-0d7c-430d-bfe2-a4838f10acd0|
+----+----+------------------------------------+
+----+----+------------------------------------+
|col1|col2|new_uuid |
+----+----+------------------------------------+
|a |1 |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|
|b |2 |43882a79-8e7f-4002-9740-f22bc6b20db5|
|c |3 |64bc741a-0d7c-430d-bfe2-a4838f10acd0|
+----+----+------------------------------------+
+----+----+------------------------------------+----+
|col1|col2|new_uuid |col3|
+----+----+------------------------------------+----+
|a |1 |01c4ef0f-9e9b-458e-b803-5f66df1f7cee|2.0 |
|b |2 |43882a79-8e7f-4002-9740-f22bc6b20db5|3.0 |
|c |3 |64bc741a-0d7c-430d-bfe2-a4838f10acd0|4.0 |
+----+----+------------------------------------+----+
我有一个pyspark版本:
from pyspark.sql import functions as f
pdataDF=dataDF.withColumn("uuid_column",f.expr("uuid()"))
display(pdataDF)
pdataDF.write.mode("overwrite").saveAsTable("tempUuidCheck")