大java.lang.ClassNotFoundException:play.api.libs.json.Reads
Flink java.lang.ClassNotFoundException: play.api.libs.json.Reads
我正在编写一个代码,该代码从文本文件中读取并将每一行解析为 Json,但是 Flink 中的作业(1.0.3 - 具有 2 个任务管理器的集群)因异常而失败: java.lang.ClassNotFoundException: play.api.libs.json.Reads
我的代码:
import org.apache.flink.streaming.api.scala._
import play.api.libs.json.Json
import org.joda.time.{DateTime, DateTimeZone}
object rabbitjob {
case class MyJson(pr: Long,
dv: Long,
ty: Int,
cr: String,
rc: String,
vl: Boolean,
ss: String,
id: Long
)
def main (args:Array[String]){
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("hdfs:///test/ignicion.io")
implicit val myJsonReads = Json.reads[MyJson]
def jsontr(cuerpo: String): Unit ={
val inputJson = Json.parse(cuerpo)
val myJsonInstance: MyJson = inputJson.as[MyJson]
println(DateTime.now(DateTimeZone.UTC).getMillis() + " " + myJsonInstance.cr + " " + matchtype(myJsonInstance.ty) + " " + " " + matchvalue(myJsonInstance.vl))
def matchtype (x: Int): String = x match{
case 1 => "Door"
case 2 => "Window"
case _ => "otros"
}
def matchvalue (x: Boolean): String = x match{
case true => "ON"
case false => "OFF"
}
}
println(stream)
stream.map(jsontr(_))
env.execute("Test Rabbit")
}
}
有什么想法吗??提前谢谢你
您需要将所有外部依赖与您的 Flink 程序一起打包成一个 "fat" JAR 文件提交给集群。
Flink 文档展示了如何使用 Maven。
我正在编写一个代码,该代码从文本文件中读取并将每一行解析为 Json,但是 Flink 中的作业(1.0.3 - 具有 2 个任务管理器的集群)因异常而失败: java.lang.ClassNotFoundException: play.api.libs.json.Reads
我的代码:
import org.apache.flink.streaming.api.scala._
import play.api.libs.json.Json
import org.joda.time.{DateTime, DateTimeZone}
object rabbitjob {
case class MyJson(pr: Long,
dv: Long,
ty: Int,
cr: String,
rc: String,
vl: Boolean,
ss: String,
id: Long
)
def main (args:Array[String]){
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.readTextFile("hdfs:///test/ignicion.io")
implicit val myJsonReads = Json.reads[MyJson]
def jsontr(cuerpo: String): Unit ={
val inputJson = Json.parse(cuerpo)
val myJsonInstance: MyJson = inputJson.as[MyJson]
println(DateTime.now(DateTimeZone.UTC).getMillis() + " " + myJsonInstance.cr + " " + matchtype(myJsonInstance.ty) + " " + " " + matchvalue(myJsonInstance.vl))
def matchtype (x: Int): String = x match{
case 1 => "Door"
case 2 => "Window"
case _ => "otros"
}
def matchvalue (x: Boolean): String = x match{
case true => "ON"
case false => "OFF"
}
}
println(stream)
stream.map(jsontr(_))
env.execute("Test Rabbit")
}
}
有什么想法吗??提前谢谢你
您需要将所有外部依赖与您的 Flink 程序一起打包成一个 "fat" JAR 文件提交给集群。
Flink 文档展示了如何使用 Maven。