Kafka Connector Sink 任务中的 Put() 与 Flush()

Put() vs Flush() in Kafka Connector Sink Task

我正在尝试使用 Kafka Sink Connector 将数据批量发送到 NOSQL 数据库。我正在关注 https://kafka.apache.org/documentation/#connect 文档,并对必须在何处实现发送记录的逻辑感到困惑。请帮助我了解内部如何处理记录以及必须使用 Put() 或 Flush() 来批量处理记录。

当 Kafka Connect worker 是 运行 接收器任务时,它将使用分配给该任务的主题分区中的消息。这样做时,它通过 put(Collection<SinkRecord>) 方法重复将一批消息传递给接收器任务。只要连接器及其任务是 运行.

,这种情况就会继续

Kafka Connect也会周期性记录sink任务的进度,即最近处理的消息在每个topic分区上的偏移量。这称为 提交偏移量 ,这样做是为了在连接器意外和不干净地停止时,Kafka Connect 知道任务应该在每个主题分区中的哪个位置恢复处理消息。但是就在 Kafka Connect 将偏移量写入 Kafka 之前,Kafka Connect worker 通过 flush(...) 方法为 sink connector 提供了在此阶段工作的机会。

一个特定的接收器连接器可能不需要做任何事情(如果 put(...) 做了所有的工作),或者它可能会利用这个机会提交所有已经通过 put(...) 处理的消息到数据存储。例如,另一方面,Confluent's JDBC sink connector writes each batch of messages passed through the put(...) method using a transaction (the size of which can be controlled via the connector's consumer settings), and thus the flush(...) method doesn't need to do anything. Confluent's ElasticSearch sink connector 简单地累积了一系列 put(...) 方法的所有消息,并且仅在 flush(...).

期间将它们写入 Elasticsearch

为源连接器和接收器连接器提交偏移的频率由连接器的 offset.flush.interval.ms 配置 属性 控制。默认设置是每 60 秒提交一次偏移量,这种频率不足以提高性能和减少开销,但足够频繁以限制连接器任务意外终止时潜在的重新处理量。请注意,当连接器正常关闭或遇到异常时,Kafka Connect 将始终有机会提交偏移量。只有当 Kafka Connect worker 意外终止时,它可能没有机会提交标识已处理的消息的偏移量。因此,只有在发生此类故障后重新启动后,连接器才有可能重新处理它在故障之前所做的一些消息。这是因为消息可能至少被看到一次,所以消息应该是幂等的。在确定此设置的适当值时,将所有这些加上您的连接器的行为考虑在内。

查看 Confluent documentation for Kafka Connect 以及开源接收器连接器以获取更多示例和详细信息。