I can't pass parameters to foreach loop while implementing Structured Streaming + Kafka in Spark SQL
I built a program following the Structured Streaming + Kafka guide that takes a data stream sent from Kafka as input. When I receive the stream, I want to pass it to a SparkSession variable and do some query work with Spark SQL, so I extended the ForeachWriter class as follows:
package stream;

import java.io.FileNotFoundException;
import java.io.PrintWriter;

import org.apache.spark.sql.ForeachWriter;
import org.apache.spark.sql.SparkSession;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

import dataservices.OrderDataServices;
import models.SuccessEvent;

public class MapEventWriter extends ForeachWriter<String> {
    private SparkSession spark;

    public MapEventWriter(SparkSession spark) {
        this.spark = spark;
    }

    private static final long serialVersionUID = 1L;

    @Override
    public void close(Throwable errorOrNull) {
        // TODO Auto-generated method stub
    }

    @Override
    public boolean open(long partitionId, long epochId) {
        // TODO Auto-generated method stub
        return true;
    }

    @Override
    public void process(String input) {
        OrderDataServices services = new OrderDataServices(this.spark);
    }
}
But in the process function, the program throws an error as soon as I use the spark variable. The program that passes in my spark is as follows:
package demo;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.TimeoutException;

import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.json.simple.parser.ParseException;

import dataservices.OrderDataServices;
import models.MapperEvent;
import models.OrderEvent;
import models.SuccessEvent;
import stream.MapEventWriter;
import stream.MapEventWriter1;

public class Demo {
    public static void main(String[] args) throws TimeoutException, StreamingQueryException, ParseException, IOException {
        try (SparkSession spark = SparkSession.builder().appName("Read kafka").getOrCreate()) {
            Dataset<String> data = spark
                    .readStream()
                    .format("kafka")
                    .option("kafka.bootstrap.servers", "localhost:9092")
                    .option("subscribe", "tiki-1")
                    .load()
                    .selectExpr("CAST(value AS STRING)")
                    .as(Encoders.STRING());

            MapEventWriter eventWriter = new MapEventWriter(spark);

            StreamingQuery query = data
                    .writeStream()
                    .foreach(eventWriter)
                    .start();

            query.awaitTermination();
        }
    }
}
The error is a NullPointerException at the point where spark is called, i.e. the spark variable was never initialized.
I hope someone can help me, I would really appreciate it.
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:151)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:149)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:998)
at org.apache.spark.sql.SparkSession.read(SparkSession.scala:655)
at dataservices.OrderDataServices.<init>(OrderDataServices.java:18)
at stream.MapEventWriter.process(MapEventWriter.java:38)
at stream.MapEventWriter.process(MapEventWriter.java:15)
do some query work with Spark SQL

You wouldn't use a ForeachWriter for that. A ForeachWriter is serialized and its process method runs on the executors, where the SparkSession you passed into the constructor cannot be used (its internals are not shipped along, which is why you get the NullPointerException). Keep the query work on the streaming Dataset itself, on the driver:
data
    .selectExpr("CAST(value AS STRING)")
    .as(Encoders.STRING()); // or parse your JSON here using a schema

data.select(...) // or move this to a method / class that takes the Dataset as a parameter

// await termination
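A minimal sketch of that approach, assuming the Kafka messages are JSON order events (the orderId/amount fields, the schema, and the console sink below are placeholders for illustration, not your real model): parse the value with a schema, run the query on the resulting streaming Dataset on the driver, and let a built-in sink write the results, so the SparkSession never has to reach the executors.

package demo;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

public class OrderStreamDemo {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder().appName("Read kafka").getOrCreate();

        // Hypothetical schema for the JSON order events; replace with your real fields.
        StructType orderSchema = new StructType()
                .add("orderId", DataTypes.StringType)
                .add("amount", DataTypes.DoubleType);

        // Read from Kafka and parse the value column with the schema,
        // instead of handing raw strings to a ForeachWriter.
        Dataset<Row> orders = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "tiki-1")
                .load()
                .selectExpr("CAST(value AS STRING) AS json")
                .select(from_json(col("json"), orderSchema).as("order"))
                .select("order.*");

        // The "query work" happens on the streaming Dataset, on the driver.
        Dataset<Row> bigOrders = orders.filter(col("amount").gt(100));

        // Write with a built-in sink; console is just a stand-in for your real output.
        StreamingQuery query = bigOrders
                .writeStream()
                .format("console")
                .outputMode("append")
                .start();

        query.awaitTermination();
    }
}

If you prefer SQL text over the Dataset API, you can also register the parsed stream with orders.createOrReplaceTempView("orders") and use spark.sql("SELECT ... FROM orders") on the driver to build the Dataset you write out.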