Dataset foreach中的SparkSession空点异常
SparkSession null point exception in Dataset foreach
我是 Spark 新手。
我想继续从 kafka 获取消息,然后在消息大小超过 100000 时保存到 S3。
我用 Dataset.collectAsList() 实现了它,但是它用 Total size of serialized results of 3 tasks (1389.3 MiB) is bigger than spark.driver.maxResultSize
抛出了错误
于是转而使用foreach,使用SparkSession创建DataFrame时提示null point exception
有什么想法吗?谢谢
---代码---
SparkSession spark = generateSparkSession();
registerUdf4AddPartition(spark);
Dataset<Row> dataset = spark.readStream().format("kafka")
.option("kafka.bootstrap.servers", args[0])
.option("kafka.group.id", args[1])
.option("subscribe", args[2])
.option("kafka.security.protocol", SecurityProtocol.SASL_PLAINTEXT.name)
.load();
DataStreamWriter<Row> console = dataset.toDF().writeStream().foreachBatch((rawDataset, time) -> {
Dataset<Row> rowDataset = rawDataset.selectExpr("CAST(value AS STRING)");
//using foreach
rowDataset.foreach(row -> {
List<Span> rawDataList = new CsvToBeanBuilder(new StringReader(row.getString(0))).withType(Span.class).build().parse();
spans.addAll(rawDataList);
batchSave(spark);
});
// using collectAsList
List<Row> rows = rowDataset.collectAsList();
for (Row row : rows) {
List<Span> rawDataList = new CsvToBeanBuilder(new StringReader(row.getString(0))).withType(Span.class).build().parse();
spans.addAll(rawDataList);
batchSave(spark);
}
});
StreamingQuery start = console.start();
start.awaitTermination();
public static void batchSave(SparkSession spark){
synchronized (spans){
if(spans.size() == 100000){
System.out.println(spans.isEmpty());
Dataset<Row> spanDataSet = spark.createDataFrame(spans, Span.class);
Dataset<Row> finalResult = addCustomizedTimeByUdf(spanDataSet);
StringBuilder pathBuilder = new StringBuilder("s3a://fwk-dataplatform-np/datalake/log/FWK/ART2/test/leftAndRight");
finalResult.repartition(1).write().partitionBy("year","month","day","hour").format("csv").mode("append").save(pathBuilder.toString());
spans.clear();
}
}
}
因为主要的SparkSession
是运行在driver,而foreach...
的任务是运行分布在executors,因此 spark
未定义给所有其他执行程序。
顺便说一句,在 foreach
任务中使用 synchronized
没有任何意义,因为所有内容都是分布式的。
我是 Spark 新手。
我想继续从 kafka 获取消息,然后在消息大小超过 100000 时保存到 S3。
我用 Dataset.collectAsList() 实现了它,但是它用 Total size of serialized results of 3 tasks (1389.3 MiB) is bigger than spark.driver.maxResultSize
于是转而使用foreach,使用SparkSession创建DataFrame时提示null point exception
有什么想法吗?谢谢
---代码---
SparkSession spark = generateSparkSession();
registerUdf4AddPartition(spark);
Dataset<Row> dataset = spark.readStream().format("kafka")
.option("kafka.bootstrap.servers", args[0])
.option("kafka.group.id", args[1])
.option("subscribe", args[2])
.option("kafka.security.protocol", SecurityProtocol.SASL_PLAINTEXT.name)
.load();
DataStreamWriter<Row> console = dataset.toDF().writeStream().foreachBatch((rawDataset, time) -> {
Dataset<Row> rowDataset = rawDataset.selectExpr("CAST(value AS STRING)");
//using foreach
rowDataset.foreach(row -> {
List<Span> rawDataList = new CsvToBeanBuilder(new StringReader(row.getString(0))).withType(Span.class).build().parse();
spans.addAll(rawDataList);
batchSave(spark);
});
// using collectAsList
List<Row> rows = rowDataset.collectAsList();
for (Row row : rows) {
List<Span> rawDataList = new CsvToBeanBuilder(new StringReader(row.getString(0))).withType(Span.class).build().parse();
spans.addAll(rawDataList);
batchSave(spark);
}
});
StreamingQuery start = console.start();
start.awaitTermination();
public static void batchSave(SparkSession spark){
synchronized (spans){
if(spans.size() == 100000){
System.out.println(spans.isEmpty());
Dataset<Row> spanDataSet = spark.createDataFrame(spans, Span.class);
Dataset<Row> finalResult = addCustomizedTimeByUdf(spanDataSet);
StringBuilder pathBuilder = new StringBuilder("s3a://fwk-dataplatform-np/datalake/log/FWK/ART2/test/leftAndRight");
finalResult.repartition(1).write().partitionBy("year","month","day","hour").format("csv").mode("append").save(pathBuilder.toString());
spans.clear();
}
}
}
因为主要的SparkSession
是运行在driver,而foreach...
的任务是运行分布在executors,因此 spark
未定义给所有其他执行程序。
顺便说一句,在 foreach
任务中使用 synchronized
没有任何意义,因为所有内容都是分布式的。