kafka 仅将接收器连接到 mongo 最后一个延迟结果

kafka connect sink to mongo only last result with delay

我有聚合查询 pageViewcountry 分组,结果推送到主题。

并通过kafka connector

下沉到mongodb
{
    "connector.class": "MongoDbAtlasSink",
    "name": "confluent-mongodb-sink",
    "input.data.format" : "JSON",
    "connection.host": "ip",
    "topics": "viewPageCountByUsers",
    "max.num.retries": "3",
    "retries.defer.timeout": "5000",
    "max.batch.size": "0",
    "database": "test",
    "collection": "ViewPagesCountByUsers",
    "tasks.max": "1"
}

问题是这个数据非常频繁而且负载很大mongodb。我如何将 kafkaconnection 设置为只按键发送最后一个值作为批处理,例如延迟 5 秒? 例子:更新数据库5次没有意义

{countryID:7, viewCount: 111}
{countryID:7, viewCount: 112}
{countryID:7, viewCount: 113}
{countryID:7, viewCount: 114}
{countryID:7, viewCount: 115}

如果有机会只发送最后一个结果,延迟 5 秒,我可以更新 1 次。

// collect batch 5 sec and flush: 
{countryID:7, viewCount: 115}
{countryID:8, viewCount: 573}

怎么做到的?

接收器连接器只接受主题中的任何内容,通常不进行批处理。

你需要使用 stream-processor 比如 Kafka Streams / KSQLdb 到 运行 windowed-aggregation,然后输出到一个新的主题,你会从接收器连接器。