使用spark高效过滤有序文件
Efficiently filter ordered file with spark
想象一下,我有一个很大的格式日志文件:
Timestamp, text
Timestamp, text
Timestamp, text
Timestamp, text
Timestamp, text
Timestamp, text
日志文件已按时间戳排序。
如果我用 spark 作为数据帧读取文件,然后过滤 t1 和 t2 之间的时间戳,
然后他检查每条记录的时间戳是否在 t1 和 t2 之间,这需要很多时间。
但是,有没有一种方法可以告诉 spark df 已经订购,然后它会知道它只需要查找第一个和最后一个时间戳,以及 return 中的所有行两者之间,哪个会快很多?
不行,spark中没有这个选项。但是有不同的解决方案,广泛用于存储事件或日志的系统中,称为分区。如果您有很多天的记录,请添加一个只有一天的新列:
df.withColumn("day", df.timestamp.cast("date"))
然后使用 partitionedBy:
保存此文件
df_with_day.write.partitionBy("day").csv("partitioned")
这将为每一天创建目录(列日不会保存在重写的文件中),因此下一次使用适当的 where
过滤的查询将忽略不在范围内的目录中的文件:
new_df = spark.read.csv("partitioned")
new_df.where(new.day.between("2016-11-30", "2016-12-10")).show()
想象一下,我有一个很大的格式日志文件:
Timestamp, text
Timestamp, text
Timestamp, text
Timestamp, text
Timestamp, text
Timestamp, text
日志文件已按时间戳排序。 如果我用 spark 作为数据帧读取文件,然后过滤 t1 和 t2 之间的时间戳, 然后他检查每条记录的时间戳是否在 t1 和 t2 之间,这需要很多时间。
但是,有没有一种方法可以告诉 spark df 已经订购,然后它会知道它只需要查找第一个和最后一个时间戳,以及 return 中的所有行两者之间,哪个会快很多?
不行,spark中没有这个选项。但是有不同的解决方案,广泛用于存储事件或日志的系统中,称为分区。如果您有很多天的记录,请添加一个只有一天的新列:
df.withColumn("day", df.timestamp.cast("date"))
然后使用 partitionedBy:
保存此文件df_with_day.write.partitionBy("day").csv("partitioned")
这将为每一天创建目录(列日不会保存在重写的文件中),因此下一次使用适当的 where
过滤的查询将忽略不在范围内的目录中的文件:
new_df = spark.read.csv("partitioned")
new_df.where(new.day.between("2016-11-30", "2016-12-10")).show()