Spark(使用pyspark)在一个数据帧(结构化流)中使用值来查询静态数据帧并将第二个df的行与第一个df合并
Spark (using pyspark) use value in one dataframe (structured streaming) to query static dataframe and merge row from second df with first one
我有一个我想要的结构化流数据帧,对于每一行,取一列中的值(在本例中是时间戳,如 1525670700),并使用该值查询另一个静态数据帧以获得最接近的时间戳value 并将从该查询返回的一行合并到结构化流数据帧,例如
my_row_to_merge_df = weather_df.filter(weather_df.timestamp_unix > 1525670700).sort(col('timestamp_unix').asc()).limit(1).show()
我想做类似的事情:
joined_df = streaming_df.merge(function_to_return_row_from_other_df(col('timestamp')))
在这种情况下使用连接的问题是时间戳可能不完全匹配,但如果它们相差几秒甚至几分钟也没关系。
因此,我不确定我可以做什么样的操作才能得到这个结果。
您可以将时间戳四舍五入到您需要的显着性水平:
# Assuming timestamp is in seconds
timestamp_rounded = int(timestamp - (timestamp % int(minutes * 60)))
其中 "minutes" 是您要四舍五入的级别。例如,如果您选择分钟 = 5,您会将时间戳舍入(向下)为每 5 分钟一次。
您可以添加新列,如下所示:
我有一个我想要的结构化流数据帧,对于每一行,取一列中的值(在本例中是时间戳,如 1525670700),并使用该值查询另一个静态数据帧以获得最接近的时间戳value 并将从该查询返回的一行合并到结构化流数据帧,例如
my_row_to_merge_df = weather_df.filter(weather_df.timestamp_unix > 1525670700).sort(col('timestamp_unix').asc()).limit(1).show()
我想做类似的事情:
joined_df = streaming_df.merge(function_to_return_row_from_other_df(col('timestamp')))
在这种情况下使用连接的问题是时间戳可能不完全匹配,但如果它们相差几秒甚至几分钟也没关系。
因此,我不确定我可以做什么样的操作才能得到这个结果。
您可以将时间戳四舍五入到您需要的显着性水平:
# Assuming timestamp is in seconds
timestamp_rounded = int(timestamp - (timestamp % int(minutes * 60)))
其中 "minutes" 是您要四舍五入的级别。例如,如果您选择分钟 = 5,您会将时间戳舍入(向下)为每 5 分钟一次。
您可以添加新列,如下所示: