PySpark:(广播)在最近的 datetimes/unix 上加入两个数据集
PySpark: (broadcast) joining two datasets on closest datetimes/unix
我正在使用 PySpark 并且即将放弃我的问题。
我有两个数据集:一个非常非常大的数据集(A 集)和一个相当小的数据集(B 集)。
它们的形式是:
Data set A:
variable | timestampA
---------------------------------
x | 2015-01-01 09:29:21
y | 2015-01-01 12:01:57
Data set B:
different information | timestampB
-------------------------------------------
info a | 2015-01-01 09:30:00
info b | 2015-01-01 09:30:00
info a | 2015-01-01 12:00:00
info b | 2015-01-01 12:00:00
A 有很多行,每行都有不同的时间戳。 B每隔几分钟就有一个时间戳。这里的主要问题是,两个数据集中没有匹配的确切时间戳。
我的目标是在最近的时间戳上加入数据集。由于我想以特定方式加入,因此出现了另一个问题。
对于 A 中的每个条目,我想在复制 A 中的条目时映射最接近时间戳的全部信息。因此,结果应如下所示:
Final data set
variable | timestampA | information | timestampB
--------------------------------------------------------------------------
x | 2015-01-01 09:29:21 | info a | 2015-01-01 09:30:00
x | 2015-01-01 09:29:21 | info b | 2015-01-01 09:30:00
y | 2015-01-01 12:01:57 | info a | 2015-01-01 12:00:00
y | 2015-01-01 12:01:57 | info b | 2015-01-01 12:00:00
我对 PySpark(以及 Whosebug)还很陌生。我想我可能需要使用 window 函数 and/or 广播连接,但我真的没有必要开始,希望得到任何帮助。谢谢!
你可以使用 broadcast
来避免随机播放。
如果理解正确,您在 set_B
中有时间戳,这些时间戳是确定的时间间隔的结果?如果是这样,您可以执行以下操作:
from pyspark.sql import functions as F
# assuming 5 minutes is your interval in set_B
interval = 'INTERVAL {} SECONDS'.format(5 * 60 / 2)
res = set_A.join(F.broadcast(set_B), (set_A['timestampA'] > (set_B['timestampB'] - F.expr(interval))) & (set_A['timestampA'] <= (set_B['timestampB'] + F.expr(interval))))
输出:
+--------+-------------------+------+-------------------+
|variable| timestampA| info| timestampB|
+--------+-------------------+------+-------------------+
| x|2015-01-01 09:29:21|info a|2015-01-01 09:30:00|
| x|2015-01-01 09:29:21|info b|2015-01-01 09:30:00|
| y|2015-01-01 12:01:57|info a|2015-01-01 12:00:00|
| y|2015-01-01 12:01:57|info b|2015-01-01 12:00:00|
+--------+-------------------+------+-------------------+
如果您没有确定的间隔,那么只有交叉连接然后找到 min(timestampA - timestampB)
间隔才能解决问题。您可以使用 window 函数和 row_number
函数来做到这一点,如下所示:
w = Window.partitionBy('variable', 'info').orderBy(F.abs(F.col('timestampA').cast('int') - F.col('timestampB').cast('int')))
res = res.withColumn('rn', F.row_number().over(w)).filter('rn = 1').drop('rn')
我正在使用 PySpark 并且即将放弃我的问题。 我有两个数据集:一个非常非常大的数据集(A 集)和一个相当小的数据集(B 集)。 它们的形式是:
Data set A:
variable | timestampA
---------------------------------
x | 2015-01-01 09:29:21
y | 2015-01-01 12:01:57
Data set B:
different information | timestampB
-------------------------------------------
info a | 2015-01-01 09:30:00
info b | 2015-01-01 09:30:00
info a | 2015-01-01 12:00:00
info b | 2015-01-01 12:00:00
A 有很多行,每行都有不同的时间戳。 B每隔几分钟就有一个时间戳。这里的主要问题是,两个数据集中没有匹配的确切时间戳。
我的目标是在最近的时间戳上加入数据集。由于我想以特定方式加入,因此出现了另一个问题。 对于 A 中的每个条目,我想在复制 A 中的条目时映射最接近时间戳的全部信息。因此,结果应如下所示:
Final data set
variable | timestampA | information | timestampB
--------------------------------------------------------------------------
x | 2015-01-01 09:29:21 | info a | 2015-01-01 09:30:00
x | 2015-01-01 09:29:21 | info b | 2015-01-01 09:30:00
y | 2015-01-01 12:01:57 | info a | 2015-01-01 12:00:00
y | 2015-01-01 12:01:57 | info b | 2015-01-01 12:00:00
我对 PySpark(以及 Whosebug)还很陌生。我想我可能需要使用 window 函数 and/or 广播连接,但我真的没有必要开始,希望得到任何帮助。谢谢!
你可以使用 broadcast
来避免随机播放。
如果理解正确,您在 set_B
中有时间戳,这些时间戳是确定的时间间隔的结果?如果是这样,您可以执行以下操作:
from pyspark.sql import functions as F
# assuming 5 minutes is your interval in set_B
interval = 'INTERVAL {} SECONDS'.format(5 * 60 / 2)
res = set_A.join(F.broadcast(set_B), (set_A['timestampA'] > (set_B['timestampB'] - F.expr(interval))) & (set_A['timestampA'] <= (set_B['timestampB'] + F.expr(interval))))
输出:
+--------+-------------------+------+-------------------+
|variable| timestampA| info| timestampB|
+--------+-------------------+------+-------------------+
| x|2015-01-01 09:29:21|info a|2015-01-01 09:30:00|
| x|2015-01-01 09:29:21|info b|2015-01-01 09:30:00|
| y|2015-01-01 12:01:57|info a|2015-01-01 12:00:00|
| y|2015-01-01 12:01:57|info b|2015-01-01 12:00:00|
+--------+-------------------+------+-------------------+
如果您没有确定的间隔,那么只有交叉连接然后找到 min(timestampA - timestampB)
间隔才能解决问题。您可以使用 window 函数和 row_number
函数来做到这一点,如下所示:
w = Window.partitionBy('variable', 'info').orderBy(F.abs(F.col('timestampA').cast('int') - F.col('timestampB').cast('int')))
res = res.withColumn('rn', F.row_number().over(w)).filter('rn = 1').drop('rn')