PySpark 中的列过滤
Column filtering in PySpark
我有一个从 Hive table 加载的数据帧 df
,它有一个时间戳列,比如 ts
,字符串类型的格式为 dd-MMM-yy hh.mm.ss.MS a
(转换为 python datetime 库,这是%d-%b-%y %I.%M.%S.%f %p
)。
现在我想从过去五分钟的数据框中过滤行:
only_last_5_minutes = df.filter(
datetime.strptime(df.ts, '%d-%b-%y %I.%M.%S.%f %p') > datetime.now() - timedelta(minutes=5)
)
但是,这不起作用,我收到了这条消息
TypeError: strptime() argument 1 must be string, not Column
看来我对列操作的应用有误,在我看来我必须创建一个 lambda 函数来过滤满足所需条件的每一列,但作为 Python 和 lambda 表达式的新手特别是,我不知道如何正确创建我的过滤器。请指教
P.S。
我更喜欢将我的过滤器表达为 Python 本机(或 SparkSQL)而不是 Hive sql 查询表达式 'WHERE'.
中的过滤器
首选:
df = sqlContext.sql("SELECT * FROM my_table")
df.filter( // filter here)
不喜欢:
df = sqlContext.sql("SELECT * FROM my_table WHERE...")
火花 >= 1.5
从 Spark 1.5 开始,您可以按如下方式解析日期字符串:
from pyspark.sql.functions import expr, from_unixtime, lit, unix_timestamp
from pyspark.sql.types import TimestampType
parsed_df = df.select((from_unixtime(unix_timestamp(
# Note: am-pm: pattern length should be 1 for Spark >= 3.0
df.datetime, "dd-MMM-yy h.mm.ss.SSSSSS a"
))).cast(TimestampType()).alias("datetime"))
parsed_df.where(col("datetime") >= lit(now) - expr("INTERVAL 5 minutes"))
然后应用间隔:
from pyspark.sql.functions import current_timestamp, expr
Spark < 1.5
可以使用用户定义的函数。
from datetime import datetime, timedelta
from pyspark.sql.types import BooleanType, TimestampType
from pyspark.sql.functions import udf, col
def in_last_5_minutes(now):
def _in_last_5_minutes(then):
then_parsed = datetime.strptime(then, '%d-%b-%y %I.%M.%S.%f %p')
return then_parsed > now - timedelta(minutes=5)
return udf(_in_last_5_minutes, BooleanType())
使用一些虚拟数据:
df = sqlContext.createDataFrame([
(1, '14-Jul-15 11.34.29.000000 AM'),
(2, '14-Jul-15 11.34.27.000000 AM'),
(3, '14-Jul-15 11.32.11.000000 AM'),
(4, '14-Jul-15 11.29.00.000000 AM'),
(5, '14-Jul-15 11.28.29.000000 AM')
], ('id', 'datetime'))
now = datetime(2015, 7, 14, 11, 35)
df.where(in_last_5_minutes(now)(col("datetime"))).show()
正如预期的那样,我们只得到 3 个条目:
+--+--------------------+
|id| datetime|
+--+--------------------+
| 1|14-Jul-15 11.34.2...|
| 2|14-Jul-15 11.34.2...|
| 3|14-Jul-15 11.32.1...|
+--+--------------------+
重新解析日期时间字符串效率很低,因此您可以考虑存储 TimestampType
。
def parse_dt():
def _parse(dt):
return datetime.strptime(dt, '%d-%b-%y %I.%M.%S.%f %p')
return udf(_parse, TimestampType())
df_with_timestamp = df.withColumn("timestamp", parse_dt()(df.datetime))
def in_last_5_minutes(now):
def _in_last_5_minutes(then):
return then > now - timedelta(minutes=5)
return udf(_in_last_5_minutes, BooleanType())
df_with_timestamp.where(in_last_5_minutes(now)(col("timestamp")))
结果:
+--+--------------------+--------------------+
|id| datetime| timestamp|
+--+--------------------+--------------------+
| 1|14-Jul-15 11.34.2...|2015-07-14 11:34:...|
| 2|14-Jul-15 11.34.2...|2015-07-14 11:34:...|
| 3|14-Jul-15 11.32.1...|2015-07-14 11:32:...|
+--+--------------------+--------------------+
终于可以使用带时间戳的原始 SQL 查询了:
query = """SELECT * FROM df
WHERE unix_timestamp(datetime, 'dd-MMM-yy HH.mm.ss.SSSSSS a') > {0}
""".format(time.mktime((now - timedelta(minutes=5)).timetuple()))
sqlContext.sql(query)
同上,解析一次日期字符串效率会更高。
如果列已经是 timestamp
,则可以使用 datetime
文字:
from pyspark.sql.functions import lit
df_with_timestamp.where(
df_with_timestamp.timestamp > lit(now - timedelta(minutes=5)))
from pyspark.sql.functions import *
df.withColumn("seconds_from_now", current_timestamp() - col("ts").cast("long"))
df = df.filter(df.seconds_from_now <= 5*60).drop("seconds_from_now")
df 是包含最后五分钟结果的结果数据框。
我有一个从 Hive table 加载的数据帧 df
,它有一个时间戳列,比如 ts
,字符串类型的格式为 dd-MMM-yy hh.mm.ss.MS a
(转换为 python datetime 库,这是%d-%b-%y %I.%M.%S.%f %p
)。
现在我想从过去五分钟的数据框中过滤行:
only_last_5_minutes = df.filter(
datetime.strptime(df.ts, '%d-%b-%y %I.%M.%S.%f %p') > datetime.now() - timedelta(minutes=5)
)
但是,这不起作用,我收到了这条消息
TypeError: strptime() argument 1 must be string, not Column
看来我对列操作的应用有误,在我看来我必须创建一个 lambda 函数来过滤满足所需条件的每一列,但作为 Python 和 lambda 表达式的新手特别是,我不知道如何正确创建我的过滤器。请指教
P.S。 我更喜欢将我的过滤器表达为 Python 本机(或 SparkSQL)而不是 Hive sql 查询表达式 'WHERE'.
中的过滤器首选:
df = sqlContext.sql("SELECT * FROM my_table")
df.filter( // filter here)
不喜欢:
df = sqlContext.sql("SELECT * FROM my_table WHERE...")
火花 >= 1.5
从 Spark 1.5 开始,您可以按如下方式解析日期字符串:
from pyspark.sql.functions import expr, from_unixtime, lit, unix_timestamp
from pyspark.sql.types import TimestampType
parsed_df = df.select((from_unixtime(unix_timestamp(
# Note: am-pm: pattern length should be 1 for Spark >= 3.0
df.datetime, "dd-MMM-yy h.mm.ss.SSSSSS a"
))).cast(TimestampType()).alias("datetime"))
parsed_df.where(col("datetime") >= lit(now) - expr("INTERVAL 5 minutes"))
然后应用间隔:
from pyspark.sql.functions import current_timestamp, expr
Spark < 1.5
可以使用用户定义的函数。
from datetime import datetime, timedelta
from pyspark.sql.types import BooleanType, TimestampType
from pyspark.sql.functions import udf, col
def in_last_5_minutes(now):
def _in_last_5_minutes(then):
then_parsed = datetime.strptime(then, '%d-%b-%y %I.%M.%S.%f %p')
return then_parsed > now - timedelta(minutes=5)
return udf(_in_last_5_minutes, BooleanType())
使用一些虚拟数据:
df = sqlContext.createDataFrame([
(1, '14-Jul-15 11.34.29.000000 AM'),
(2, '14-Jul-15 11.34.27.000000 AM'),
(3, '14-Jul-15 11.32.11.000000 AM'),
(4, '14-Jul-15 11.29.00.000000 AM'),
(5, '14-Jul-15 11.28.29.000000 AM')
], ('id', 'datetime'))
now = datetime(2015, 7, 14, 11, 35)
df.where(in_last_5_minutes(now)(col("datetime"))).show()
正如预期的那样,我们只得到 3 个条目:
+--+--------------------+
|id| datetime|
+--+--------------------+
| 1|14-Jul-15 11.34.2...|
| 2|14-Jul-15 11.34.2...|
| 3|14-Jul-15 11.32.1...|
+--+--------------------+
重新解析日期时间字符串效率很低,因此您可以考虑存储 TimestampType
。
def parse_dt():
def _parse(dt):
return datetime.strptime(dt, '%d-%b-%y %I.%M.%S.%f %p')
return udf(_parse, TimestampType())
df_with_timestamp = df.withColumn("timestamp", parse_dt()(df.datetime))
def in_last_5_minutes(now):
def _in_last_5_minutes(then):
return then > now - timedelta(minutes=5)
return udf(_in_last_5_minutes, BooleanType())
df_with_timestamp.where(in_last_5_minutes(now)(col("timestamp")))
结果:
+--+--------------------+--------------------+
|id| datetime| timestamp|
+--+--------------------+--------------------+
| 1|14-Jul-15 11.34.2...|2015-07-14 11:34:...|
| 2|14-Jul-15 11.34.2...|2015-07-14 11:34:...|
| 3|14-Jul-15 11.32.1...|2015-07-14 11:32:...|
+--+--------------------+--------------------+
终于可以使用带时间戳的原始 SQL 查询了:
query = """SELECT * FROM df
WHERE unix_timestamp(datetime, 'dd-MMM-yy HH.mm.ss.SSSSSS a') > {0}
""".format(time.mktime((now - timedelta(minutes=5)).timetuple()))
sqlContext.sql(query)
同上,解析一次日期字符串效率会更高。
如果列已经是 timestamp
,则可以使用 datetime
文字:
from pyspark.sql.functions import lit
df_with_timestamp.where(
df_with_timestamp.timestamp > lit(now - timedelta(minutes=5)))
from pyspark.sql.functions import *
df.withColumn("seconds_from_now", current_timestamp() - col("ts").cast("long"))
df = df.filter(df.seconds_from_now <= 5*60).drop("seconds_from_now")
df 是包含最后五分钟结果的结果数据框。