过滤并保存数据帧的前 X 行
filter and save first X lines of a dataframe
我正在使用 pySpark 读取和计算数据框的统计数据。
数据框看起来像:
TRANSACTION_URL START_TIME END_TIME SIZE FLAG COL6 COL7 ...
www.google.com 20170113093210 20170113093210 150 1 ... ...
www.cnet.com 20170113114510 20170113093210 150 2 ... ...
我正在向数据框添加一个新的 timePeriod
列,添加后,我想保存前 50K 条记录 timePeriod
匹配某些 pre-defined 值。
我的目的是将这些行保存到 CSV 数据框 header。
我知道这应该是 col
和 write.csv
的组合,但我不确定如何根据我的意图正确使用它们。
我当前的代码是:
encodeUDF = udf(encode_time, StringType())
log_df = log_df.withColumn('timePeriod', encodeUDF(col('START_TIME')))
添加该列后,我猜我应该使用类似的东西:
log_df.select(col('timePeriod') == 'Weekday').write.csv(....)
有人可以帮我填补这里的空白,以符合我的意图吗?
使用filter()
和limit()
方法解决如下:
new_log_df.filter(col('timePeriod') == '20161206, Morning').limit(50).write.\
format('csv').option("header", "true").save("..Path..")
unix_timestamp
和 date_format
是有用的方法,因为 START_TIME
不是时间戳类型。
dfWithDayNum = log_df.withColumn("timePeriod", date_format(
unix_timestamp(col("START_TIME"), "yyyyMMddHHmmss").cast(TimestampType), "u")
)
timePeriod
将有 星期几(1 = 星期一,...,7 = 星期日)
dfWithDayNum
.filter(col("timePeriod") < 6) //to filter weekday
.limit(50000) //X lines
.write.format("csv")
.option("header", "true")
.csv("location/to/save/df")
我正在使用 pySpark 读取和计算数据框的统计数据。
数据框看起来像:
TRANSACTION_URL START_TIME END_TIME SIZE FLAG COL6 COL7 ...
www.google.com 20170113093210 20170113093210 150 1 ... ...
www.cnet.com 20170113114510 20170113093210 150 2 ... ...
我正在向数据框添加一个新的 timePeriod
列,添加后,我想保存前 50K 条记录 timePeriod
匹配某些 pre-defined 值。
我的目的是将这些行保存到 CSV 数据框 header。
我知道这应该是 col
和 write.csv
的组合,但我不确定如何根据我的意图正确使用它们。
我当前的代码是:
encodeUDF = udf(encode_time, StringType())
log_df = log_df.withColumn('timePeriod', encodeUDF(col('START_TIME')))
添加该列后,我猜我应该使用类似的东西:
log_df.select(col('timePeriod') == 'Weekday').write.csv(....)
有人可以帮我填补这里的空白,以符合我的意图吗?
使用filter()
和limit()
方法解决如下:
new_log_df.filter(col('timePeriod') == '20161206, Morning').limit(50).write.\
format('csv').option("header", "true").save("..Path..")
unix_timestamp
和 date_format
是有用的方法,因为 START_TIME
不是时间戳类型。
dfWithDayNum = log_df.withColumn("timePeriod", date_format(
unix_timestamp(col("START_TIME"), "yyyyMMddHHmmss").cast(TimestampType), "u")
)
timePeriod
将有 星期几(1 = 星期一,...,7 = 星期日)
dfWithDayNum
.filter(col("timePeriod") < 6) //to filter weekday
.limit(50000) //X lines
.write.format("csv")
.option("header", "true")
.csv("location/to/save/df")