Is there a way in the S3 Kafka sink connector to ensure all records are consumed?

I'm running into this with the S3 Kafka connector, but I've seen the same problem with the JDBC connector. I'm trying to figure out how to make sure my connectors actually consume all of the data in a given topic. I expected some delay in consumption because of the flush size (10-15 minutes), but I'm seeing lag of several days, and my consumer group always has some offset lag.

I've been reading posts and watching videos about this, for example (mainly that comment): https://rmoff.net/2020/12/08/twelve-days-of-smt-day-1-insertfield-timestamp/ and https://github.com/confluentinc/demo-scene/blob/master/kafka-connect-single-message-transforms/day1.adoc - "flush.size of 16 is too low, but if it's too high you have to wait for your files to show up in S3, and I got bored waiting."

It does mention that records can take time to be consumed when flush.size is larger than the number of available records, but I never expected that to be more than a few minutes. How can I make sure all records are consumed? I would really like to avoid flush.size = 1.

Maybe this is just me misunderstanding sink connectors, but I did expect them to work like regular consumers, so I expected them to consume all the data, with the flush/batch size mostly being a matter of timeouts and performance.

In case anyone is interested, this is my connector configuration.

For the S3 sink:

topics.regex: com.custom.obj_(.*)
storage.class: io.confluent.connect.s3.storage.S3Storage
s3.region: ${@S3_REGION@}
s3.bucket.name: ${@S3_BUCKET@}
topics.dir: ${@S3_OBJ_TOPICS_DIR@}
flush.size: 200
rotate.interval.ms: 20000
auto.register.schemas: false
s3.part.size: 5242880
parquet.codec: snappy
offset.flush.interval.ms: 20000
offset.flush.timeout.ms: 5000
aws.access.key.id: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:accesskey}
aws.secret.access.key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:secretkey}
format.class: com.custom.connect.s3.format.parquet.ParquetFormat
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: com.custom.insight.connect.protobuf.ProtobufConverter
partitioner.class: io.confluent.connect.storage.partitioner.DailyPartitioner
timestamp.extractor: Record
locale: ${@S3_LOCALE@}
timezone: ${@S3_TIMEZONE@}
store.url: ${@S3_STORAGE_URL@}
connect.meta.data: false
transforms: kafkaMetaData,formatTs
transforms.kafkaMetaData.type: org.apache.kafka.connect.transforms.InsertField$Value
transforms.kafkaMetaData.offset.field: kafka_offset
transforms.kafkaMetaData.partition.field: kafka_partition
transforms.kafkaMetaData.timestamp.field: kafka_timestamp
transforms.formatTs.format: yyyy-MM-dd HH:mm:ss:SSS
transforms.formatTs.field: message_ts
transforms.formatTs.target.type: string
transforms.formatTs.type: org.apache.kafka.connect.transforms.TimestampConverter$Value
errors.tolerance: all
errors.deadletterqueue.topic.name: ${@DLQ_STORAGE_TOPIC@}
errors.deadletterqueue.context.headers.enable: true

For the JDBC sink:

topics.regex: com.custom.obj_(.*)
table.name.format: ${@PREFIX@}${topic}
batch.size: 200
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: com.custom.insight.connect.protobuf.ProtobufConverter
connection.url: ${@DB_URL@}
connection.user: ${@DB_USER@}
connection.password: ${@DB_PASSWORD@}
auto.create: true
auto.evolve: true
db.timezone: ${@DB_TIMEZONE@}
quote.sql.identifiers: never
transforms: kafkaMetaData
transforms.kafkaMetaData.offset.field: kafka_offset
transforms.kafkaMetaData.partition.field: kafka_partition
transforms.kafkaMetaData.timestamp.field: kafka_timestamp
transforms.kafkaMetaData.type: org.apache.kafka.connect.transforms.InsertField$Value
errors.tolerance: all
errors.deadletterqueue.topic.name: ${@DLQ_STORAGE_TOPIC@}
errors.deadletterqueue.context.headers.enable: true

I have read these two as well but I'm still not sure: Kafka JDBC Sink Connector, insert values in batches, and https://github.com/confluentinc/kafka-connect-jdbc/issues/290

Also, I've seen examples of people using the settings below (I don't think they help my use case), but I'm wondering: are these values defined per connector? I'm a bit confused because the documentation always shows these settings without the consumer. prefix, while the examples I find always include it, so I guess it's a generic property that applies to both consumers and producers?

consumer.max.poll.interval.ms: 300000
consumer.max.poll.records: 200

Does anyone have any good feedback?

Regarding the Kafka S3 sink connector configuration you provided:

You can tune a few configuration fields to control the rate at which records are consumed and uploaded to S3, and thereby reduce the Kafka offset lag you are seeing. It is also good practice to use variables for the fields below in your configuration.

From personal experience, the adjustments you can make are:

  1. Adjust flush.size

    flush.size: 800

Which is (as you noted):

Maximum number of records: The connector’s flush.size configuration property specifies the maximum number of records that should be written to a single S3 object. There is no default for this setting.

I prefer larger files and use the time-based settings below to control the consumption rate. Since the resulting file size is roughly flush.size * RECORD_SIZE, just make sure your records are neither so large nor so small that the files come out at an unreasonable size.
  2. Adjust rotate.interval.ms

    rotate.interval.ms: (I would remove this field; see the rotate.schedule.interval.ms explanation below)

Which is:

Maximum span of record time: The connector’s rotate.interval.ms specifies the maximum timespan in milliseconds a file can remain open and ready for additional records.

  3. Add the field rotate.schedule.interval.ms

    rotate.schedule.interval.ms: 60000

Which is:

Scheduled rotation: The connector’s rotate.schedule.interval.ms specifies the maximum timespan in milliseconds a file can remain open and ready for additional records. Unlike with rotate.interval.ms, with scheduled rotation the timestamp for each file starts with the system time that the first record is written to the file. As long as a record is processed within the timespan specified by rotate.schedule.interval.ms, the record will be written to the file. As soon as a record is processed after the timespan for the current file, the file is flushed, uploaded to S3, and the offset of the records in the file are committed. A new file is created with a timespan that starts with the current system time, and the record is written to the file. The commit will be performed at the scheduled time, regardless of the previous commit time or number of messages. This configuration is useful when you have to commit your data based on current server time, for example at the beginning of every hour. The default value -1 means that this feature is disabled.

You are currently using the default of -1, which means this scheduled rotation is disabled. This change will make the biggest difference, because each task will flush, upload, and commit offsets much more frequently (the combined changes are sketched below).
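Putting the three adjustments together, a minimal sketch of just the changed fields could look like the following (800 and 60000 are the illustrative values used above, not tuned recommendations; everything else in your configuration stays as it is). As a rough sanity check on file size: with, say, 1 KB average records (a hypothetical size for illustration), flush.size: 800 yields S3 objects of roughly 800 KB before compression.

    flush.size: 800                      # larger objects; file size is roughly flush.size * average record size
    # rotate.interval.ms: 20000          # removed: with the Record timestamp extractor this rotation only fires when new records arrive
    rotate.schedule.interval.ms: 60000   # wall-clock rotation: open files are flushed, uploaded, and committed on this schedule (every 60 s here)

Unlike rotate.interval.ms, the scheduled variant is driven by system time rather than record timestamps, so commits happen on schedule regardless of how many messages arrive, which is what keeps the offset lag bounded.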

Regarding the second part of your question:

You can get observability into this by adding metrics to your Kafka and Kafka Connect setup and wiring them into, for example, Prometheus and Grafana. Configuration guides are linked in the sources below.
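For a quick check without any extra tooling, you can also inspect the connector's consumer group directly; by default Connect names it connect-<connector-name>. A minimal sketch, assuming a reachable broker (the placeholders are yours to fill in):

    kafka-consumer-groups.sh --bootstrap-server <broker-host:9092> \
      --describe --group connect-<your-connector-name>

The output lists CURRENT-OFFSET, LOG-END-OFFSET, and LAG per partition, which shows exactly how far behind each sink task is. For the Prometheus/Grafana route, the usual approach is to attach the Prometheus JMX exporter agent to the Connect worker and graph consumer metrics such as records-lag-max.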

Sources:

Connect S3 sink

kafka-monitoring-via-prometheus

Connect S3 Sink config Docs