使用 Redis 进行 Spark Structured Streaming 动态查找
Spark Structured Streaming dynamic lookup with Redis
我是 spark 的新手。
我们目前正在建设管道:
- 从 Kafka 主题读取事件
- 借助 Redis-Lookup 丰富此数据
- 将事件写入新的 Kafka 主题
所以,我的问题是,当我想使用 spark-redis 库时,它的性能非常好,但数据在我的流式作业中保持静态。
尽管 Redis 刷新了数据,但它并没有反映到我的数据帧中。
Spark 首先读取数据然后从不更新它。
另外,我首先从 REDIS 数据中读取,总数据约为 1mio key-val 字符串。
我能做什么approaches/methods,我想使用 Redis 作为内存动态查找。
并且查找 table 几乎改变了 1 小时。
谢谢。
使用的库:
火花-redis-2.4.1.jar
commons-pool2-2.0.jar
jedis-3.2.0.jar
代码部分如下:
import com.intertech.hortonworks.spark.registry.functions._
val config = Map[String, Object]("schema.registry.url" -> "http://aa.bbb.ccc.yyy:xxxx/api/v1")
implicit val srConfig:SchemaRegistryConfig = SchemaRegistryConfig(config)
var rawEventSchema = sparkSchema("my_raw_json_events")
val my_raw_events_df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
.option("subscribe", "my-raw-event")
.option("failOnDataLoss","false")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger",1000)
.load()
.select(from_json($"value".cast("string"),rawEventSchema, Map.empty[String, String])
.alias("C"))
import com.redislabs.provider.redis._
val sc = spark.sparkContext
val stringRdd = sc.fromRedisKV("PARAMETERS:*")
val lookup_map = stringRdd.collect().toMap
val lookup = udf((key: String) => lookup_map.getOrElse(key,"") )
val curated_df = my_raw_events_df
.select(
...
$"C.SystemEntryDate".alias("RecordCreateDate")
,$"C.Profile".alias("ProfileCode")
,**lookup(expr("'PARAMETERS:PROFILE||'||NVL(C.Profile,'')")).alias("ProfileName")**
,$"C.IdentityType"
,lookup(expr("'PARAMETERS:IdentityType||'||NVL(C.IdentityType,'')")).alias("IdentityTypeName")
...
).as("C")
import org.apache.spark.sql.streaming.Trigger
val query = curated_df
.select(to_sr(struct($"*"), "curated_event_sch").alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
.option("topic", "curated-event")
.option("checkpointLocation","/user/spark/checkPointLocation/xyz")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
query.awaitTermination()
一种选择是不使用 spark-redis,而是直接在 Redis 中查找。这可以通过 df.mapPartitions
函数来实现。您可以在此处 https://blog.codecentric.de/en/2017/07/lookup-additional-data-in-spark-streaming/ 找到 Spark DStreams 的一些示例。 Structural Streaming 的想法是相似的。注意正确处理Redis连接。
另一种解决方案是进行流静态连接 (spark docs):
不是将 redis rdd 收集到驱动程序,而是使用 redis 数据帧 (spark-redis docs) 作为静态数据帧与您的流连接,所以它会像:
val redisStaticDf = spark.read. ...
val streamingDf = spark.readStream. ...
streamingDf.join(redisStaticDf, ...)
由于 spark 微批处理引擎评估每个触发器上的查询执行,redis 数据帧将在每个触发器上获取数据,为您提供最新数据(如果您将缓存它赢得的数据帧't)
我是 spark 的新手。 我们目前正在建设管道:
- 从 Kafka 主题读取事件
- 借助 Redis-Lookup 丰富此数据
- 将事件写入新的 Kafka 主题
所以,我的问题是,当我想使用 spark-redis 库时,它的性能非常好,但数据在我的流式作业中保持静态。
尽管 Redis 刷新了数据,但它并没有反映到我的数据帧中。 Spark 首先读取数据然后从不更新它。 另外,我首先从 REDIS 数据中读取,总数据约为 1mio key-val 字符串。
我能做什么approaches/methods,我想使用 Redis 作为内存动态查找。 并且查找 table 几乎改变了 1 小时。
谢谢。
使用的库: 火花-redis-2.4.1.jar commons-pool2-2.0.jar jedis-3.2.0.jar
代码部分如下:
import com.intertech.hortonworks.spark.registry.functions._
val config = Map[String, Object]("schema.registry.url" -> "http://aa.bbb.ccc.yyy:xxxx/api/v1")
implicit val srConfig:SchemaRegistryConfig = SchemaRegistryConfig(config)
var rawEventSchema = sparkSchema("my_raw_json_events")
val my_raw_events_df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
.option("subscribe", "my-raw-event")
.option("failOnDataLoss","false")
.option("startingOffsets", "earliest")
.option("maxOffsetsPerTrigger",1000)
.load()
.select(from_json($"value".cast("string"),rawEventSchema, Map.empty[String, String])
.alias("C"))
import com.redislabs.provider.redis._
val sc = spark.sparkContext
val stringRdd = sc.fromRedisKV("PARAMETERS:*")
val lookup_map = stringRdd.collect().toMap
val lookup = udf((key: String) => lookup_map.getOrElse(key,"") )
val curated_df = my_raw_events_df
.select(
...
$"C.SystemEntryDate".alias("RecordCreateDate")
,$"C.Profile".alias("ProfileCode")
,**lookup(expr("'PARAMETERS:PROFILE||'||NVL(C.Profile,'')")).alias("ProfileName")**
,$"C.IdentityType"
,lookup(expr("'PARAMETERS:IdentityType||'||NVL(C.IdentityType,'')")).alias("IdentityTypeName")
...
).as("C")
import org.apache.spark.sql.streaming.Trigger
val query = curated_df
.select(to_sr(struct($"*"), "curated_event_sch").alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "aa.bbb.ccc.yyy:9092")
.option("topic", "curated-event")
.option("checkpointLocation","/user/spark/checkPointLocation/xyz")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
query.awaitTermination()
一种选择是不使用 spark-redis,而是直接在 Redis 中查找。这可以通过 df.mapPartitions
函数来实现。您可以在此处 https://blog.codecentric.de/en/2017/07/lookup-additional-data-in-spark-streaming/ 找到 Spark DStreams 的一些示例。 Structural Streaming 的想法是相似的。注意正确处理Redis连接。
另一种解决方案是进行流静态连接 (spark docs):
不是将 redis rdd 收集到驱动程序,而是使用 redis 数据帧 (spark-redis docs) 作为静态数据帧与您的流连接,所以它会像:
val redisStaticDf = spark.read. ...
val streamingDf = spark.readStream. ...
streamingDf.join(redisStaticDf, ...)
由于 spark 微批处理引擎评估每个触发器上的查询执行,redis 数据帧将在每个触发器上获取数据,为您提供最新数据(如果您将缓存它赢得的数据帧't)