处理 Kafka S3 连接器中的延迟
Handle lags in Kafka S3 Connector
我们正在使用 Kafka Connect [分布式,融合 4.0]。
它工作得很好,除了连接器侦听的主题中始终保留未提交的消息。该行为可能与 S3 连接器 配置 "flush.size": "20000"
有关。主题中的滞后总是低于刷新大小。
我们的数据是分批来的,我不想等到下一批到达,也不想减少 flush.size
和创建大量文件。
是否可以设置超时,即使没有达到 20000 个事件,S3 连接器也会刷新数据?
谢谢!
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "event",
"tasks.max": "3",
"topics.dir": "connect",
"s3.region": "some_region",
"s3.bucket.name": "some_bucket",
"s3.part.size": "5242880",
"flush.size": "20000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"schema.compatibility": "FULL",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'\''day_ts'\''=YYYYMMdd/'\''hour_ts'\''=H",
"partition.duration.ms": "3600000",
"locale": "en_US",
"timezone": "UTC",
"timestamp.extractor": "RecordField",
"timestamp.field": "time"
}
}
要使用 S3 连接器定期刷新关于低容量主题的未完成记录,您可以使用配置 属性:
rotate.schedule.interval.ms
(完整的配置列表 here)
请记住,通过使用上面的 属性,您可能会在重新处理或从错误中恢复时看到重复的消息,无论您使用的是哪个分区程序。
我们正在使用 Kafka Connect [分布式,融合 4.0]。
它工作得很好,除了连接器侦听的主题中始终保留未提交的消息。该行为可能与 S3 连接器 配置 "flush.size": "20000"
有关。主题中的滞后总是低于刷新大小。
我们的数据是分批来的,我不想等到下一批到达,也不想减少 flush.size
和创建大量文件。
是否可以设置超时,即使没有达到 20000 个事件,S3 连接器也会刷新数据?
谢谢!
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"topics": "event",
"tasks.max": "3",
"topics.dir": "connect",
"s3.region": "some_region",
"s3.bucket.name": "some_bucket",
"s3.part.size": "5242880",
"flush.size": "20000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"schema.compatibility": "FULL",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'\''day_ts'\''=YYYYMMdd/'\''hour_ts'\''=H",
"partition.duration.ms": "3600000",
"locale": "en_US",
"timezone": "UTC",
"timestamp.extractor": "RecordField",
"timestamp.field": "time"
}
}
要使用 S3 连接器定期刷新关于低容量主题的未完成记录,您可以使用配置 属性:
rotate.schedule.interval.ms
(完整的配置列表 here)
请记住,通过使用上面的 属性,您可能会在重新处理或从错误中恢复时看到重复的消息,无论您使用的是哪个分区程序。