使用带水印的追加输出模式时的结构化流异常
Structured Streaming exception when using append output mode with watermark
尽管我正在使用 withWatermark()
,但当我 运行 我的 spark 作业时收到以下错误消息:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
从我在 programming guide 中看到的情况来看,这完全符合预期用途(和示例代码)。有谁知道可能出了什么问题?
提前致谢!
相关代码(Java8,Spark 2.2.0):
StructType logSchema = new StructType()
.add("timestamp", TimestampType)
.add("key", IntegerType)
.add("val", IntegerType);
Dataset<Row> kafka = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topics)
.load();
Dataset<Row> parsed = kafka
.select(from_json(col("value").cast("string"), logSchema).alias("parsed_value"))
.select("parsed_value.*");
Dataset<Row> tenSecondCounts = parsed
.withWatermark("timestamp", "10 minutes")
.groupBy(
parsed.col("key"),
window(parsed.col("timestamp"), "1 day"))
.count();
StreamingQuery query = tenSecondCounts
.writeStream()
.trigger(Trigger.ProcessingTime("10 seconds"))
.outputMode("append")
.format("console")
.option("truncate", false)
.start();
问题出在parsed.col
。将其替换为 col
将解决此问题。我建议总是使用 col
函数而不是 Dataset.col
.
Dataset.col
returns resolved column
而 col
returns unresolved column
.
parsed.withWatermark("timestamp", "10 minutes")
将创建一个包含同名新列的新数据集。水印信息附加在新数据集中的 timestamp
列,而不是 parsed.col("timestamp")
,因此 groupBy
中的列没有水印。
当您使用未解析的列时,Spark 会为您找出正确的列。
尽管我正在使用 withWatermark()
,但当我 运行 我的 spark 作业时收到以下错误消息:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
从我在 programming guide 中看到的情况来看,这完全符合预期用途(和示例代码)。有谁知道可能出了什么问题?
提前致谢!
相关代码(Java8,Spark 2.2.0):
StructType logSchema = new StructType()
.add("timestamp", TimestampType)
.add("key", IntegerType)
.add("val", IntegerType);
Dataset<Row> kafka = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("subscribe", topics)
.load();
Dataset<Row> parsed = kafka
.select(from_json(col("value").cast("string"), logSchema).alias("parsed_value"))
.select("parsed_value.*");
Dataset<Row> tenSecondCounts = parsed
.withWatermark("timestamp", "10 minutes")
.groupBy(
parsed.col("key"),
window(parsed.col("timestamp"), "1 day"))
.count();
StreamingQuery query = tenSecondCounts
.writeStream()
.trigger(Trigger.ProcessingTime("10 seconds"))
.outputMode("append")
.format("console")
.option("truncate", false)
.start();
问题出在parsed.col
。将其替换为 col
将解决此问题。我建议总是使用 col
函数而不是 Dataset.col
.
Dataset.col
returns resolved column
而 col
returns unresolved column
.
parsed.withWatermark("timestamp", "10 minutes")
将创建一个包含同名新列的新数据集。水印信息附加在新数据集中的 timestamp
列,而不是 parsed.col("timestamp")
,因此 groupBy
中的列没有水印。
当您使用未解析的列时,Spark 会为您找出正确的列。