创建 Spark Streaming 上下文后在 Spark 中缓存 RDS 数据
Cache RDMS data in spark after creating sparkstreaming context
我们使用 Spark Streaming 通过 createDirectStream 从 Kafka 获取数据。
在同一个程序中,我连接到 MYSQL 以从数据库中获取一些数据。现在我想使用 spark 缓存这个结果。
这里的问题是我在开始时创建了一个 spark streaming 上下文,现在要缓存这个 MYSQL 数据,我必须将它转换为一个 RDD,这只有在 spark 上下文的帮助下才能实现,不幸的是,我无法创建火花上下文(因为已经根据火花流创建了上下文)。
我不想设置 spark.driver.allowMultipleContexts = true
以允许 JVM 使用超过 spark 上下文,因为这可能会导致问题。
有没有一种方法可以使用 spark 缓存它,或者我们如何将 MYSQL 的结果转换为 RDD?
这里根据你的问题描述。假设您正在启动一个 StreamingContext,如下所示:
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
您始终可以从您的流上下文中获取 SparkContext,如下所示:
val sc = ssc.sparkContext
然后做你想做的事。它是您的流上下文的关联 Spark 上下文,因此无需为此创建新的 Spark 上下文。
我们使用 Spark Streaming 通过 createDirectStream 从 Kafka 获取数据。
在同一个程序中,我连接到 MYSQL 以从数据库中获取一些数据。现在我想使用 spark 缓存这个结果。
这里的问题是我在开始时创建了一个 spark streaming 上下文,现在要缓存这个 MYSQL 数据,我必须将它转换为一个 RDD,这只有在 spark 上下文的帮助下才能实现,不幸的是,我无法创建火花上下文(因为已经根据火花流创建了上下文)。
我不想设置 spark.driver.allowMultipleContexts = true
以允许 JVM 使用超过 spark 上下文,因为这可能会导致问题。
有没有一种方法可以使用 spark 缓存它,或者我们如何将 MYSQL 的结果转换为 RDD?
这里根据你的问题描述。假设您正在启动一个 StreamingContext,如下所示:
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
您始终可以从您的流上下文中获取 SparkContext,如下所示:
val sc = ssc.sparkContext
然后做你想做的事。它是您的流上下文的关联 Spark 上下文,因此无需为此创建新的 Spark 上下文。