Structured Streaming - processing data based on new events in a Kafka topic

I have a Structured Streaming program that reads data from Kafka Topic A, does some processing, and finally puts the data into a target Kafka topic.

Note: the processing is done in the function convertToDictForEachBatch(), which is invoked via foreachBatch(convertToDictForEachBatch).

As part of the processing, it reads another Kafka topic (events_topic), and if there are new record(s) since the last read, it does some additional processing - it reloads data from a BigQuery table and persists it.

df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId) \
    .option("startingOffsets", "latest") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", 10000) \
    .load()

print(" df_stream -> ", df_stream)

query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp").writeStream \
    .outputMode("append") \
    .trigger(processingTime='4 minutes') \
    .option("numRows", 10000) \
    .option("truncate", "false") \
    .option("checkpointLocation", checkpoint) \
    .foreachBatch(convertToDictForEachBatch) \
    .start()

query.awaitTermination()

# called from foreachBatch
def convertToDictForEachBatch(df, batchId):

    # Uses the dataframe to do the processing of the data; that code is not added, since it is not relevant to this question

    # Additional processing, i.e. reloading of prediction data from BigQuery into a DataFrame, based on an event in a Kafka topic.
    # Checks for events in topic_reloadpred; further processing takes place if there is new data in the topic.
    events = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("kafka.security.protocol", "SSL") \
        .option("kafka.ssl.truststore.location", ssl_truststore_location) \
        .option("kafka.ssl.truststore.password", ssl_truststore_password) \
        .option("kafka.ssl.keystore.location", ssl_keystore_location_reloadpred) \
        .option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
        .option("subscribe", topic_reloadpred) \
        .option("kafka.group.id", consumerGroupId_reloadpred) \
        .load()

    # events is passed to a function, and processing is done if new events were generated

What is the best way to achieve this? The current code reads all the data in the Kafka topic, and I need it to read only the new data.
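For reference, the full-topic read is expected behaviour: a batch Kafka read (spark.read.format('kafka')) defaults to startingOffsets = "earliest" and endingOffsets = "latest", and the Kafka source does not use committed consumer-group offsets to decide where a batch read starts. One way to bound the read is to pass an explicit startingOffsets JSON. Below is a minimal sketch, assuming a hypothetical last_offsets dict ({partition: last processed offset}) that the application tracks itself; SSL options are omitted for brevity:

import json

# last_offsets is a hypothetical dict, e.g. {0: 1234}, holding the last offset already
# processed for each partition of topic_reloadpred
starting_offsets = json.dumps({topic_reloadpred: {str(p): o + 1 for p, o in last_offsets.items()}})

events = spark.read.format('kafka') \
    .option("kafka.bootstrap.servers", kafkaBrokers) \
    .option("subscribe", topic_reloadpred) \
    .option("startingOffsets", starting_offsets) \
    .option("endingOffsets", "latest") \
    .load()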

As suggested by @Rishabh Sharma, I'm storing the offsets in a separate Kafka topic with a single partition (I could also store them in a separate partition of the same topic). During processing, I check the last updated offset against the offsets currently being added. If the current offset is greater than the last updated offset, I do the further processing (i.e. reload the table from BigQuery).
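A minimal sketch of that comparison, assuming a hypothetical single-partition offsets topic named offsets_reloadpred, a hypothetical reload_from_bigquery() helper, and the events DataFrame read inside convertToDictForEachBatch; SSL options are again omitted, and this is only the shape of the approach, not the exact implementation:

from pyspark.sql import functions as F

def check_and_reload(events):
    # Last offset recorded in the single-partition offsets topic (stored as a string in value)
    stored = spark.read.format('kafka') \
        .option("kafka.bootstrap.servers", kafkaBrokers) \
        .option("subscribe", "offsets_reloadpred") \
        .load() \
        .selectExpr("CAST(value AS STRING) AS value", "offset") \
        .orderBy(F.col("offset").desc()) \
        .limit(1) \
        .collect()
    last_offset = int(stored[0]["value"]) if stored else -1

    # Highest offset currently present in topic_reloadpred (single partition assumed)
    current = events.agg(F.max("offset").alias("max_offset")).collect()[0]["max_offset"]

    if current is not None and current > last_offset:
        reload_from_bigquery()  # hypothetical helper that re-reads the BigQuery table

        # Write the new high-water mark back to the offsets topic
        spark.createDataFrame([(str(current),)], ["value"]) \
            .write.format('kafka') \
            .option("kafka.bootstrap.servers", kafkaBrokers) \
            .option("topic", "offsets_reloadpred") \
            .save()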