在 zeppelin notebook 中保存 spark streaming 使用的 kafka 消息

saving kafka messages consumed by spark streaming in zeppelin notebook

我在 zeppelin notebook 中保存 spark streaming 使用的 kafka 消息时遇到问题。

我的代码是:

case class Message(id: Long, message: String, timestamp: Long) extends Serializable

   val ssc = new StreamingContext(sc, Seconds(2))

  val messagesStream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, 
    Map("zookeeper.connect" -> "localhost:2181", "group.id" -> "test-consumer-group"),
    Map("test" -> 4),
    StorageLevel.MEMORY_ONLY)
    .map { case (k, v) =>  implicit val formats = DefaultFormats; JsonMethods.parse(v).extract[Message] }
    .filter(_.id % 2 == 0)

  val mes =  messagesStream.window(Seconds(10))

  mes
  .map(m => Message(m.id, m.message, m.timestamp))
  .foreachRDD( rdd => rdd.toDF.registerTempTable("messages"))

  ssc.start() 

当我 运行 %sql select * from messages 时它没有显示数据,但是 table 被定义了。如果我在 Cassandra 上将保存更改为 tempTable,它会正确保存和显示数据。不明白为什么会这样。

感谢您的帮助。

好的,问题来了。让我们首先回顾一下 foreachRDD 运算符定义:

foreachRDD 没有按照预期的方式使用。它是最通用的输出运算符,它将函数 func 应用于从流生成的每个 RDD。该函数应该将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或者通过网络将其写入数据库。请注意,函数 func 在驱动进程 运行 流式应用程序中执行,并且通常会在其中执行 RDD 操作,这将强制计算流式 RDD。

那么您的代码实际发生的情况如下:

由于 DStream 是由输出操作延迟执行的,就像 RDD 是由 RDD 操作延迟执行的一样。具体来说,DStream 输出操作中的 RDD 操作会强制处理接收到的数据。 因此,如果您的应用程序没有任何输出操作(您没有),或者具有像 dstream.foreachRDD() 这样的输出操作但其中没有任何 RDD 操作,则不会执行任何操作。系统将简单地接收数据并将其丢弃

因此,每次执行 registerTempTable 时,您的 RDD 数据都会被丢弃,因此您的 SQL 查询会给出一个空结果。

要解决您的问题,您需要将数据保存在某处(Cassandra 是一个不错的选择),然后对其进行查询。

如果你想避免另一个集群:另一种解决方案是将 rdd 转换为 row,然后转换为 df,然后将其作为 parquet 或 orc 保存到 hdfs,并可选择附加文件 ex:

write.format("orc").mode("overwrite").partitionBy("id").save("/tmp/data")

我只是想知道 AWS 博主如何能够直接对临时 table 重新 [在此处输入 link 描述][1]

执行分析

好在结构化流即将推出 :)

[1]:aws 博客:https://blogs.aws.amazon.com/bigdata/post/Tx3K805CZ8WFBRP/Analyze-Realtime-Data-from-Amazon-Kinesis-Streams-Using-Zeppelin-and-Spark-Stream