Spark 无法从 Amazon Kinesis 获取事件
Spark not able to fetch events from Amazon Kinesis
我最近一直在尝试从 Kinesis 获取 Spark 读取事件,但在接收事件时遇到问题。虽然 Spark 能够连接到 Kinesis 并能够从 Kinesis 获取元数据,但它无法从中获取事件。它总是取回零元素。
没有错误,只是返回空结果。 Spark 能够获取元数据(例如,kinesis 中的分片数量等)。
我已经使用了这些 [1 & 2] 指南来让它正常工作,但运气还不太好。我还尝试了 SO [3] 中的一些建议。群集有足够的 resources/cores 可用。
我们发现 Spark 和 Kinesis 之间的 Protobuf 版本存在版本冲突,这也可能是导致此行为的原因。 Spark 使用 protobuf-java 版本 2.5.0,而 kinesis 可能使用 protobuf-java-2.6.1.jar。
只是想知道是否有人遇到过这种行为,或者是否有 spark 与 kinesis 一起工作。
已尝试使用 Spark 1.5.0、Spark 1.6.0。
回答我自己的问题 -
我在 Spark Kinesis 集成方面取得了一些成功,关键是 unionStreams.foreachRDD。
有 2 个版本的 foreachRDD 可用
- unionStreams.foreachRDD
- unionStreams.foreachRDD ((rdd: RDD[Array[Byte]], time: 时间)
出于某种原因,第一个无法为我提供结果,但更改为第二个可以按预期获取结果。尚待探究原因。
在下面添加代码片段以供参考。
也考虑改变这个。这对我也有帮助-
"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.6.0", // Doesnt work
"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.4.1", // Works
希望对大家有所帮助:)
感谢大家的帮助。
val kinesisStreams = (0 until numStreams).map {
count =>
val stream = KinesisUtils.createStream(
ssc,
consumerName,
streamName,
endpointUrl,
regionName,
InitialPositionInStream.TRIM_HORIZON,
kinesisCheckpointInterval,
StorageLevel.MEMORY_AND_DISK_2
)
stream
}
val unionStreams = ssc.union(kinesisStreams)
println(s"========================")
println(s"Num of streams: ${numStreams}")
println(s"========================")
/*unionStreams.foreachRDD{ // Doesn't Work !!
rdd =>
println(rdd.count)
println("rdd isempty:" + rdd.isEmpty)
}*/
unionStreams.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => { // Works, Yeah !!
println(rdd.count)
println("rdd isempty:" + rdd.isEmpty)
}
)
ssc.start()
ssc.awaitTermination()
我最近一直在尝试从 Kinesis 获取 Spark 读取事件,但在接收事件时遇到问题。虽然 Spark 能够连接到 Kinesis 并能够从 Kinesis 获取元数据,但它无法从中获取事件。它总是取回零元素。
没有错误,只是返回空结果。 Spark 能够获取元数据(例如,kinesis 中的分片数量等)。
我已经使用了这些 [1 & 2] 指南来让它正常工作,但运气还不太好。我还尝试了 SO [3] 中的一些建议。群集有足够的 resources/cores 可用。
我们发现 Spark 和 Kinesis 之间的 Protobuf 版本存在版本冲突,这也可能是导致此行为的原因。 Spark 使用 protobuf-java 版本 2.5.0,而 kinesis 可能使用 protobuf-java-2.6.1.jar。
只是想知道是否有人遇到过这种行为,或者是否有 spark 与 kinesis 一起工作。
已尝试使用 Spark 1.5.0、Spark 1.6.0。
回答我自己的问题 -
我在 Spark Kinesis 集成方面取得了一些成功,关键是 unionStreams.foreachRDD。
有 2 个版本的 foreachRDD 可用
- unionStreams.foreachRDD
- unionStreams.foreachRDD ((rdd: RDD[Array[Byte]], time: 时间)
出于某种原因,第一个无法为我提供结果,但更改为第二个可以按预期获取结果。尚待探究原因。
在下面添加代码片段以供参考。
也考虑改变这个。这对我也有帮助-
"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.6.0", // Doesnt work
"org.apache.spark" % "spark-streaming-kinesis-asl_2.10" % "1.4.1", // Works
希望对大家有所帮助:)
感谢大家的帮助。
val kinesisStreams = (0 until numStreams).map {
count =>
val stream = KinesisUtils.createStream(
ssc,
consumerName,
streamName,
endpointUrl,
regionName,
InitialPositionInStream.TRIM_HORIZON,
kinesisCheckpointInterval,
StorageLevel.MEMORY_AND_DISK_2
)
stream
}
val unionStreams = ssc.union(kinesisStreams)
println(s"========================")
println(s"Num of streams: ${numStreams}")
println(s"========================")
/*unionStreams.foreachRDD{ // Doesn't Work !!
rdd =>
println(rdd.count)
println("rdd isempty:" + rdd.isEmpty)
}*/
unionStreams.foreachRDD ((rdd: RDD[Array[Byte]], time: Time) => { // Works, Yeah !!
println(rdd.count)
println("rdd isempty:" + rdd.isEmpty)
}
)
ssc.start()
ssc.awaitTermination()