有没有办法用最新的 Kafka 消费替换旧消息(避免在最终 df 中重复)
Is there way to replace older messages with latest in Kafka consumption(avoid duplicates in final df)
我正在使用主题中的数据,正如我们所知,我们实时获取数据,在其中我们看到重复的元素,如何实际用最新消息替换旧消息。
我正在使用以下相同的代码从主题中消费
schema = StructType(
[
StructField("Id",StringType(),True),
StructField("cTime",StringType(),True),
StructField("latestTime",StringType(),False),
StructField("service",StringType(),True),
]
topic = "topic1"
bootstrap_servers = "mrdc.it.com:9093,mrdc.it.com:9093,mrdc.it.com:9093"
options = {
"kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxxx.aud.com" password="xxxxxxxx";',\
"kafka.ssl.ca.location": "/tmp/cert.crt",\
"kafka.sasl.mechanism": "PLAIN",\
"kafka.security.protocol" : "SASL_SSL",\
"kafka.bootstrap.servers": bootstrap_servers,\
"failOnDataLoss": "false",\
"subscribe": topic,\
"startingOffsets": "latest",\
"enable.auto.commit": "false",\
"auto.offset.reset": "false",\
"enable.partition.eof": "true",\
"key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",\
"value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
}
kafka_df = spark.readStream.format("kafka").options(**options).load()
kafka_mobile_apps_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("apps"))
df = avertack_kafka_eventhub_connections(source= "KAFKA", kafka_config=kafka_config)
sql_features = ["apps.Id",
"apps.cTime",
"apps.latesTime",
"apps.service"
]
kafka_df_features = df.selectExpr(sql_features)
display(kafka_df_features)
输出结果如图
Id cTime latestTime service
3178 2022-03-03T20:39:52.889Z 2022-03-03T20:39:58.601Z mobile
3178 2022-03-03T20:39:52.889Z 2022-03-03T20:39:59.012Z mobile
3240 2022-03-03T20:39:59.140Z 2022-03-03T20:39:59.220Z mobile
3246 2022-03-03T20:40:00.615Z 2022-03-03T20:40:00.648Z mobile
.
.
.
我们如何用第 2 行覆盖第 1 行,使用键作为 ["id"],其中“latestTime”列,如何只保留最新的时间消息。
有没有实时的方法,如果没有,我们至少一小时检查一次,用新消息替换旧消息
最终输出
Id cTime latestTime service
3178 2022-03-03T20:39:52.889Z 2022-03-03T20:39:59.012Z mobile
3240 2022-03-03T20:39:59.140Z 2022-03-03T20:39:59.220Z mobile
3246 2022-03-03T20:40:00.615Z 2022-03-03T20:40:00.648Z mobile
.
.
.
.
Spark 更适合 batch/micro-batch 处理;它获取一组记录,您可以按时间排序并获取“最新”,或者您可以“按 ID 分组”并执行相同的操作...
但是,您必须将其与某些数据库/持久存储相结合,以创建“最近的,按 id”的视图。我已经看到使用 HBase、Couchbase、MongoDB 等完成此操作,如果需要的话,它们都具有一定程度的 Spark 集成。
开箱即用,使用 Spark,我认为它不容易提供此功能(您可以查看 RocksDB State Store)。
另外,Kafka 本身就有 Kafka Streams,它提供了您正在寻找的东西,尽管 Java.
如果您必须使用 Python,可以设置 KSQLdb 并且它的 API 可以在任何其他语言中使用,并且允许您更轻松地将流处理定义为 SQL 语句。
Kafka Connect是另一种选择,比如你有一个标准的关系型数据库,你数据中的ID是一个table的主键,那么匹配键会执行UPDATE
查询并覆盖现有记录,或插入新 ID。
我正在使用主题中的数据,正如我们所知,我们实时获取数据,在其中我们看到重复的元素,如何实际用最新消息替换旧消息。
我正在使用以下相同的代码从主题中消费
schema = StructType(
[
StructField("Id",StringType(),True),
StructField("cTime",StringType(),True),
StructField("latestTime",StringType(),False),
StructField("service",StringType(),True),
]
topic = "topic1"
bootstrap_servers = "mrdc.it.com:9093,mrdc.it.com:9093,mrdc.it.com:9093"
options = {
"kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxxx.aud.com" password="xxxxxxxx";',\
"kafka.ssl.ca.location": "/tmp/cert.crt",\
"kafka.sasl.mechanism": "PLAIN",\
"kafka.security.protocol" : "SASL_SSL",\
"kafka.bootstrap.servers": bootstrap_servers,\
"failOnDataLoss": "false",\
"subscribe": topic,\
"startingOffsets": "latest",\
"enable.auto.commit": "false",\
"auto.offset.reset": "false",\
"enable.partition.eof": "true",\
"key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",\
"value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
}
kafka_df = spark.readStream.format("kafka").options(**options).load()
kafka_mobile_apps_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("apps"))
df = avertack_kafka_eventhub_connections(source= "KAFKA", kafka_config=kafka_config)
sql_features = ["apps.Id",
"apps.cTime",
"apps.latesTime",
"apps.service"
]
kafka_df_features = df.selectExpr(sql_features)
display(kafka_df_features)
输出结果如图
Id cTime latestTime service
3178 2022-03-03T20:39:52.889Z 2022-03-03T20:39:58.601Z mobile
3178 2022-03-03T20:39:52.889Z 2022-03-03T20:39:59.012Z mobile
3240 2022-03-03T20:39:59.140Z 2022-03-03T20:39:59.220Z mobile
3246 2022-03-03T20:40:00.615Z 2022-03-03T20:40:00.648Z mobile
.
.
.
我们如何用第 2 行覆盖第 1 行,使用键作为 ["id"],其中“latestTime”列,如何只保留最新的时间消息。
有没有实时的方法,如果没有,我们至少一小时检查一次,用新消息替换旧消息
最终输出
Id cTime latestTime service
3178 2022-03-03T20:39:52.889Z 2022-03-03T20:39:59.012Z mobile
3240 2022-03-03T20:39:59.140Z 2022-03-03T20:39:59.220Z mobile
3246 2022-03-03T20:40:00.615Z 2022-03-03T20:40:00.648Z mobile
.
.
.
.
Spark 更适合 batch/micro-batch 处理;它获取一组记录,您可以按时间排序并获取“最新”,或者您可以“按 ID 分组”并执行相同的操作...
但是,您必须将其与某些数据库/持久存储相结合,以创建“最近的,按 id”的视图。我已经看到使用 HBase、Couchbase、MongoDB 等完成此操作,如果需要的话,它们都具有一定程度的 Spark 集成。
开箱即用,使用 Spark,我认为它不容易提供此功能(您可以查看 RocksDB State Store)。
另外,Kafka 本身就有 Kafka Streams,它提供了您正在寻找的东西,尽管 Java.
如果您必须使用 Python,可以设置 KSQLdb 并且它的 API 可以在任何其他语言中使用,并且允许您更轻松地将流处理定义为 SQL 语句。
Kafka Connect是另一种选择,比如你有一个标准的关系型数据库,你数据中的ID是一个table的主键,那么匹配键会执行UPDATE
查询并覆盖现有记录,或插入新 ID。