在 zeppelin notebook 中保存 spark streaming 使用的 kafka 消息
saving kafka messages consumed by spark streaming in zeppelin notebook
我在 zeppelin notebook 中保存 spark streaming 使用的 kafka 消息时遇到问题。
我的代码是:
case class Message(id: Long, message: String, timestamp: Long) extends Serializable
val ssc = new StreamingContext(sc, Seconds(2))
val messagesStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc,
Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "test-consumer-group"),
Map("test" -> 4),
StorageLevel.MEMORY_ONLY)
.map { case (k, v) => implicit val formats = DefaultFormats; JsonMethods.parse(v).extract[Message] }
.filter(_.id % 2 == 0)
val mes = messagesStream.window(Seconds(10))
mes
.map(m => Message(m.id, m.message, m.timestamp))
.foreachRDD( rdd => rdd.toDF.registerTempTable("messages"))
ssc.start()
当我 运行 %sql select * from messages
时它没有显示数据,但是 table 被定义了。如果我在 Cassandra 上将保存更改为 tempTable,它会正确保存和显示数据。不明白为什么会这样。
感谢您的帮助。
好的,问题来了。让我们首先回顾一下 foreachRDD 运算符定义:
foreachRDD
没有按照预期的方式使用。它是最通用的输出运算符,它将函数 func 应用于从流生成的每个 RDD。该函数应该将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或者通过网络将其写入数据库。请注意,函数 func 在驱动进程 运行 流式应用程序中执行,并且通常会在其中执行 RDD 操作,这将强制计算流式 RDD。
那么您的代码实际发生的情况如下:
由于 DStream 是由输出操作延迟执行的,就像 RDD 是由 RDD 操作延迟执行的一样。具体来说,DStream 输出操作中的 RDD 操作会强制处理接收到的数据。 因此,如果您的应用程序没有任何输出操作(您没有),或者具有像 dstream.foreachRDD() 这样的输出操作但其中没有任何 RDD 操作,则不会执行任何操作。系统将简单地接收数据并将其丢弃。
因此,每次执行 registerTempTable
时,您的 RDD 数据都会被丢弃,因此您的 SQL 查询会给出一个空结果。
要解决您的问题,您需要将数据保存在某处(Cassandra 是一个不错的选择),然后对其进行查询。
如果你想避免另一个集群:另一种解决方案是将 rdd 转换为 row,然后转换为 df,然后将其作为 parquet 或 orc 保存到 hdfs,并可选择附加文件 ex:
write.format("orc").mode("overwrite").partitionBy("id").save("/tmp/data")
我只是想知道 AWS 博主如何能够直接对临时 table 重新 [在此处输入 link 描述][1]
执行分析
好在结构化流即将推出 :)
我在 zeppelin notebook 中保存 spark streaming 使用的 kafka 消息时遇到问题。
我的代码是:
case class Message(id: Long, message: String, timestamp: Long) extends Serializable
val ssc = new StreamingContext(sc, Seconds(2))
val messagesStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc,
Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "test-consumer-group"),
Map("test" -> 4),
StorageLevel.MEMORY_ONLY)
.map { case (k, v) => implicit val formats = DefaultFormats; JsonMethods.parse(v).extract[Message] }
.filter(_.id % 2 == 0)
val mes = messagesStream.window(Seconds(10))
mes
.map(m => Message(m.id, m.message, m.timestamp))
.foreachRDD( rdd => rdd.toDF.registerTempTable("messages"))
ssc.start()
当我 运行 %sql select * from messages
时它没有显示数据,但是 table 被定义了。如果我在 Cassandra 上将保存更改为 tempTable,它会正确保存和显示数据。不明白为什么会这样。
感谢您的帮助。
好的,问题来了。让我们首先回顾一下 foreachRDD 运算符定义:
foreachRDD
没有按照预期的方式使用。它是最通用的输出运算符,它将函数 func 应用于从流生成的每个 RDD。该函数应该将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或者通过网络将其写入数据库。请注意,函数 func 在驱动进程 运行 流式应用程序中执行,并且通常会在其中执行 RDD 操作,这将强制计算流式 RDD。
那么您的代码实际发生的情况如下:
由于 DStream 是由输出操作延迟执行的,就像 RDD 是由 RDD 操作延迟执行的一样。具体来说,DStream 输出操作中的 RDD 操作会强制处理接收到的数据。 因此,如果您的应用程序没有任何输出操作(您没有),或者具有像 dstream.foreachRDD() 这样的输出操作但其中没有任何 RDD 操作,则不会执行任何操作。系统将简单地接收数据并将其丢弃。
因此,每次执行 registerTempTable
时,您的 RDD 数据都会被丢弃,因此您的 SQL 查询会给出一个空结果。
要解决您的问题,您需要将数据保存在某处(Cassandra 是一个不错的选择),然后对其进行查询。
如果你想避免另一个集群:另一种解决方案是将 rdd 转换为 row,然后转换为 df,然后将其作为 parquet 或 orc 保存到 hdfs,并可选择附加文件 ex:
write.format("orc").mode("overwrite").partitionBy("id").save("/tmp/data")
我只是想知道 AWS 博主如何能够直接对临时 table 重新 [在此处输入 link 描述][1]
执行分析好在结构化流即将推出 :)