Spark Streaming Kinesis consumer returns empty data
I am trying to consume a Kinesis stream using the Spark Streaming library, org.apache.spark.streaming.kinesis.KinesisUtils. I can verify that the stream has data in it with a Python script. However, when I try to write a consumer in Scala, I get empty data. Here is my code:
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kinesis.KinesisUtils

def getKinesisData = {
  val endpointUrl = "https://kinesis.us-west-2.amazonaws.com"
  val streamName = "myAwesomeStream"
  val regionName = "us-west-2"
  val credentials = new DefaultAWSCredentialsProviderChain().getCredentials()
  require(credentials != null, "No AWS credentials found.")

  val kinesisClient = new AmazonKinesisClient(credentials)
  kinesisClient.setEndpoint(endpointUrl)

  // One receiver (and hence one DStream) per shard
  val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
  val numStreams = numShards

  val batchInterval = Milliseconds(2000)
  val kinesisCheckpointInterval = batchInterval

  val sparkConfig = new SparkConf().setAppName("myAwesomeApp").setMaster("local")
  val ssc = new StreamingContext(sparkConfig, batchInterval)

  val kinesisStreams = (0 until numStreams).map { i =>
    println(i)
    KinesisUtils.createStream(ssc, "myAwesomeApp", streamName, endpointUrl, regionName,
      InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
  }

  val unionStreams = ssc.union(kinesisStreams)

  // Convert each record of Array[Byte] to String, and split into words
  val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
  val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
  wordCounts.print()
}
I got this code as an example from GitHub, and I don't really care about all the unions, or the flatMap and word counting done in the later part of the code. I just need to know how to get the actual data from the stream.
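For reference, a minimal sketch of "just getting the data" against the same unionStreams DStream built above; the UTF-8 decoding and the start/await calls are my assumptions, not part of the original example:

// Hypothetical: print up to 10 raw record payloads per batch, decoded as UTF-8.
unionStreams.foreachRDD { rdd =>
  rdd.map(bytes => new String(bytes, java.nio.charset.StandardCharsets.UTF_8))
    .take(10)
    .foreach(println)
}

// Nothing is consumed until the context is started and kept alive.
ssc.start()
ssc.awaitTermination()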
Update:
When I run it, it prints the following to the console:
16/12/16 14:57:01 INFO SparkContext: Running Spark version 2.0.0
16/12/16 14:57:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/12/16 14:57:02 INFO SecurityManager: Changing view acls to:
16/12/16 14:57:02 INFO SecurityManager: Changing modify acls to:
16/12/16 14:57:02 INFO SecurityManager: Changing view acls groups to:
16/12/16 14:57:02 INFO SecurityManager: Changing modify acls groups to:
16/12/16 14:57:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(username); groups with view permissions: Set(); users with modify permissions: Set(username); groups with modify permissions: Set()
16/12/16 14:57:02 INFO Utils: Successfully started service 'sparkDriver' on port 54774.
16/12/16 14:57:02 INFO SparkEnv: Registering MapOutputTracker
16/12/16 14:57:02 INFO SparkEnv: Registering BlockManagerMaster
16/12/16 14:57:02 INFO DiskBlockManager: Created local directory at
16/12/16 14:57:02 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
16/12/16 14:57:02 INFO SparkEnv: Registering OutputCommitCoordinator
16/12/16 14:57:02 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/12/16 14:57:02 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://<I masked this IP address and port>
16/12/16 14:57:03 INFO Executor: Starting executor ID driver on host localhost
16/12/16 14:57:03 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54775.
16/12/16 14:57:03 INFO NettyBlockTransferService: Server created on <I masked this IP address and port>
16/12/16 14:57:03 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, <I masked this IP address and port>)
16/12/16 14:57:03 INFO BlockManagerMasterEndpoint: Registering block manager <I masked this IP address and port> with 2004.6 MB RAM, BlockManagerId(driver, <I masked this IP address and port>)
16/12/16 14:57:03 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, <I masked this IP address and port>)
16/12/16 14:57:03 WARN StreamingContext: spark.master should be set as local[n], n > 1 in local mode if you have receivers to get data, otherwise Spark jobs will not get resources to process the received data.
0 <-- printing shard
1 <-- printing shard
#### PRINTING kinesisStreams ######
Vector(org.apache.spark.streaming.kinesis.KinesisInputDStream@2650f79, org.apache.spark.streaming.kinesis.KinesisInputDStream@75fc1992)
#### PRINTING unionStreams ######
()
#### words######
org.apache.spark.streaming.dstream.FlatMappedDStream@6fd12c5
#### PRINTING wordCounts######
org.apache.spark.streaming.dstream.ShuffledDStream@790a251b
16/12/16 14:57:03 INFO SparkContext: Invoking stop() from shutdown hook
16/12/16 14:57:03 INFO SparkUI: Stopped Spark web UI at http://<I masked this IP address and port>
16/12/16 14:57:03 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/12/16 14:57:03 INFO MemoryStore: MemoryStore cleared
16/12/16 14:57:03 INFO BlockManager: BlockManager stopped
16/12/16 14:57:03 INFO BlockManagerMaster: BlockManagerMaster stopped
16/12/16 14:57:03 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/12/16 14:57:03 INFO SparkContext: Successfully stopped SparkContext
16/12/16 14:57:03 INFO ShutdownHookManager: Shutdown hook called
16/12/16 14:57:03 INFO ShutdownHookManager: Deleting directory
The problem was with version 1.5.2 of the Spark library, which does not work well with Kinesis.
Hope this helps anyone who runs into this issue.
If you run into this problem, it is probably not an actual bug.
The Kinesis integration, like Kafka's, uses the Receiver API, and the receiver runs on a thread separate from the driver and the executors. There is an initial lag period during which everything appears to have started, but the Kinesis receiver still runs some procedures before it actually downloads data from Kinesis.
Solution: wait. In my case, the data showed up on the Spark side after 40-50 seconds.
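To make the "wait" concrete, here is a sketch of a launch pattern under those assumptions; the local[*] master and the 120-second timeout are illustrative values, not from the answer. Each receiver pins a core, which is exactly what the StreamingContext warning in the log above is about, so give the job more than one core and keep it alive past the initial KCL bootstrap lag.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// Receivers each occupy a core: with N shards (N receivers), local[n] needs n > N.
val sparkConfig = new SparkConf()
  .setAppName("myAwesomeApp")
  .setMaster("local[*]")
val ssc = new StreamingContext(sparkConfig, Milliseconds(2000))

// ... build the Kinesis DStreams and outputs as in the question ...

ssc.start()
// Stay up well past the 40-50 second lease/bootstrap lag before deciding
// the stream is empty; 120 seconds here is an arbitrary demo timeout.
ssc.awaitTerminationOrTimeout(120000L)
ssc.stop()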