Spark / Scala:用最后的良好观察填充 nan
Spark / Scala: fill nan with last good observation
我正在使用 spark 2.0.1 并想用列中最后一个已知值填充 nan 值。
我能找到的关于 spark 的唯一参考 or ,它似乎使用了 RDD。
我宁愿留在数据框/数据集世界中,并可能处理多个 nan 值。
这可能吗?
我的假设是数据(最初从 CSV 文件加载)按时间排序,并且此顺序保留在分布式设置中,例如按关闭/最后已知值填充是正确的。可能用以前的值填充对于大多数记录来说,连续没有 2 个或更多 nan 记录就足够了。这真的成立吗?
关键是
myDf.sort("foo").show
会破坏任何订单,例如所有 null
值将排在第一位。
一个小例子:
import java.sql.{ Date, Timestamp }
case class FooBar(foo:Date, bar:String)
val myDf = Seq(("2016-01-01","first"),("2016-01-02","second"),("2016-wrongFormat","noValidFormat"), ("2016-01-04","lastAssumingSameDate"))
.toDF("foo","bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
结果
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
| null| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
我想用最后一个已知值修复该值。我怎样才能做到这一点?
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
|2016-01-02| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
编辑
在我的例子中,填充上面一行中的值就足够了,因为只有非常有限的错误值。
edit2
我尝试添加一个索引列
val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
.toDF("foo", "bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
.withColumn("rowId", monotonically_increasing_id())
然后填上最后一个值。
myDf.withColumn("fooLag", lag('foo, 1) over Window.orderBy('rowId)).show
但是上面写着以下警告:
没有为 Window 操作定义分区!将所有数据移动到单个分区,这会导致严重的性能下降。 我如何引入有意义的分区?
+----------+--------------------+-----+----------+
| foo| bar|rowId| fooLag|
+----------+--------------------+-----+----------+
|2016-01-01| first| 0| null|
|2016-01-02| second| 1|2016-01-01|
| null| noValidFormat| 2|2016-01-02|
|2016-01-04|lastAssumingSameDate| 3| null|
+----------+--------------------+-----+----------+
这是一个中间答案。但是,它不是很好,因为没有分区/只使用了一个分区。我还在寻找更好的方法来解决问题
df
.withColumn("rowId", monotonically_increasing_id())
.withColumn("replacement", lag('columnWithNull, 1) over Window.orderBy('rowId))
.withColumn("columnWithNullReplaced",
when($"columnWithNull" isNull, "replacement").otherwise($"columnWithNull")
)
编辑
我正在努力使用 mapPartitionsWithIndex
构建更好的解决方案
https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2 尚未完成。
edit2
添加
if (i == 0) {
lastNotNullRow = toCarryBd.value.get(i + 1).get
} else {
lastNotNullRow = toCarryBd.value.get(i - 1).get
}
会得到想要的结果。
//用最后一个未知的空值填充空字段
我试过了,这确实有效!!
val dftxt1 = spark.read.option("header","true").option("sep","\t").csv("/sdata/ph/com/r/ph_com_r_ita_javelin/inbound/abc.txt").toDF("line_name", "merge_key", "line_id")
dftxt2.select("line_name","merge_key","line_id").write.mode("overwrite").insertInto("dbname.tablename")
val df = spark.sql("select * from dbname.tablename")
val Df1 = df.withColumn("rowId", monotonically_increasing_id())
import org.apache.spark.sql.expressions.Window
val partitionWindow = Window.orderBy("rowId")
val Df2 = Df1.withColumn("line_id", last("line_id", true) over (partitionWindow))
Df2.show
我正在使用 spark 2.0.1 并想用列中最后一个已知值填充 nan 值。
我能找到的关于 spark 的唯一参考
我宁愿留在数据框/数据集世界中,并可能处理多个 nan 值。 这可能吗?
我的假设是数据(最初从 CSV 文件加载)按时间排序,并且此顺序保留在分布式设置中,例如按关闭/最后已知值填充是正确的。可能用以前的值填充对于大多数记录来说,连续没有 2 个或更多 nan 记录就足够了。这真的成立吗? 关键是
myDf.sort("foo").show
会破坏任何订单,例如所有 null
值将排在第一位。
一个小例子:
import java.sql.{ Date, Timestamp }
case class FooBar(foo:Date, bar:String)
val myDf = Seq(("2016-01-01","first"),("2016-01-02","second"),("2016-wrongFormat","noValidFormat"), ("2016-01-04","lastAssumingSameDate"))
.toDF("foo","bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
结果
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
| null| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
我想用最后一个已知值修复该值。我怎样才能做到这一点?
+----------+--------------------+
| foo| bar|
+----------+--------------------+
|2016-01-01| first|
|2016-01-02| second|
|2016-01-02| noValidFormat|
|2016-01-04|lastAssumingSameDate|
+----------+--------------------+
编辑
在我的例子中,填充上面一行中的值就足够了,因为只有非常有限的错误值。
edit2
我尝试添加一个索引列
val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), ("2016-wrongFormat", "noValidFormat"), ("2016-01-04", "lastAssumingSameDate"))
.toDF("foo", "bar")
.withColumn("foo", 'foo.cast("Date"))
.as[FooBar]
.withColumn("rowId", monotonically_increasing_id())
然后填上最后一个值。
myDf.withColumn("fooLag", lag('foo, 1) over Window.orderBy('rowId)).show
但是上面写着以下警告: 没有为 Window 操作定义分区!将所有数据移动到单个分区,这会导致严重的性能下降。 我如何引入有意义的分区?
+----------+--------------------+-----+----------+
| foo| bar|rowId| fooLag|
+----------+--------------------+-----+----------+
|2016-01-01| first| 0| null|
|2016-01-02| second| 1|2016-01-01|
| null| noValidFormat| 2|2016-01-02|
|2016-01-04|lastAssumingSameDate| 3| null|
+----------+--------------------+-----+----------+
这是一个中间答案。但是,它不是很好,因为没有分区/只使用了一个分区。我还在寻找更好的方法来解决问题
df
.withColumn("rowId", monotonically_increasing_id())
.withColumn("replacement", lag('columnWithNull, 1) over Window.orderBy('rowId))
.withColumn("columnWithNullReplaced",
when($"columnWithNull" isNull, "replacement").otherwise($"columnWithNull")
)
编辑
我正在努力使用 mapPartitionsWithIndex
构建更好的解决方案
https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2 尚未完成。
edit2
添加
if (i == 0) {
lastNotNullRow = toCarryBd.value.get(i + 1).get
} else {
lastNotNullRow = toCarryBd.value.get(i - 1).get
}
会得到想要的结果。
//用最后一个未知的空值填充空字段 我试过了,这确实有效!!
val dftxt1 = spark.read.option("header","true").option("sep","\t").csv("/sdata/ph/com/r/ph_com_r_ita_javelin/inbound/abc.txt").toDF("line_name", "merge_key", "line_id")
dftxt2.select("line_name","merge_key","line_id").write.mode("overwrite").insertInto("dbname.tablename")
val df = spark.sql("select * from dbname.tablename")
val Df1 = df.withColumn("rowId", monotonically_increasing_id())
import org.apache.spark.sql.expressions.Window
val partitionWindow = Window.orderBy("rowId")
val Df2 = Df1.withColumn("line_id", last("line_id", true) over (partitionWindow))
Df2.show