Kafka Connect S3 Connector OutOfMemory 错误与 TimeBasedPartitioner

Kafka Connect S3 Connector OutOfMemory errors with TimeBasedPartitioner

我目前正在使用 Kafka Connect S3 Sink Connector 3.3.1 将 Kafka 消息复制到 S3,但在处理延迟数据时遇到 OutOfMemory 错误。

我知道这看起来是一个很长的问题,但我已尽力使其清晰易懂。 非常感谢您的帮助。

高级信息

OutOfMemory 错误的上下文

可能但不完整的解释

我的主要问题是允许我规划内存使用情况的数学是什么:

  • 每秒记录数
  • 记录的大小
  • 我阅读的主题的Kafka分区数
  • 连接器任务数(如果相关)
  • 每小时写入的桶数(此处为 6,因为 "partition.duration.ms": "600000" 配置)
  • 要处理的延迟数据的最大小时数

配置

S3 接收器连接器配置

{
  "name": "xxxxxxx",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.region": "us-east-1",
    "partition.duration.ms": "600000",
    "topics.dir": "xxxxx",
    "flush.size": "100000",
    "schema.compatibility": "NONE",
    "topics": "xxxxxx,xxxxxx",
    "tasks.max": "16",
    "s3.part.size": "52428800",
    "timezone": "UTC",
    "locale": "en",
    "format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
    "partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "name": "xxxxxxxxx",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "xxxxxxx",
    "rotate.schedule.interval.ms": "600000",
    "path.format": "YYYY/MM/dd/HH/mm",
    "timestamp.extractor": "Record"
}

工作器配置

bootstrap.servers=XXXXXX
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
consumer.auto.offset.reset=earliest
consumer.max.partition.fetch.bytes=2097152
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
group.id=xxxxxxx
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
rest.advertised.host.name=XXXX

编辑:

我忘了添加一个错误示例:

2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.OutOfMemoryError: Java heap space
[2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
[2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

我终于能够理解堆大小的使用在 Kafka Connect S3 连接器中的工作原理

  • S3 Connector会将每个Kafka分区的数据写入partitioned paths
    • paths 的分区方式取决于 partitioner.class 参数;
    • 默认是按时间戳,然后partition.duration.ms的值决定每个分区的时长paths.
  • S3 连接器将为每个 Kafka 分区(对于所有读取的主题)和每个分区 paths 分配 s3.part.size 字节的缓冲区
    • 读取 20 个分区的示例,timestamp.extractor 设置为 Recordpartition.duration.ms 设置为 1h,s3.part.size 设置为 50 MB
      • 每小时所需的堆大小等于 20 * 50 MB = 1 GB;
      • 但是,timestamp.extractor 被设置为 Record,具有对应于较早时间戳的时间戳的消息将被缓冲在较早时间的缓冲区中。因此,实际上,连接器至少需要 20 * 50 MB * 2h = 2 GB 的内存,因为总是有迟到的事件,如果有迟到超过 1 小时的事件,则需要更多;
      • 请注意,如果 timestamp.extractor 设置为 Wallclock,则情况并非如此,因为就 Kafka Connect 而言,几乎永远不会有延迟事件。
    • 那些缓冲区在 3 种情况下被刷新(即离开内存)
      • rotate.schedule.interval.ms时间过去了
        • 此刷新条件总是触发。
      • rotate.interval.ms时间已经过去timestamp.extractor时间
        • 这意味着如果timestamp.extractor设置为Record,10分钟的Record时间可以少或多和10分钟的实际时间
          • 例如,在处理延迟数据时,10 分钟的数据将在几秒钟内处理完,如果 rotate.interval.ms 设置为 10 分钟,则此条件将每秒触发一次(应该如此) ;
          • 相反,如果事件流暂停,则此条件不会触发,直到它看到时间戳显示自条件上次触发以来已过去 rotate.interval.ms 的事件。
      • flush.size 条消息已在不到 min(rotate.schedule.interval.msrotate.interval.ms) 的时间内阅读
        • 至于rotate.interval.ms,如果没有足够的消息,此条件可能永远不会触发。
    • 因此,您至少需要规划 Kafka partitions * s3.part.size 堆大小
      • 如果您使用 Record 时间戳进行分区,则应将其乘以 max lateness in milliseconds / partition.duration.ms
        • 这是最坏的情况,您在所有分区和 max lateness in milliseconds 的所有范围内不断有延迟事件。
  • S3 连接器在从 Kafka 读取数据时还将为每个分区缓冲 consumer.max.partition.fetch.bytes 字节
    • 默认设置为 2.1 MB。
  • 最后,你不应该考虑所有的Heap Size都可以用来缓冲Kafka消息,因为里面还有很多不同的对象
    • 一个安全的考虑是确保 Kafka 消息的缓冲不超过总可用堆大小的 50%。

@raphael 完美地解释了工作原理。
粘贴我遇到过的类似问题的一个小变体(事件太少,无法处理,但跨越很多 hours/days)。

在我的例子中,我有大约 150 个连接器,其中 8 个因 OOM 而失败,因为它们必须处理大约 7 天的数据(我们在测试环境中的 kafka 停机了大约 2 周)

遵循的步骤:

  1. 所有连接器的 s3.part.size 从 25MB 减少到 5MB。 (在我们的场景中,rotate.interval 设置为 10 分钟,flush.size 设置为 10000。我们的大多数事件应该很容易适应这个限制。
  2. 此设置后,只有一个连接器仍然出现 OOM,并且这个连接器在启动后 5 秒 内进入 OOM(基于堆分析),堆利用率从 200MB-1.5GB 猛增。在查看 kafka 偏移延迟时,在所有 7 天 中只有 8K 个事件 需要处理。所以这不是因为要处理的事件太多,而是因为要处理的事件太少/flush。
  3. 由于我们使用的是每小时分区并且一个小时内几乎没有 100 个事件,因此这 7 天的所有缓冲区都是在没有刷新的情况下创建的(没有被释放到 JVM)-7 * 24 * 5MB * 3 partitions = 2.5GB(xmx- 1.5GB)

修复: 执行以下步骤之一,直到您的连接器赶上,然后恢复您的旧配置。 (推荐方法 - 1

  1. 更新连接器配置以处理 100 或 1000 条记录flush.size(取决于您的数据结构)。
    缺点: 如果实际事件超过 1000,一小时内会创建太多小文件。
  2. 将分区更改为每日,这样就只有每日分区了。
    缺点: 现在您的 S3 中混合了每小时和每日分区。