Dataset foreach中的SparkSession空点异常

SparkSession null point exception in Dataset foreach

我是 Spark 新手。

我想继续从 kafka 获取消息,然后在消息大小超过 100000 时保存到 S3。

我用 Dataset.collectAsList() 实现了它,但是它用 Total size of serialized results of 3 tasks (1389.3 MiB) is bigger than spark.driver.maxResultSize

抛出了错误

于是转而使用foreach,使用SparkSession创建DataFrame时提示null point exception

有什么想法吗?谢谢

---代码---

SparkSession spark = generateSparkSession();
        registerUdf4AddPartition(spark);
        Dataset<Row> dataset = spark.readStream().format("kafka")
                .option("kafka.bootstrap.servers", args[0])
                .option("kafka.group.id", args[1])
                .option("subscribe", args[2])
                .option("kafka.security.protocol", SecurityProtocol.SASL_PLAINTEXT.name)
                .load();
        DataStreamWriter<Row> console = dataset.toDF().writeStream().foreachBatch((rawDataset, time) -> {
            Dataset<Row> rowDataset = rawDataset.selectExpr("CAST(value AS STRING)");
            //using foreach
            rowDataset.foreach(row -> {
                List<Span> rawDataList = new CsvToBeanBuilder(new StringReader(row.getString(0))).withType(Span.class).build().parse();
                spans.addAll(rawDataList);
                batchSave(spark);
            });

            // using collectAsList
            List<Row> rows = rowDataset.collectAsList();
            for (Row row : rows) {
                List<Span> rawDataList = new CsvToBeanBuilder(new StringReader(row.getString(0))).withType(Span.class).build().parse();
                spans.addAll(rawDataList);
                batchSave(spark);
            }
        });
        StreamingQuery start = console.start();
        start.awaitTermination();

public static void batchSave(SparkSession spark){
        synchronized (spans){
            if(spans.size() == 100000){
                System.out.println(spans.isEmpty());
                Dataset<Row> spanDataSet = spark.createDataFrame(spans, Span.class);
                Dataset<Row> finalResult = addCustomizedTimeByUdf(spanDataSet);

                StringBuilder pathBuilder = new StringBuilder("s3a://fwk-dataplatform-np/datalake/log/FWK/ART2/test/leftAndRight");
                finalResult.repartition(1).write().partitionBy("year","month","day","hour").format("csv").mode("append").save(pathBuilder.toString());
                spans.clear();
            }
        }
    }

因为主要的SparkSession是运行在driver,而foreach...的任务是运行分布在executors,因此 spark 未定义给所有其他执行程序。

顺便说一句,在 foreach 任务中使用 synchronized 没有任何意义,因为所有内容都是分布式的。