火花流持续 table 更新

spark streaming persistent table updates

我有一个 spark 结构化流应用程序(听 kafka),它也在从 s3 中的持久 table 读取我正在尝试让每个微批次检查 table 的更新。我试过了

var myTable = spark.table("myTable!")

spark.sql("select * from parquet.`s3n://myFolder/`")

两者都不能在流式上下文中工作。问题是 parquet 文件在每次更新时都会发生变化,而 spark 不会 运行 任何正常的刷新命令,例如:

spark.catalog.refreshTable("myTable!")
spark.sqlContext.clearCache()

我也试过:

spark.sqlContext.setConf("spark.sql.parquet.cacheMetadata","false")
  spark.conf.set("spark.sql.parquet.cacheMetadata",false)

无济于事。必须有办法做到这一点。改为使用 jdbc 连接到数据库会更聪明吗?

假设我没看错,我认为问题是因为 DataFrame 是 immutable,除非您重新启动流式查询并创建一个新数据框。这个问题有come up on the Spark Mailing List before. The definitive answer appears to be that the only way to capture these updates is to restart the streaming query. If your application cannot tolerate 10 second hiccups you might want to check out this blog post which summarizes the above conversation and discusses how SnappyData enables mutations on Spark DataFrames.

免责声明:我为 SnappyData 工作

这将完成我正在寻找的东西。

val df1Schema = spark.read.option("header", "true").csv("test1.csv").schema
    val df1 = spark.readStream.schema(df1Schema).option("header", "true").csv("/1")
    df1.writeStream.format("memory").outputMode("append").queryName("df1").start()

    var df1 = sql("select * from df1")

缺点是它的附加。解决一个问题是根据 ID 和最新日期删除重复项。

val dfOrder = df1.orderBy(col("id"), col("updateTableTimestamp").desc)

val dfMax = dfOrder.groupBy(col("id")).agg(first("name").as("name"),first("updateTableTimestamp").as("updateTableTimestamp"))