I can't pass parameters to foreach loop while implementing Structured Streaming + Kafka in Spark SQL

I followed the Structured Streaming + Kafka guide to build a program that receives the data stream sent from Kafka as input. When I receive the stream, I want to pass it to a SparkSession variable so I can do some query work with Spark SQL, so I extended the ForeachWriter class as below:

package stream;

import java.io.FileNotFoundException;
import java.io.PrintWriter;

import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.SparkSession;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import dataservices.OrderDataServices;
import models.SuccessEvent;

public class MapEventWriter extends ForeachWriter<String> {
    private SparkSession spark;

    public MapEventWriter(SparkSession spark) {
        this.spark = spark;
    }

    private static final long serialVersionUID = 1L;

    @Override
    public void close(Throwable errorOrNull) {
        // TODO Auto-generated method stub
    }

    @Override
    public boolean open(long partitionId, long epochId) {
        // TODO Auto-generated method stub
        return true;
    }

    @Override
    public void process(String input) {
        OrderDataServices services = new OrderDataServices(this.spark);
    }
}

But in the process function, if I use the spark variable, the program reports an error. The program that passes in my spark is as follows:

package demo;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.json.simple.parser.ParseException;

import dataservices.OrderDataServices;
import models.MapperEvent;
import models.OrderEvent;
import models.SuccessEvent;
import stream.MapEventWriter;
import stream.MapEventWriter1;

public class Demo {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException, ParseException, IOException {
        try (SparkSession spark = SparkSession.builder().appName("Read kafka").getOrCreate()) {
            Dataset<String> data = spark
                    .readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("subscribe", "tiki-1")
                    .load()
                    .selectExpr("CAST(value AS STRING)")
                    .as(Encoders.STRING());
            
            MapEventWriter eventWriter = new MapEventWriter(spark);
            
            StreamingQuery query = data
                    .writeStream()
                    .foreach(eventWriter)
                    .start();
            
            query.awaitTermination();
            
        }
    }
    
    
}

The error is a NullPointerException at the place where spark is called, i.e. the spark variable was not initialized. I hope someone can help me; I would really appreciate it.

Caused by: java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:151)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:149)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:998)
    at org.apache.spark.sql.SparkSession.read(SparkSession.scala:655)
    at dataservices.OrderDataServices.<init>(OrderDataServices.java:18)
    at stream.MapEventWriter.process(MapEventWriter.java:38)
    at stream.MapEventWriter.process(MapEventWriter.java:15)

"do some query work with Spark SQL"

You wouldn't use a ForeachWriter for that. The ForeachWriter runs on the executors, where the SparkSession it carries over is not usable (hence the NullPointerException); keep the query logic on the driver instead:

.selectExpr("CAST(value AS STRING)")
          .as(Encoders.STRING());  // or parse your JSON here using a schema 

data.select(...)  // or move this to a method / class that takes the Dataset as a parameter

// await termination
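
Put together, a minimal sketch of that suggestion could look like the following. The JSON schema, the column names (orderId, amount), the console sink, and the process helper method are hypothetical placeholders for whatever OrderDataServices actually does; the point is that the "query work" is expressed as transformations on the streaming Dataset on the driver, not inside a ForeachWriter.

package demo;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

public class DemoWithoutForeach {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("Read kafka").getOrCreate();

        // Hypothetical schema for the JSON payload on the topic; replace with the real fields.
        StructType schema = new StructType()
                .add("orderId", DataTypes.StringType)
                .add("amount", DataTypes.DoubleType);

        Dataset<Row> orders = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "tiki-1")
                .load()
                .selectExpr("CAST(value AS STRING) AS value")     // raw Kafka value as a string
                .select(from_json(col("value"), schema).as("o"))  // parse the JSON using the schema
                .select("o.*");                                   // flatten to top-level columns

        // The "query work" lives in a method that takes the Dataset as a parameter,
        // instead of trying to use the SparkSession inside a ForeachWriter.
        StreamingQuery query = process(orders)
                .writeStream()
                .outputMode("complete")
                .format("console")   // write wherever you need; console is just for the sketch
                .start();

        query.awaitTermination();
    }

    // Placeholder for whatever OrderDataServices would do with the stream.
    private static Dataset<Row> process(Dataset<Row> orders) {
        return orders.groupBy("orderId").count();
    }
}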