pyspark,获取第一列值等于 id 且第二列值介于两个值之间的行,对数据框中的每一行执行此操作
pyspark, get rows where first column value equals id and second column value is between two values, do this for each row in a dataframe
所以我有一个像这样的 pyspark 数据框,我们称它为数据框 a:
+-------------------+---------------+----------------+
| reg| val1| val2 |
+-------------------+---------------+----------------+
| N110WA| 1590030660| 1590038340000|
| N876LF| 1590037200| 1590038880000|
| N135MH| 1590039060| 1590040080000|
还有一个类似的,我们称它为数据框 b:
+-----+-------------+-----+-----+---------+----------+---+----+
| reg| postime| alt| galt| lat| long|spd| vsi|
+-----+-------------+-----+-----+---------+----------+---+----+
|XY679|1590070078549| 50| 130|18.567169|-69.986343|132|1152|
|HI949|1590070091707| 375| 455| 18.5594|-69.987804|148|1344|
|JX784|1590070110666| 825| 905|18.544968|-69.990414|170|1216|
有没有什么方法可以创建一个 numpy 数组或 pyspark 数据帧,其中对于数据帧 a 中的每一行,数据帧 b 中在 val 1 和 val 2 之间具有相同 reg 和 postime 的所有行都包括在内?
是的,假设 df_a
和 df_b
都是 pyspark 数据帧,您可以在 pyspark 中使用内部连接:
delta = val
df = df_a.join(df_b, [
df_a.res == df_b.res,
df_a.posttime <= df_b.val1 + delta,
df_a.posttime >= df_b.val2 - delta
], "inner")
将筛选出结果以仅包含指定的结果
您可以尝试以下解决方案 - 让我们知道是否有效或需要其他什么?
为了展示可行的解决方案,我对估算做了一些修改--
在这里输入
from pyspark.sql import functions as F
df_a = spark.createDataFrame([('N110WA',1590030660,1590038340000), ('N110WA',1590070078549,1590070078559)],[ "reg","val1","val2"])
df_b = spark.createDataFrame([('N110WA',1590070078549)],[ "reg","postime"])
df_a.show()
df_a
+------+-------------+-------------+
| reg| val1| val2|
+------+-------------+-------------+
|N110WA| 1590030660|1590038340000|
|N110WA|1590070078549|1590070078559|
+------+-------------+-------------+
df_b
+------+-------------+
| reg| postime|
+------+-------------+
|N110WA|1590070078549|
+------+-------------+
解决方法在这里
from pyspark.sql import types as T
from pyspark.sql import functions as F
#df_a = df_a.join(df_b,'reg','left')
df_a = df_a.withColumn('condition_col', F.when(((F.col('postime') >= F.col('val1')) & (F.col('postime') <= F.col('val2'))),'1').otherwise('0'))
df_a = df_a.filter(F.col('condition_col') == 1).drop('condition_col')
df_a.show()
最终输出
+------+-------------+-------------+-------------+
| reg| val1| val2| postime|
+------+-------------+-------------+-------------+
|N110WA|1590070078549|1590070078559|1590070078549|
+------+-------------+-------------+-------------+
所以我有一个像这样的 pyspark 数据框,我们称它为数据框 a:
+-------------------+---------------+----------------+
| reg| val1| val2 |
+-------------------+---------------+----------------+
| N110WA| 1590030660| 1590038340000|
| N876LF| 1590037200| 1590038880000|
| N135MH| 1590039060| 1590040080000|
还有一个类似的,我们称它为数据框 b:
+-----+-------------+-----+-----+---------+----------+---+----+
| reg| postime| alt| galt| lat| long|spd| vsi|
+-----+-------------+-----+-----+---------+----------+---+----+
|XY679|1590070078549| 50| 130|18.567169|-69.986343|132|1152|
|HI949|1590070091707| 375| 455| 18.5594|-69.987804|148|1344|
|JX784|1590070110666| 825| 905|18.544968|-69.990414|170|1216|
有没有什么方法可以创建一个 numpy 数组或 pyspark 数据帧,其中对于数据帧 a 中的每一行,数据帧 b 中在 val 1 和 val 2 之间具有相同 reg 和 postime 的所有行都包括在内?
是的,假设 df_a
和 df_b
都是 pyspark 数据帧,您可以在 pyspark 中使用内部连接:
delta = val
df = df_a.join(df_b, [
df_a.res == df_b.res,
df_a.posttime <= df_b.val1 + delta,
df_a.posttime >= df_b.val2 - delta
], "inner")
将筛选出结果以仅包含指定的结果
您可以尝试以下解决方案 - 让我们知道是否有效或需要其他什么?
为了展示可行的解决方案,我对估算做了一些修改--
在这里输入
from pyspark.sql import functions as F
df_a = spark.createDataFrame([('N110WA',1590030660,1590038340000), ('N110WA',1590070078549,1590070078559)],[ "reg","val1","val2"])
df_b = spark.createDataFrame([('N110WA',1590070078549)],[ "reg","postime"])
df_a.show()
df_a
+------+-------------+-------------+
| reg| val1| val2|
+------+-------------+-------------+
|N110WA| 1590030660|1590038340000|
|N110WA|1590070078549|1590070078559|
+------+-------------+-------------+
df_b
+------+-------------+
| reg| postime|
+------+-------------+
|N110WA|1590070078549|
+------+-------------+
解决方法在这里
from pyspark.sql import types as T
from pyspark.sql import functions as F
#df_a = df_a.join(df_b,'reg','left')
df_a = df_a.withColumn('condition_col', F.when(((F.col('postime') >= F.col('val1')) & (F.col('postime') <= F.col('val2'))),'1').otherwise('0'))
df_a = df_a.filter(F.col('condition_col') == 1).drop('condition_col')
df_a.show()
最终输出
+------+-------------+-------------+-------------+
| reg| val1| val2| postime|
+------+-------------+-------------+-------------+
|N110WA|1590070078549|1590070078559|1590070078549|
+------+-------------+-------------+-------------+