Spark 无法从 Amazon Kinesis 获取事件

Spark not able to fetch events from Amazon Kinesis

我最近一直在尝试从 Kinesis 获取 Spark 读取事件,但在接收事件时遇到问题。虽然 Spark 能够连接到 Kinesis 并能够从 Kinesis 获取元数据,但它无法从中获取事件。它总是取回零元素。

没有错误,只是返回空结果。 Spark 能够获取元数据(例如,kinesis 中的分片数量等)。

我已经使用了这些 [1 & 2] 指南来让它正常工作,但运气还不太好。我还尝试了 SO [3] 中的一些建议。群集有足够的 resources/cores 可用。

我们发现 Spark 和 Kinesis 之间的 Protobuf 版本存在版本冲突,这也可能是导致此行为的原因。 Spark 使用 protobuf-java 版本 2.5.0,而 kinesis 可能使用 protobuf-java-2.6.1.jar。

只是想知道是否有人遇到过这种行为,或者是否有 spark 与 kinesis 一起工作。

已尝试使用 Spark 1.5.0、Spark 1.6.0。

  1. http://spark.apache.org/docs/latest/streaming-kinesis-integration.html
  2. https://github.com/apache/spark/blob/master/extras/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala

  3. Apache Spark Kinesis Sample not working

回答我自己的问题 -

我在 Spark Kinesis 集成方面取得了一些成功,关键是 unionStreams.foreachRDD。

有 2 个版本的 foreachRDD 可用

  • unionStreams.foreachRDD
  • unionStreams.foreachRDD ((rdd: RDD[Array[Byte]], time: 时间)

出于某种原因,第一个无法为我提供结果,但更改为第二个可以按预期获取结果。尚待探究原因。

在下面添加代码片段以供参考。

也考虑改变这个。这对我也有帮助-

"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.6.0", // Doesnt work
"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.4.1",  // Works

希望对大家有所帮助:)

感谢大家的帮助。

val kinesisStreams = (0 until numStreams).map {
  count =>
    val stream = KinesisUtils.createStream(
      ssc,
      consumerName,
      streamName,
      endpointUrl,
      regionName,
      InitialPositionInStream.TRIM_HORIZON,
      kinesisCheckpointInterval,
      StorageLevel.MEMORY_AND_DISK_2
    )

    stream
}
val unionStreams = ssc.union(kinesisStreams)

println(s"========================")
println(s"Num of streams: ${numStreams}")
println(s"========================")

/*unionStreams.foreachRDD{ // Doesn't Work !!
  rdd =>
    println(rdd.count)
    println("rdd isempty:" + rdd.isEmpty)
}*/ 
unionStreams.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => { // Works, Yeah !!
  println(rdd.count)
  println("rdd isempty:" + rdd.isEmpty)
  }
)

ssc.start()
ssc.awaitTermination()