如何在 spark streaming 中解析 Json 格式的 Kafka 消息
How to parse Json formatted Kafka message in spark streaming
我在 Kafka 上有 JSON 条这样的消息:
{"id_post":"p1", "message":"blablabla"}
我想解析消息,并打印(或用于进一步计算)message
元素。
使用以下代码打印 json
val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, inputGroup, topicMap)
val postStream = kafkaStream.map(_._2)
postStream.foreachRDD((rdd, time) => {
val count = rdd.count()
if (count > 0){
rdd.foreach(record => {
println(record)
}
}
但我无法获取单个元素。
我尝试了一些 JSON 解析器,但没有成功。
有什么想法吗?
update:
a few errors with different JSON parser
this is the code and output with circe parser:
val parsed_record = parse(record)
和输出:
14:45:00,676 ERROR Executor:95 - Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
at io.circe.jawn.CirceSupportParser$$anon$$anon.add(CirceSupportParser.scala:36)
at jawn.CharBasedParser$class.parseString(CharBasedParser.scala:90)
at jawn.StringParser.parseString(StringParser.scala:15)
at jawn.Parser.rparse(Parser.scala:397)
at jawn.Parser.parse(Parser.scala:338)
at jawn.SyncParser.parse(SyncParser.scala:24)
at jawn.SupportParser$$anonfun$parseFromString.apply(SupportParser.scala:15)
等等..在我使用parse(record)
的那一行
看起来它无法访问 and/or 解析字符串 record
.
如果我使用 lift-json 也一样
在 parse(record)
处,错误输出大致相同:
16:58:20,425 ERROR Executor:95 - Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
at net.liftweb.json.JsonParser$$anonfun.apply(JsonParser.scala:144)
at net.liftweb.json.JsonParser$$anonfun.apply(JsonParser.scala:141)
at net.liftweb.json.JsonParser$.parse(JsonParser.scala:80)
at net.liftweb.json.JsonParser$.parse(JsonParser.scala:45)
at net.liftweb.json.package$.parse(package.scala:40)
at SparkConsumer$$anonfun$main$$anonfun$apply.apply(SparkConsumer.scala:98)
at SparkConsumer$$anonfun$main$$anonfun$apply.apply(SparkConsumer.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
我解决了这个问题,所以我写在这里以备将来参考:
依赖,依赖,依赖!
我选择使用 lift-json,但这适用于任何 JSON 解析器 and/or 框架。
我使用的 SPARK 版本 (v1.4.1) 是与 scala 2.10 兼容的版本,这里是 pom.xml:
的依赖项
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.4.1</version>
<scope>provided</scope>
</dependency>
和其他一些图书馆。我在 scala 2.11 中使用了 lift-json 版本……那是 WRONG.
所以,对于未来的我,如果你正在阅读这个主题:与 scala 版本和依赖项保持一致。
在 lift-json 情况下:
<dependency>
<groupId>net.liftweb</groupId>
<artifactId>lift-json_2.10</artifactId>
<version>3.0-M1</version>
</dependency>
和你一样的问题。
但是我使用 fastjson
解决了这个问题。
SBT dependency :
// http://mvnrepository.com/artifact/com.alibaba/fastjson
libraryDependencies += "com.alibaba" % "fastjson" % "1.2.12"
或者
Maven dependency :
<!-- http://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.12</version>
</dependency>
你可以试试看。希望这会有所帮助。
从 Scala/Apache Spark
中的 JSON 字符串中提取数据
import org.apache.spark.rdd.RDD
object JsonData extends serializable{
def main(args: Array[String]): Unit = {
val msg = "{ \"id_post\":\"21\",\"message\":\"blablabla\"}";
val m1 = msgParse(msg)
println(m1.id_post)
}
case class SomeClass(id_post: String, message: String) extends serializable
def msgParse(msg: String): SomeClass = {
import org.json4s._
import org.json4s.native.JsonMethods._
implicit val formats = DefaultFormats
val m = parse(msg).extract[SomeClass]
return m
}
}
以下是 Maven Decency
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-native_2.10</artifactId>
<version>3.3.0</version>
</dependency>
我在 Kafka 上有 JSON 条这样的消息:
{"id_post":"p1", "message":"blablabla"}
我想解析消息,并打印(或用于进一步计算)message
元素。
使用以下代码打印 json
val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, inputGroup, topicMap)
val postStream = kafkaStream.map(_._2)
postStream.foreachRDD((rdd, time) => {
val count = rdd.count()
if (count > 0){
rdd.foreach(record => {
println(record)
}
}
但我无法获取单个元素。 我尝试了一些 JSON 解析器,但没有成功。 有什么想法吗?
update: a few errors with different JSON parser this is the code and output with circe parser:
val parsed_record = parse(record)
和输出:
14:45:00,676 ERROR Executor:95 - Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
at io.circe.jawn.CirceSupportParser$$anon$$anon.add(CirceSupportParser.scala:36)
at jawn.CharBasedParser$class.parseString(CharBasedParser.scala:90)
at jawn.StringParser.parseString(StringParser.scala:15)
at jawn.Parser.rparse(Parser.scala:397)
at jawn.Parser.parse(Parser.scala:338)
at jawn.SyncParser.parse(SyncParser.scala:24)
at jawn.SupportParser$$anonfun$parseFromString.apply(SupportParser.scala:15)
等等..在我使用parse(record)
的那一行
看起来它无法访问 and/or 解析字符串 record
.
如果我使用 lift-json 也一样
在 parse(record)
处,错误输出大致相同:
16:58:20,425 ERROR Executor:95 - Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
at net.liftweb.json.JsonParser$$anonfun.apply(JsonParser.scala:144)
at net.liftweb.json.JsonParser$$anonfun.apply(JsonParser.scala:141)
at net.liftweb.json.JsonParser$.parse(JsonParser.scala:80)
at net.liftweb.json.JsonParser$.parse(JsonParser.scala:45)
at net.liftweb.json.package$.parse(package.scala:40)
at SparkConsumer$$anonfun$main$$anonfun$apply.apply(SparkConsumer.scala:98)
at SparkConsumer$$anonfun$main$$anonfun$apply.apply(SparkConsumer.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
在 scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
我解决了这个问题,所以我写在这里以备将来参考:
依赖,依赖,依赖!
我选择使用 lift-json,但这适用于任何 JSON 解析器 and/or 框架。
我使用的 SPARK 版本 (v1.4.1) 是与 scala 2.10 兼容的版本,这里是 pom.xml:
的依赖项<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.4.1</version>
<scope>provided</scope>
</dependency>
和其他一些图书馆。我在 scala 2.11 中使用了 lift-json 版本……那是 WRONG.
所以,对于未来的我,如果你正在阅读这个主题:与 scala 版本和依赖项保持一致。 在 lift-json 情况下:
<dependency>
<groupId>net.liftweb</groupId>
<artifactId>lift-json_2.10</artifactId>
<version>3.0-M1</version>
</dependency>
和你一样的问题。
但是我使用 fastjson
解决了这个问题。
SBT dependency :
// http://mvnrepository.com/artifact/com.alibaba/fastjson
libraryDependencies += "com.alibaba" % "fastjson" % "1.2.12"
或者
Maven dependency :
<!-- http://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.12</version>
</dependency>
你可以试试看。希望这会有所帮助。
从 Scala/Apache Spark
中的 JSON 字符串中提取数据import org.apache.spark.rdd.RDD
object JsonData extends serializable{
def main(args: Array[String]): Unit = {
val msg = "{ \"id_post\":\"21\",\"message\":\"blablabla\"}";
val m1 = msgParse(msg)
println(m1.id_post)
}
case class SomeClass(id_post: String, message: String) extends serializable
def msgParse(msg: String): SomeClass = {
import org.json4s._
import org.json4s.native.JsonMethods._
implicit val formats = DefaultFormats
val m = parse(msg).extract[SomeClass]
return m
}
}
以下是 Maven Decency
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-native_2.10</artifactId>
<version>3.3.0</version>
</dependency>