Why does reading a stream from Kafka fail with "Unable to find encoder for type stored in a Dataset"?
I am trying to use Spark Structured Streaming with Kafka.
    import org.apache.spark.sql.SparkSession

    object StructuredStreaming {
      def main(args: Array[String]): Unit = {
        if (args.length < 2) {
          System.err.println("Usage: StructuredStreaming <hostname> <port>")
          System.exit(1)
        }
        // host and port are parsed but not used below;
        // the Kafka bootstrap server is hard-coded
        val host = args(0)
        val port = args(1).toInt

        val spark = SparkSession
          .builder
          .appName("StructuredStreaming")
          .config("spark.master", "local")
          .getOrCreate()

        // Brings the implicit encoders (including tuple encoders) into scope
        import spark.implicits._

        // Subscribe to 1 topic
        val lines = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "localhost:9093")
          .option("subscribe", "sparkss")
          .load()

        // Fails to compile: "Unable to find encoder for type stored in a Dataset"
        lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
          .as[(String, String)]
      }
    }
I took this code from the Spark documentation, but I get this build error:
    Unable to find encoder for type stored in a Dataset. Primitive types
    (Int, String, etc) and Product types (case classes) are supported by
    importing spark.implicits._ Support for serializing other types will
    be added in future releases.
        .as[(String, String)]
I read in other SO posts that this is caused by a missing import spark.implicits._, but that import is already in my code and it makes no difference for me.
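For context: .as[(String, String)] needs an implicit Encoder[(String, String)] in scope, and import spark.implicits._ is what normally supplies it. A minimal sketch, assuming a local SparkSession and a static DataFrame standing in for the Kafka source, that passes the encoder explicitly with Encoders.tuple instead of relying on the import. If a variant like this compiles in the same project while the implicit version does not, that suggests the import itself is not the real problem. The names ExplicitTupleEncoder, pairEncoder and keyValuePairs are hypothetical.

```scala
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}

object ExplicitTupleEncoder {
  // Hand-built encoder for (String, String); this is the type that
  // spark.implicits._ would otherwise have to provide implicitly.
  val pairEncoder: Encoder[(String, String)] =
    Encoders.tuple(Encoders.STRING, Encoders.STRING)

  // Same projection as in the question, applied to a static DataFrame
  // (assumption: it stands in for the Kafka source, which needs a broker).
  def keyValuePairs(spark: SparkSession): Dataset[(String, String)] = {
    val df = spark
      .createDataFrame(Seq(("k1", "v1"), ("k2", "v2")))
      .toDF("key", "value")
    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)](pairEncoder) // encoder passed explicitly, no implicits import
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("ExplicitTupleEncoder")
      .master("local[1]")
      .getOrCreate()
    try keyValuePairs(spark).collect().foreach(println)
    finally spark.stop()
  }
}
```

For tuple types, Dataset.as maps columns by position (first column to _1), so the column names produced by the CAST expressions do not matter here.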
Update:
    <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <slf4j.version>1.7.12</slf4j.version>
      <spark.version>2.1.0</spark.version>
      <scala.version>2.10.4</scala.version>
      <scala.binary.version>2.10</scala.binary.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.1.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.1.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
        <version>2.1.0</version>
      </dependency>
    </dependencies>
Well, I tried Scala 2.11.8:
    <properties>
      <scala.version>2.11.8</scala.version>
      <scala.binary.version>2.11</scala.binary.version>
    </properties>
    <dependencies>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
      </dependency>
      <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
      </dependency>
    </dependencies>
together with the matching dependencies (the _2.11 artifacts), and it finally worked.
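Spark artifacts carry the Scala binary version they were compiled for in their suffix (_2.10, _2.11), and that suffix has to match the Scala version the project actually compiles and runs with, including the Scala SDK configured in the IDE. A small standard-library-only sketch for checking this at runtime; the artifact list is copied from the 2.11 pom above, and ScalaBinaryCheck, binaryVersion and mismatched are hypothetical names.

```scala
import scala.util.Properties

object ScalaBinaryCheck {
  // "2.11.8" -> "2.11": the part that appears as the _2.11 artifact suffix.
  def binaryVersion(fullVersion: String): String =
    fullVersion.split('.').take(2).mkString(".")

  // Artifact IDs whose suffix does not match the given Scala binary version.
  def mismatched(artifactIds: Seq[String], scalaBinary: String): Seq[String] =
    artifactIds.filterNot(_.endsWith("_" + scalaBinary))

  def main(args: Array[String]): Unit = {
    // Version of the scala-library found on the runtime classpath.
    val runtime = binaryVersion(Properties.versionNumberString)
    val artifacts = Seq("spark-core_2.11", "spark-sql_2.11", "spark-sql-kafka-0-10_2.11")
    val bad = mismatched(artifacts, runtime)
    if (bad.isEmpty) println(s"All artifacts match Scala $runtime")
    else println(s"Running Scala $runtime, but these artifacts target another version: ${bad.mkString(", ")}")
  }
}
```

Running it from the IDE and from the build tool separately can reveal whether the two are using different Scala versions.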
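Warning: you need to restart your project in IntelliJ. When I switched versions without restarting, the problem remained and I kept getting the same error.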
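Once the build compiles, note that the code in the question only defines the streaming Dataset: in Structured Streaming nothing is read until a query is started with writeStream ... start(), which is how the Spark documentation's examples continue. A sketch under stated assumptions: it uses MemoryStream, an internal Spark test utility in org.apache.spark.sql.execution.streaming (not a public API), in place of the Kafka source so it runs without a broker, and the "memory" sink so the result can be read back. StreamingPairsSketch, toPairs, run and the query name "pairs" are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.MemoryStream

object StreamingPairsSketch {
  // The question's transformation: cast key/value to STRING and view each
  // row as a (String, String) tuple; the encoder comes from spark.implicits._.
  def toPairs(spark: SparkSession, lines: DataFrame): Dataset[(String, String)] = {
    import spark.implicits._
    lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
  }

  // Feeds two records through the pipeline and returns what reached the sink.
  def run(spark: SparkSession): Seq[(String, String)] = {
    import spark.implicits._
    implicit val sqlCtx: SQLContext = spark.sqlContext
    // Assumption: MemoryStream stands in for the Kafka source here.
    val input = MemoryStream[(String, String)]
    val lines = input.toDF().toDF("key", "value")

    // Defining the Dataset runs nothing; the query only starts here.
    val query = toPairs(spark, lines).writeStream
      .format("memory")   // results land in an in-memory table
      .queryName("pairs") // registered under this name
      .outputMode("append")
      .start()
    try {
      input.addData(("k1", "v1"), ("k2", "v2"))
      query.processAllAvailable() // block until the added data is processed
      spark.table("pairs").as[(String, String)].collect().toSeq.sorted
    } finally query.stop()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .appName("StreamingPairsSketch")
      .master("local[2]")
      .getOrCreate()
    try run(spark).foreach(println)
    finally spark.stop()
  }
}
```

With a real broker, the same toPairs result would instead be started with a sink such as format("console") and kept alive with awaitTermination().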