Exception in reading from kafka in spark 2.1.1 when using selectExpr

I am running the default example provided by Spark that counts words from a Kafka stream.

Here is the code I am running:

import scala.Tuple2;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
import java.util.Map;
import java.util.HashMap;


import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class JavaWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession
                .builder()
                .appName("JavaWordCount")
                .getOrCreate();
        Dataset<Row> lines = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "TutorialTopic")
                .option("startingOffsets", "latest")
                .load();
        lines.selectExpr("CAST key AS STRING", "CAST value AS STRING");
        Dataset<String> words = lines
                .as(Encoders.STRING())
                .flatMap(
                        new FlatMapFunction<String, String>() {
                            @Override
                            public Iterator<String> call(String x) {
                                return Arrays.asList(x.split(" ")).iterator();
                            }
                        }, Encoders.STRING());
        Dataset<Row> wordCounts = words.groupBy("value").count();
        StreamingQuery query = wordCounts.writeStream()
                .outputMode("complete")
                .format("console")
                .start();

        query.awaitTermination();
    }
}

In my pom.xml file I have added the following dependencies:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

<dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
   <version>2.1.1</version>
</dependency>

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.11</artifactId>
  <version>2.1.1</version>
</dependency>

I submit the code to Spark with the following command:

./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.2 --class "JavaWordCount" --master local[4] target/spark-sql-kafka-0-10_2.11-1.0.jar

When the code runs, the following line throws an exception:

lines.selectExpr("CAST key AS STRING","CAST value AS STRING");

The exception:

Try to map struct<key:binary,value:binary,topic:string,partition:int,offset:bigint,timestamp:timestamp,timestampType:int> to Tuple1, but failed as the number of fields does not line up. 

Please help me resolve this exception. Thanks!

The problem is with the line `lines.as(Encoders.STRING())`.

You can change

    lines.selectExpr("CAST key AS STRING", "CAST value AS STRING");
    Dataset<String> words = lines
            .as(Encoders.STRING())

to

    Dataset<String> words = lines.selectExpr("CAST(value AS STRING)")
            .as(Encoders.STRING())

(note that the cast expression needs parentheses: `CAST(value AS STRING)`).

You need to use the return value of `lines.selectExpr`. This method does not modify `lines` itself; it returns a new `Dataset`. And since you are applying `.as(Encoders.STRING())` (a single-column string encoder), I think you only need `value`.
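The same principle applies to any method that returns a new object instead of mutating the receiver. A minimal plain-Java sketch (no Spark required, class name is illustrative) of why discarding the return value has no effect:

```java
public class ReturnValueDemo {
    public static void main(String[] args) {
        String line = "hello world";

        // Like Dataset.selectExpr, String.toUpperCase returns a NEW object;
        // calling it and ignoring the result leaves the original unchanged.
        line.toUpperCase();
        System.out.println(line);   // still "hello world"

        // Capture and use the return value instead.
        String upper = line.toUpperCase();
        System.out.println(upper);  // "HELLO WORLD"
    }
}
```

In the question's code, `lines.selectExpr(...)` on its own line behaves exactly like the first call above: the projected `Dataset` is created and immediately discarded, so the later `.as(Encoders.STRING())` still sees all seven Kafka columns and fails to map them to a single string.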