kafka 仅将接收器连接到 mongo 最后一个延迟结果
kafka connect sink to mongo only last result with delay
我有聚合查询 pageView
按 country
分组,结果推送到主题。
并通过kafka connector
下沉到mongodb
{
"connector.class": "MongoDbAtlasSink",
"name": "confluent-mongodb-sink",
"input.data.format" : "JSON",
"connection.host": "ip",
"topics": "viewPageCountByUsers",
"max.num.retries": "3",
"retries.defer.timeout": "5000",
"max.batch.size": "0",
"database": "test",
"collection": "ViewPagesCountByUsers",
"tasks.max": "1"
}
问题是这个数据非常频繁而且负载很大mongodb。我如何将 kafkaconnection 设置为只按键发送最后一个值作为批处理,例如延迟 5 秒?
例子:更新数据库5次没有意义
{countryID:7, viewCount: 111}
{countryID:7, viewCount: 112}
{countryID:7, viewCount: 113}
{countryID:7, viewCount: 114}
{countryID:7, viewCount: 115}
如果有机会只发送最后一个结果,延迟 5 秒,我可以更新 1 次。
// collect batch 5 sec and flush:
{countryID:7, viewCount: 115}
{countryID:8, viewCount: 573}
怎么做到的?
接收器连接器只接受主题中的任何内容,通常不进行批处理。
你需要使用 stream-processor 比如 Kafka Streams / KSQLdb 到 运行 windowed-aggregation,然后输出到一个新的主题,你会从接收器连接器。
我有聚合查询 pageView
按 country
分组,结果推送到主题。
并通过kafka connector
下沉到mongodb{
"connector.class": "MongoDbAtlasSink",
"name": "confluent-mongodb-sink",
"input.data.format" : "JSON",
"connection.host": "ip",
"topics": "viewPageCountByUsers",
"max.num.retries": "3",
"retries.defer.timeout": "5000",
"max.batch.size": "0",
"database": "test",
"collection": "ViewPagesCountByUsers",
"tasks.max": "1"
}
问题是这个数据非常频繁而且负载很大mongodb。我如何将 kafkaconnection 设置为只按键发送最后一个值作为批处理,例如延迟 5 秒? 例子:更新数据库5次没有意义
{countryID:7, viewCount: 111}
{countryID:7, viewCount: 112}
{countryID:7, viewCount: 113}
{countryID:7, viewCount: 114}
{countryID:7, viewCount: 115}
如果有机会只发送最后一个结果,延迟 5 秒,我可以更新 1 次。
// collect batch 5 sec and flush:
{countryID:7, viewCount: 115}
{countryID:8, viewCount: 573}
怎么做到的?
接收器连接器只接受主题中的任何内容,通常不进行批处理。
你需要使用 stream-processor 比如 Kafka Streams / KSQLdb 到 运行 windowed-aggregation,然后输出到一个新的主题,你会从接收器连接器。