按时差过滤pyspark

Filter pyspark by time difference

我在 pyspark 中有一个如下所示的数据框:

+----------+-------------------+-------+-----------------------+-----------------------+--------+
|Session_Id|Instance_Id        |Actions|Start_Date             |End_Date               |Duration|
+----------+-------------------+-------+-----------------------+-----------------------+--------+
|14252203  |i-051fc2d21fbe001e3|2      |2019-12-17 01:07:30.000|2019-12-17 01:07:30.000|0       |
|43024091  |i-051fc2d21fbe001e3|2      |2019-12-17 01:08:00.000|2019-12-17 01:08:00.000|0       |
|50961995  |i-0c733c7e356bc1615|2      |2019-12-17 01:07:30.000|2019-12-17 01:07:30.000|0       |
|56308963  |i-0c733c7e356bc1615|2      |2019-12-17 01:08:00.000|2019-12-17 01:08:00.000|0       |
|60120472  |i-0c733c7e356bc1615|2      |2019-12-17 01:07:30.000|2019-12-17 01:07:30.000|0       |
|69132492  |i-051fc2d21fbe001e3|2      |2019-12-17 01:07:30.000|2019-12-17 01:07:30.000|0       |
+----------+-------------------+-------+-----------------------+-----------------------+--------+

我正在尝试使用此过滤任何太新的行:

now = datetime.datetime.now()

filtered = grouped.filter(f.abs(f.unix_timestamp(now) - f.unix_timestamp(datetime.datetime.strptime(f.col('End_Date')[:-4], '%Y-%m-%d %H:%M:%S'))) > 100)

End_Date 转换为时间戳并计算从现在到 End_Date 的差异并过滤小于 100 秒的任何内容。我从

得到的

每次我运行这个,我得到这个错误:

TypeError: Invalid argument, not a string or column: 2019-12-19 18:55:13.268489 of type <type 'datetime.datetime'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

如何通过比较时间戳进行过滤?

我认为您混淆了 Python 函数和 Spark。 unix_timestamp 函数需要字符串或 Column 对象,但您传递的是 Python datetime 对象,这就是您收到该错误的原因。

而是使用 Spark 内置函数:current_date 为您提供具有当前日期值的列,to_dateEnd_Date 列转换为日期。

这应该适合你:

filtered = grouped.filter(abs(unix_timestamp(current_date()) - unix_timestamp(to_date(col('End_Date'), 'yyyy-MM-dd HH:mm:ss'))) > 100)