使用 foreach 附加到文本文件的 Spark Structured Streaming
Spark Structured Streaming for appending to text file using foreach
我想使用结构化流向文本文件追加行。此代码生成 SparkException: Task not serializable
。我认为 toDF
是不允许的。我怎样才能让这段代码起作用?
df.writeStream
.foreach(new ForeachWriter[Row] {
override def open(partitionId: Long, version: Long): Boolean = {
true
}
override def process(row: Row): Unit = {
val df = Seq(row.getString(0)).toDF
df.write.format("text").mode("append").save(output)
}
override def close(errorOrNull: Throwable): Unit = {
}
}).start
您不能在 process
方法中调用 df.write.format("text").mode("append").save(output)
。它会在执行者端 运行 。您可以改用文件接收器,例如
df.writeStream.format("text")....
我想使用结构化流向文本文件追加行。此代码生成 SparkException: Task not serializable
。我认为 toDF
是不允许的。我怎样才能让这段代码起作用?
df.writeStream
.foreach(new ForeachWriter[Row] {
override def open(partitionId: Long, version: Long): Boolean = {
true
}
override def process(row: Row): Unit = {
val df = Seq(row.getString(0)).toDF
df.write.format("text").mode("append").save(output)
}
override def close(errorOrNull: Throwable): Unit = {
}
}).start
您不能在 process
方法中调用 df.write.format("text").mode("append").save(output)
。它会在执行者端 运行 。您可以改用文件接收器,例如
df.writeStream.format("text")....