Structured Streaming - processing data based on new events in a Kafka topic
I have a Structured Streaming program that reads data from Kafka Topic A, does some processing, and finally writes the data to a target Kafka topic.
Note:
The processing is done in the function convertToDictForEachBatch(), which is invoked via foreachBatch(convertToDictForEachBatch).
As part of the processing, it reads another Kafka topic (events_topic); if there are new record(s) since the last read,
it does some additional processing: reloading data from a BigQuery table and saving it.
df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()

print(" df_stream -> ", df_stream)

query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp").writeStream \
    .outputMode("append") \
    .trigger(processingTime='4 minutes') \
    .option("numRows", 10000) \
    .option("truncate", "false") \
    .option("checkpointLocation", checkpoint) \
    .foreachBatch(convertToDictForEachBatch) \
    .start()

query.awaitTermination()
# called from foreachBatch
def convertToDictForEachBatch(df, batchId):
    # Uses the dataframe to do the processing of the data; that code is not added, since it is not relevant to this question.

    # Additional processing, i.e. reloading of prediction data from BigQuery into a DataFrame, based on an event in a Kafka topic.
    # Checks for an event in topic_reloadpred; further processing takes place if there is new data in the topic.
    events = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.location", ssl_keystore_location_reloadpred) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
        .option("subscribe", topic_reloadpred) \
        .option("kafka.group.id", consumerGroupId_reloadpred) \
        .load()

    # events is passed to a function, and processing is done if new events are generated
What is the best way to achieve this?
The current code reads all the data in the Kafka topic; I need it to read only the new data.
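One option (a minimal sketch, not the code above) is to restrict the batch read itself: for batch queries the Spark Kafka source accepts per-partition offsets as a JSON string via startingOffsets/endingOffsets, and it does not use committed consumer-group offsets to decide where to start, which is why the read above pulls the whole topic each time. Here last_processed_offset is an assumed variable tracked by the application, partition "0" assumes a single-partition topic, and the SSL options are omitted for brevity.

import json

# Assumption: last_processed_offset holds the highest offset already handled,
# and topic_reloadpred has a single partition ("0"). startingOffsets is the
# position to start reading from, so begin one past the last processed record.
starting = json.dumps({topic_reloadpred: {"0": last_processed_offset + 1}})

events = spark.read.format('kafka') \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic_reloadpred) \
    .option("startingOffsets", starting) \
    .option("endingOffsets", "latest") \
    .load()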
Update: as suggested by @Rishabh Sharma, I'm storing the offset in a separate Kafka topic with a single partition (I could also store it in a separate partition of the same topic).
During processing, I check the last updated offset against the offset currently added. If the current offset is greater than the last updated offset, I do the further processing (i.e. reload the table from BigQuery).
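A minimal sketch of that comparison, assuming the last processed offset is kept as the value of the newest record in a single-partition topic. The name offsets_topic, the helper get_last_stored_offset, and the write-back step are illustrative, not the author's code; SSL options are again omitted.

from pyspark.sql.functions import max as spark_max

def get_last_stored_offset(spark):
    # Hypothetical helper: the offsets topic has a single partition, so the
    # newest record's value is the last events-topic offset already processed.
    stored = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("subscribe", offsets_topic) \
        .load()
    if stored.rdd.isEmpty():
        return -1
    latest = stored.orderBy(stored.offset.desc()).limit(1) \
                   .selectExpr("CAST(value AS STRING) AS v").collect()[0]
    return int(latest.v)

# Inside convertToDictForEachBatch, after reading the events topic:
last_offset = get_last_stored_offset(spark)
current_max = events.agg(spark_max("offset")).collect()[0][0]

if current_max is not None and current_max > last_offset:
    # New event(s) arrived since the last check: reload the BigQuery table here,
    # then record the new high-water mark back into the offsets topic.
    spark.createDataFrame([(str(current_max),)], ["value"]) \
        .write.format("kafka") \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("topic", offsets_topic) \
        .save()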