How to define schema of streaming dataset dynamically to write to csv?
I have a streaming Dataset, read from Kafka, and I am trying to write it to CSV.
case class Event(map: Map[String,String])
def decodeEvent(arrByte: Array[Byte]): Event = ...//some implementation
import org.apache.spark.sql.Dataset
import spark.implicits._

val eventDataset: Dataset[Event] = spark
.readStream
.format("kafka")
.load()
.select("value")
.as[Array[Byte]]
.map(decodeEvent)
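(The decoder itself is elided above. Purely for illustration, and assuming the Kafka value is a flat JSON object of string fields, a hypothetical decodeEvent might look like the sketch below; the real implementation may differ.)

// Hypothetical decoder, for illustration only: parses the value bytes as a flat
// JSON object and keeps every field as a String. Not part of the original question.
import com.fasterxml.jackson.databind.ObjectMapper
import scala.collection.JavaConverters._

def decodeEvent(arrByte: Array[Byte]): Event = {
  val mapper = new ObjectMapper()
  val root = mapper.readTree(arrByte)
  Event(root.fields().asScala.map(e => e.getKey -> e.getValue.asText()).toMap)
}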
Event internally holds a Map[String,String], and to write it out as CSV I need some schema. Assuming all fields are of type String, I tried the example from the spark repo:
val columns = List("year","month","date","topic","field1","field2")
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row

// Prepare schema programmatically; StructType.add returns a new instance, so fold rather than mutate
val schema = columns.foldLeft(new StructType())((s, field) => s.add(field, "string"))
val rowRdd = eventDataset.rdd.map { event => Row.fromSeq(
  columns.map(c => event.map.getOrElse(c, ""))
)}
val df = spark.sqlContext.createDataFrame(rowRdd, schema)
This fails at runtime on the "eventDataset.rdd" line (converting a streaming Dataset to an RDD would execute the plan eagerly, which Structured Streaming does not allow):
Caused by: org.apache.spark.sql.AnalysisException: Queries with
streaming sources must be executed with writeStream.start();;
The following does not work either, because ".map" produces a List[String] rather than a Tuple:
eventDataset.map(event => columns.map(c => event.map.getOrElse(c, "")))
  .toDF(columns: _*)
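(For contrast, the Tuple route only compiles when the column set is fixed at compile time, which is exactly what a programmatic schema is meant to avoid. A sketch with the six columns hard-coded:)

// Works only because the arity (6) is known at compile time, so Spark can derive
// an Encoder for the tuple; it cannot follow a column list built at runtime.
import spark.implicits._

eventDataset.map { event =>
  (event.map.getOrElse("year", ""), event.map.getOrElse("month", ""),
   event.map.getOrElse("date", ""), event.map.getOrElse("topic", ""),
   event.map.getOrElse("field1", ""), event.map.getOrElse("field2", ""))
}.toDF(columns: _*)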
Is there a way to achieve this with a programmatic schema and a structured streaming Dataset?
I would use a simpler approach:
import org.apache.spark.sql.functions._
import spark.implicits._
eventDataset.select(columns.map(
c => coalesce($"map".getItem(c), lit("")).alias(c)
): _*).writeStream.format("csv").start(path)
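A note on the write itself: the csv file sink typically requires a checkpoint location before it will start (unless a default is configured). A minimal end-to-end sketch of this approach, with placeholder paths and the columns list from the question:

// Placeholder paths; the csv file sink needs both an output path and a checkpoint location.
import org.apache.spark.sql.functions._
import spark.implicits._

val query = eventDataset
  .select(columns.map(c => coalesce($"map".getItem(c), lit("")).alias(c)): _*)
  .writeStream
  .format("csv")
  .option("header", "true")                                     // optional csv writer option
  .option("checkpointLocation", "/tmp/checkpoints/events-csv")  // placeholder
  .start("/tmp/output/events-csv")                              // placeholder

query.awaitTermination()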
But if you want something closer to your current solution, skip the RDD conversion and map with an explicit RowEncoder:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
eventDataset.map(event =>
  Row.fromSeq(columns.map(c => event.map.getOrElse(c, "")))
)(RowEncoder(schema)).writeStream.format("csv").start(path)
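Either way the query still has to be started with writeStream.start(). Note that RowEncoder lives in the org.apache.spark.sql.catalyst package, which is generally treated as an internal API, so the select/coalesce version above is usually the safer long-term choice.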