有没有办法用最新的 Kafka 消费替换旧消息(避免在最终 df 中重复)

Is there way to replace older messages with latest in Kafka consumption(avoid duplicates in final df)

我正在使用主题中的数据,正如我们所知,我们实时获取数据,在其中我们看到重复的元素,如何实际用最新消息替换旧消息。

我正在使用以下相同的代码从主题中消费

schema = StructType(
    [
        StructField("Id",StringType(),True),
        StructField("cTime",StringType(),True),
        StructField("latestTime",StringType(),False),
        StructField("service",StringType(),True),
    ]

topic = "topic1"
bootstrap_servers = "mrdc.it.com:9093,mrdc.it.com:9093,mrdc.it.com:9093"

options = {
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxxx.aud.com" password="xxxxxxxx";',\
    "kafka.ssl.ca.location": "/tmp/cert.crt",\
    "kafka.sasl.mechanism": "PLAIN",\
    "kafka.security.protocol" : "SASL_SSL",\
    "kafka.bootstrap.servers": bootstrap_servers,\
    "failOnDataLoss": "false",\
    "subscribe": topic,\
    "startingOffsets": "latest",\
    "enable.auto.commit": "false",\
    "auto.offset.reset": "false",\
    "enable.partition.eof": "true",\
    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",\
    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
}
kafka_df = spark.readStream.format("kafka").options(**options).load()

kafka_mobile_apps_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("apps"))

df = avertack_kafka_eventhub_connections(source= "KAFKA", kafka_config=kafka_config)

sql_features = ["apps.Id",
                "apps.cTime",
                "apps.latesTime", 
                "apps.service"
               ]

kafka_df_features = df.selectExpr(sql_features)
display(kafka_df_features)

输出结果如图

Id              cTime                   latestTime              service
3178    2022-03-03T20:39:52.889Z    2022-03-03T20:39:58.601Z    mobile
3178    2022-03-03T20:39:52.889Z    2022-03-03T20:39:59.012Z    mobile
3240    2022-03-03T20:39:59.140Z    2022-03-03T20:39:59.220Z    mobile
3246    2022-03-03T20:40:00.615Z    2022-03-03T20:40:00.648Z    mobile
.
.
.

我们如何用第 2 行覆盖第 1 行,使用键作为 ["id"],其中“latestTime”列,如何只保留最新的时间消息。

有没有实时的方法,如果没有,我们至少一小时检查一次,用新消息替换旧消息

最终输出

Id              cTime                   latestTime              service
3178    2022-03-03T20:39:52.889Z    2022-03-03T20:39:59.012Z    mobile
3240    2022-03-03T20:39:59.140Z    2022-03-03T20:39:59.220Z    mobile
3246    2022-03-03T20:40:00.615Z    2022-03-03T20:40:00.648Z    mobile
.
.
.
.

Spark 更适合 batch/micro-batch 处理;它获取一组记录,您可以按时间排序并获取“最新”,或者您可以“按 ID 分组”并执行相同的操作...

但是,您必须将其与某些数据库/持久存储相结合,以创建“最近的,按 id”的视图。我已经看到使用 HBase、Couchbase、MongoDB 等完成此操作,如果需要的话,它们都具有一定程度的 Spark 集成。

开箱即用,使用 Spark,我认为它不容易提供此功能(您可以查看 RocksDB State Store)。


另外,Kafka 本身就有 Kafka Streams,它提供了您正在寻找的东西,尽管 Java.

如果您必须使用 Python,可以设置 KSQLdb 并且它的 API 可以在任何其他语言中使用,并且允许您更轻松地将流处理定义为 SQL 语句。


Kafka Connect是另一种选择,比如你有一个标准的关系型数据库,你数据中的ID是一个table的主键,那么匹配键会执行UPDATE查询并覆盖现有记录,或插入新 ID。