使用带水印的追加输出模式时的结构化流异常

Structured Streaming exception when using append output mode with watermark

尽管我正在使用 withWatermark(),但当我 运行 我的 spark 作业时收到以下错误消息:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;

从我在 programming guide 中看到的情况来看,这完全符合预期用途(和示例代码)。有谁知道可能出了什么问题?

提前致谢!

相关代码(Java8,Spark 2.2.0):

StructType logSchema = new StructType()
        .add("timestamp", TimestampType)
        .add("key", IntegerType)
        .add("val", IntegerType);

Dataset<Row> kafka = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", brokers)
        .option("subscribe", topics)
        .load();

Dataset<Row> parsed = kafka
        .select(from_json(col("value").cast("string"), logSchema).alias("parsed_value"))
        .select("parsed_value.*");

Dataset<Row> tenSecondCounts = parsed
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
            parsed.col("key"),
            window(parsed.col("timestamp"), "1 day"))
        .count();

StreamingQuery query = tenSecondCounts
        .writeStream()
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .outputMode("append")
        .format("console")
        .option("truncate", false)
        .start();

问题出在parsed.col。将其替换为 col 将解决此问题。我建议总是使用 col 函数而不是 Dataset.col.

Dataset.col returns resolved columncol returns unresolved column.

parsed.withWatermark("timestamp", "10 minutes") 将创建一个包含同名新列的新数据集。水印信息附加在新数据集中的 timestamp 列,而不是 parsed.col("timestamp"),因此 groupBy 中的列没有水印。

当您使用未解析的列时,Spark 会为您找出正确的列。