使用最接近的时间戳 pyspark 连接两个数据帧
Join two dataframes using the closest timestamp pyspark
所以我是 pyspark 的新手,但我仍然无法正确创建自己的查询。我尝试用谷歌搜索我的问题,但我只是不明白其中大部分是如何工作的。我不确定我错过了什么。
但无论如何我有以下两个数据框,spark_p:
+--------------------+-----+--------------------+
|__record_timestamp__|cycle| profiles|
+--------------------+-----+--------------------+
| 1651737406300000000| 0|[0, 1, 1, 1, 3, 1...|
| 1651737406300000000| 16|[0, 0, 1, 0, 0, 0...|
| 1651737406300000000| 17|[1, 1, 1, 1, 0, 0...|
| 1651737406300000000| 18|[0, 0, 0, 0, 0, 1...|
| 1651737406300000000| 19|[1, 1, 1, 0, 0, 0...|
+--------------------+-----+--------------------+
和spark_m:
+-------------+--------------------+
| current|__record_timestamp__|
+-------------+--------------------+
| 0.007181627| 1651730407500000000|
| 8.3004625E-4| 1651730464000000000|
| 0.41976404| 1651730507000000000|
|-0.0017322368| 1651732761000000000|
|-2.5260705E-4| 1651732822500000000|
| 2.3460487E-4| 1651732824500000000|
+-------------+--------------------+
我需要向 spark_p 添加一列,其中包含该特定时间戳的当前信息。
所以结果看起来像这样:
+--------------------+-----+--------------------+---------+
|__record_timestamp__|cycle| profiles| current|
+--------------------+-----+--------------------+---------+
| 1651737406300000000| 0|[0, 1, 1, 1, 3, 1...| 0.07|
| 1651737406300000000| 16|[0, 0, 1, 0, 0, 0...| 12|
| 1651737406300000000| 17|[1, 1, 1, 1, 0, 0...| 0.0|
| 1651737406300000000| 18|[0, 0, 0, 0, 0, 1...| 5.235654|
| 1651737406300000000| 19|[1, 1, 1, 0, 0, 0...| 125|
+--------------------+-----+--------------------+---------+
现在时间戳不会完全匹配,但我只需要最接近的时间戳,或者使用之前记录的电流值,两者都可以。我不知道怎么...
当我尝试时:
spark_p.join(spark_m, spark_p.__record_timestamp__ == spark_m.__record_timestamp__, "inner").show()
我刚刚得到:
+--------------------+-----+--------+-----+--------------------+
|__record_timestamp__|cycle|profiles|value|__record_timestamp__|
+--------------------+-----+--------+-----+--------------------+
+--------------------+-----+--------+-----+--------------------+
所以我猜 none 它们完全匹配,但我如何才能获取最接近的值? TIA
此解决方案包含答案:
SPLIT_COUNT = 90
SPLIT_SIZE = 1024
spark_p = data.select("profiles", '__record_timestamp__')
spark_p = spark_p.withColumn("profiles", F.col("profiles").getField("elements") )
slices = [F.slice(F.col('profiles'), i * SPLIT_SIZE + 1, SPLIT_SIZE) for i in range(SPLIT_COUNT)]
spark_p = spark_p.select(F.posexplode(F.array(*slices)), F.col('__record_timestamp__'))
spark_p = spark_p.withColumn("cycle", F.col("pos") )
spark_p = spark_p.withColumn("profiles", F.col("col") )
spark_p = spark_p.drop('pos').drop('col')
spark_m = magnetData.select("value", '__record_timestamp__', )
spark_p = spark_p.withColumn('value', F.lit(None))
spark_m = spark_m.withColumn('profiles', F.lit(None))
spark_m = spark_m.withColumn('cycle', F.lit(None))
final_df = spark_p.unionByName(spark_m)
w = Window.orderBy('__record_timestamp__').rowsBetween(Window.unboundedPreceding, -1)
final_df = final_df.withColumn('value', F.last('value', True).over(w)).filter(~F.isnull('profiles'))
您必须使用 unboundedPreceding 参数创建 window。
所以我是 pyspark 的新手,但我仍然无法正确创建自己的查询。我尝试用谷歌搜索我的问题,但我只是不明白其中大部分是如何工作的。我不确定我错过了什么。
但无论如何我有以下两个数据框,spark_p:
+--------------------+-----+--------------------+
|__record_timestamp__|cycle| profiles|
+--------------------+-----+--------------------+
| 1651737406300000000| 0|[0, 1, 1, 1, 3, 1...|
| 1651737406300000000| 16|[0, 0, 1, 0, 0, 0...|
| 1651737406300000000| 17|[1, 1, 1, 1, 0, 0...|
| 1651737406300000000| 18|[0, 0, 0, 0, 0, 1...|
| 1651737406300000000| 19|[1, 1, 1, 0, 0, 0...|
+--------------------+-----+--------------------+
和spark_m:
+-------------+--------------------+
| current|__record_timestamp__|
+-------------+--------------------+
| 0.007181627| 1651730407500000000|
| 8.3004625E-4| 1651730464000000000|
| 0.41976404| 1651730507000000000|
|-0.0017322368| 1651732761000000000|
|-2.5260705E-4| 1651732822500000000|
| 2.3460487E-4| 1651732824500000000|
+-------------+--------------------+
我需要向 spark_p 添加一列,其中包含该特定时间戳的当前信息。
所以结果看起来像这样:
+--------------------+-----+--------------------+---------+
|__record_timestamp__|cycle| profiles| current|
+--------------------+-----+--------------------+---------+
| 1651737406300000000| 0|[0, 1, 1, 1, 3, 1...| 0.07|
| 1651737406300000000| 16|[0, 0, 1, 0, 0, 0...| 12|
| 1651737406300000000| 17|[1, 1, 1, 1, 0, 0...| 0.0|
| 1651737406300000000| 18|[0, 0, 0, 0, 0, 1...| 5.235654|
| 1651737406300000000| 19|[1, 1, 1, 0, 0, 0...| 125|
+--------------------+-----+--------------------+---------+
现在时间戳不会完全匹配,但我只需要最接近的时间戳,或者使用之前记录的电流值,两者都可以。我不知道怎么...
当我尝试时:
spark_p.join(spark_m, spark_p.__record_timestamp__ == spark_m.__record_timestamp__, "inner").show()
我刚刚得到:
+--------------------+-----+--------+-----+--------------------+
|__record_timestamp__|cycle|profiles|value|__record_timestamp__|
+--------------------+-----+--------+-----+--------------------+
+--------------------+-----+--------+-----+--------------------+
所以我猜 none 它们完全匹配,但我如何才能获取最接近的值? TIA
此解决方案包含答案:
SPLIT_COUNT = 90
SPLIT_SIZE = 1024
spark_p = data.select("profiles", '__record_timestamp__')
spark_p = spark_p.withColumn("profiles", F.col("profiles").getField("elements") )
slices = [F.slice(F.col('profiles'), i * SPLIT_SIZE + 1, SPLIT_SIZE) for i in range(SPLIT_COUNT)]
spark_p = spark_p.select(F.posexplode(F.array(*slices)), F.col('__record_timestamp__'))
spark_p = spark_p.withColumn("cycle", F.col("pos") )
spark_p = spark_p.withColumn("profiles", F.col("col") )
spark_p = spark_p.drop('pos').drop('col')
spark_m = magnetData.select("value", '__record_timestamp__', )
spark_p = spark_p.withColumn('value', F.lit(None))
spark_m = spark_m.withColumn('profiles', F.lit(None))
spark_m = spark_m.withColumn('cycle', F.lit(None))
final_df = spark_p.unionByName(spark_m)
w = Window.orderBy('__record_timestamp__').rowsBetween(Window.unboundedPreceding, -1)
final_df = final_df.withColumn('value', F.last('value', True).over(w)).filter(~F.isnull('profiles'))
您必须使用 unboundedPreceding 参数创建 window。