Kafka Connect S3 Connector OutOfMemory 错误与 TimeBasedPartitioner
Kafka Connect S3 Connector OutOfMemory errors with TimeBasedPartitioner
我目前正在使用 Kafka Connect S3 Sink Connector 3.3.1 将 Kafka 消息复制到 S3,但在处理延迟数据时遇到 OutOfMemory 错误。
我知道这看起来是一个很长的问题,但我已尽力使其清晰易懂。
非常感谢您的帮助。
高级信息
- 连接器对 Kafka 消息进行简单的字节到字节的复制,并在字节数组的开头添加消息的长度(用于解压缩目的)。
- 这是
CustomByteArrayFormat
class 的作用(见下面的配置)
- 数据根据
Record
时间戳进行分区和分桶
CustomTimeBasedPartitioner
扩展了 io.confluent.connect.storage.partitioner.TimeBasedPartitioner
,其唯一目的是重写 generatePartitionedPath
方法以将主题放在路径的末尾。
- Kafka Connect进程的总堆大小为24GB(只有一个节点)
- 连接器每秒处理 8,000 到 10,000 条消息
- 每封邮件的大小接近 1 KB
- Kafka 主题有 32 个分区
OutOfMemory 错误的上下文
- 这些错误仅在连接器已关闭几个小时并且必须赶上数据时才会发生
- 重新打开连接器时,它开始赶上但很快失败并出现 OutOfMemory 错误
可能但不完整的解释
- 当这些 OOM 错误发生时,连接器的
timestamp.extractor
配置设置为 Record
- 将此配置切换为
Wallclock
(即 Kafka Connect 进程的时间)不要抛出 OOM 错误并且可以处理所有延迟数据,但延迟数据不再正确存储
- 所有迟到的数据都将在连接器重新打开时的
YYYY/MM/dd/HH/mm/topic-name
内存储
- 所以我的猜测是,当连接器尝试根据
Record
时间戳正确存储数据时,它会进行过多的并行读取,从而导致 OOM 错误
"partition.duration.ms": "600000"
参数使连接器桶数据在每小时六个 10 分钟路径中(2018/06/20/12/[00|10|20|30|40|50]
for 2018-06-20 at 12pm)
- 因此,对于 24 小时的延迟数据,连接器必须在
24h * 6 = 144
个不同的 S3 路径中输出数据。
- 每个 10 分钟的文件夹包含 10,000 messages/sec * 600 秒 = 6,000,000 条消息,大小为 6 GB
- 如果它确实是并行读取,那将使 864GB 的数据进入内存
- 我认为我必须正确配置一组给定的参数以避免那些 OOM 错误,但我觉得我看不到全局
"flush.size": "100000"
意味着如果读取了超过 100,000 条消息,则应将它们提交到文件(并因此释放内存)
- 对于 1KB 的消息,这意味着每 100MB 提交一次
- 但即使有 144 个并行读数,仍然只能提供 14.4 GB 的总容量,这小于可用的 24GB 堆大小
- 提交前
"flush.size"
每个分区要读取的记录数?或者可能 每个连接器的任务 ?
- 我对
"rotate.schedule.interval.ms": "600000"
配置的理解是,即使未达到 flush.size
的 100,000 条消息,数据也会每 10 分钟提交一次。
我的主要问题是允许我规划内存使用情况的数学是什么:
- 每秒记录数
- 记录的大小
- 我阅读的主题的Kafka分区数
- 连接器任务数(如果相关)
- 每小时写入的桶数(此处为 6,因为
"partition.duration.ms": "600000"
配置)
- 要处理的延迟数据的最大小时数
配置
S3 接收器连接器配置
{
"name": "xxxxxxx",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "us-east-1",
"partition.duration.ms": "600000",
"topics.dir": "xxxxx",
"flush.size": "100000",
"schema.compatibility": "NONE",
"topics": "xxxxxx,xxxxxx",
"tasks.max": "16",
"s3.part.size": "52428800",
"timezone": "UTC",
"locale": "en",
"format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
"partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"name": "xxxxxxxxx",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "xxxxxxx",
"rotate.schedule.interval.ms": "600000",
"path.format": "YYYY/MM/dd/HH/mm",
"timestamp.extractor": "Record"
}
工作器配置
bootstrap.servers=XXXXXX
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
consumer.auto.offset.reset=earliest
consumer.max.partition.fetch.bytes=2097152
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
group.id=xxxxxxx
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
rest.advertised.host.name=XXXX
编辑:
我忘了添加一个错误示例:
2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.OutOfMemoryError: Java heap space
[2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
[2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
我终于能够理解堆大小的使用在 Kafka Connect S3 连接器中的工作原理
- S3 Connector会将每个Kafka分区的数据写入partitioned
paths
paths
的分区方式取决于 partitioner.class
参数;
- 默认是按时间戳,然后
partition.duration.ms
的值决定每个分区的时长paths
.
- S3 连接器将为每个 Kafka 分区(对于所有读取的主题)和每个分区
paths
分配 s3.part.size
字节的缓冲区
- 读取 20 个分区的示例,
timestamp.extractor
设置为 Record
,partition.duration.ms
设置为 1h,s3.part.size
设置为 50 MB
- 每小时所需的堆大小等于
20 * 50 MB
= 1 GB;
- 但是,
timestamp.extractor
被设置为 Record
,具有对应于较早时间戳的时间戳的消息将被缓冲在较早时间的缓冲区中。因此,实际上,连接器至少需要 20 * 50 MB * 2h
= 2 GB 的内存,因为总是有迟到的事件,如果有迟到超过 1 小时的事件,则需要更多;
- 请注意,如果
timestamp.extractor
设置为 Wallclock
,则情况并非如此,因为就 Kafka Connect 而言,几乎永远不会有延迟事件。
- 那些缓冲区在 3 种情况下被刷新(即离开内存)
rotate.schedule.interval.ms
时间过去了
- 此刷新条件总是触发。
rotate.interval.ms
时间已经过去timestamp.extractor
时间
- 这意味着如果
timestamp.extractor
设置为Record
,10分钟的Record
时间可以少或多和10分钟的实际时间
- 例如,在处理延迟数据时,10 分钟的数据将在几秒钟内处理完,如果
rotate.interval.ms
设置为 10 分钟,则此条件将每秒触发一次(应该如此) ;
- 相反,如果事件流暂停,则此条件不会触发,直到它看到时间戳显示自条件上次触发以来已过去
rotate.interval.ms
的事件。
flush.size
条消息已在不到 min(rotate.schedule.interval.ms
、rotate.interval.ms)
的时间内阅读
- 至于
rotate.interval.ms
,如果没有足够的消息,此条件可能永远不会触发。
- 因此,您至少需要规划
Kafka partitions * s3.part.size
堆大小
- 如果您使用
Record
时间戳进行分区,则应将其乘以 max lateness in milliseconds / partition.duration.ms
- 这是最坏的情况,您在所有分区和
max lateness in milliseconds
的所有范围内不断有延迟事件。
- S3 连接器在从 Kafka 读取数据时还将为每个分区缓冲
consumer.max.partition.fetch.bytes
字节
- 默认设置为 2.1 MB。
- 最后,你不应该考虑所有的Heap Size都可以用来缓冲Kafka消息,因为里面还有很多不同的对象
- 一个安全的考虑是确保 Kafka 消息的缓冲不超过总可用堆大小的 50%。
@raphael 完美地解释了工作原理。
粘贴我遇到过的类似问题的一个小变体(事件太少,无法处理,但跨越很多 hours/days)。
在我的例子中,我有大约 150 个连接器,其中 8 个因 OOM 而失败,因为它们必须处理大约 7 天的数据(我们在测试环境中的 kafka 停机了大约 2 周)
遵循的步骤:
- 所有连接器的
s3.part.size
从 25MB 减少到 5MB。 (在我们的场景中,rotate.interval
设置为 10 分钟,flush.size
设置为 10000。我们的大多数事件应该很容易适应这个限制。
- 此设置后,只有一个连接器仍然出现 OOM,并且这个连接器在启动后 5 秒 内进入 OOM(基于堆分析),堆利用率从 200MB-1.5GB 猛增。在查看 kafka 偏移延迟时,在所有 7 天 中只有 8K 个事件 需要处理。所以这不是因为要处理的事件太多,而是因为要处理的事件太少/flush。
- 由于我们使用的是每小时分区并且一个小时内几乎没有 100 个事件,因此这 7 天的所有缓冲区都是在没有刷新的情况下创建的(没有被释放到 JVM)-
7 * 24 * 5MB * 3 partitions = 2.5GB
(xmx- 1.5GB)
修复:
执行以下步骤之一,直到您的连接器赶上,然后恢复您的旧配置。 (推荐方法 - 1)
- 更新连接器配置以处理 100 或 1000 条记录
flush.size
(取决于您的数据结构)。
缺点: 如果实际事件超过 1000,一小时内会创建太多小文件。
- 将分区更改为每日,这样就只有每日分区了。
缺点: 现在您的 S3 中混合了每小时和每日分区。
我目前正在使用 Kafka Connect S3 Sink Connector 3.3.1 将 Kafka 消息复制到 S3,但在处理延迟数据时遇到 OutOfMemory 错误。
我知道这看起来是一个很长的问题,但我已尽力使其清晰易懂。 非常感谢您的帮助。
高级信息
- 连接器对 Kafka 消息进行简单的字节到字节的复制,并在字节数组的开头添加消息的长度(用于解压缩目的)。
- 这是
CustomByteArrayFormat
class 的作用(见下面的配置)
- 这是
- 数据根据
Record
时间戳进行分区和分桶CustomTimeBasedPartitioner
扩展了io.confluent.connect.storage.partitioner.TimeBasedPartitioner
,其唯一目的是重写generatePartitionedPath
方法以将主题放在路径的末尾。
- Kafka Connect进程的总堆大小为24GB(只有一个节点)
- 连接器每秒处理 8,000 到 10,000 条消息
- 每封邮件的大小接近 1 KB
- Kafka 主题有 32 个分区
OutOfMemory 错误的上下文
- 这些错误仅在连接器已关闭几个小时并且必须赶上数据时才会发生
- 重新打开连接器时,它开始赶上但很快失败并出现 OutOfMemory 错误
可能但不完整的解释
- 当这些 OOM 错误发生时,连接器的
timestamp.extractor
配置设置为Record
- 将此配置切换为
Wallclock
(即 Kafka Connect 进程的时间)不要抛出 OOM 错误并且可以处理所有延迟数据,但延迟数据不再正确存储- 所有迟到的数据都将在连接器重新打开时的
YYYY/MM/dd/HH/mm/topic-name
内存储
- 所有迟到的数据都将在连接器重新打开时的
- 所以我的猜测是,当连接器尝试根据
Record
时间戳正确存储数据时,它会进行过多的并行读取,从而导致 OOM 错误"partition.duration.ms": "600000"
参数使连接器桶数据在每小时六个 10 分钟路径中(2018/06/20/12/[00|10|20|30|40|50]
for 2018-06-20 at 12pm)- 因此,对于 24 小时的延迟数据,连接器必须在
24h * 6 = 144
个不同的 S3 路径中输出数据。 - 每个 10 分钟的文件夹包含 10,000 messages/sec * 600 秒 = 6,000,000 条消息,大小为 6 GB
- 如果它确实是并行读取,那将使 864GB 的数据进入内存
- 我认为我必须正确配置一组给定的参数以避免那些 OOM 错误,但我觉得我看不到全局
"flush.size": "100000"
意味着如果读取了超过 100,000 条消息,则应将它们提交到文件(并因此释放内存)- 对于 1KB 的消息,这意味着每 100MB 提交一次
- 但即使有 144 个并行读数,仍然只能提供 14.4 GB 的总容量,这小于可用的 24GB 堆大小
- 提交前
"flush.size"
每个分区要读取的记录数?或者可能 每个连接器的任务 ?
- 我对
"rotate.schedule.interval.ms": "600000"
配置的理解是,即使未达到flush.size
的 100,000 条消息,数据也会每 10 分钟提交一次。
我的主要问题是允许我规划内存使用情况的数学是什么:
- 每秒记录数
- 记录的大小
- 我阅读的主题的Kafka分区数
- 连接器任务数(如果相关)
- 每小时写入的桶数(此处为 6,因为
"partition.duration.ms": "600000"
配置) - 要处理的延迟数据的最大小时数
配置
S3 接收器连接器配置
{
"name": "xxxxxxx",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "us-east-1",
"partition.duration.ms": "600000",
"topics.dir": "xxxxx",
"flush.size": "100000",
"schema.compatibility": "NONE",
"topics": "xxxxxx,xxxxxx",
"tasks.max": "16",
"s3.part.size": "52428800",
"timezone": "UTC",
"locale": "en",
"format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
"partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"name": "xxxxxxxxx",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "xxxxxxx",
"rotate.schedule.interval.ms": "600000",
"path.format": "YYYY/MM/dd/HH/mm",
"timestamp.extractor": "Record"
}
工作器配置
bootstrap.servers=XXXXXX
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
consumer.auto.offset.reset=earliest
consumer.max.partition.fetch.bytes=2097152
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
group.id=xxxxxxx
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
rest.advertised.host.name=XXXX
编辑:
我忘了添加一个错误示例:
2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.OutOfMemoryError: Java heap space
[2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
[2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
我终于能够理解堆大小的使用在 Kafka Connect S3 连接器中的工作原理
- S3 Connector会将每个Kafka分区的数据写入partitioned
paths
paths
的分区方式取决于partitioner.class
参数;- 默认是按时间戳,然后
partition.duration.ms
的值决定每个分区的时长paths
.
- S3 连接器将为每个 Kafka 分区(对于所有读取的主题)和每个分区
paths
分配s3.part.size
字节的缓冲区- 读取 20 个分区的示例,
timestamp.extractor
设置为Record
,partition.duration.ms
设置为 1h,s3.part.size
设置为 50 MB- 每小时所需的堆大小等于
20 * 50 MB
= 1 GB; - 但是,
timestamp.extractor
被设置为Record
,具有对应于较早时间戳的时间戳的消息将被缓冲在较早时间的缓冲区中。因此,实际上,连接器至少需要20 * 50 MB * 2h
= 2 GB 的内存,因为总是有迟到的事件,如果有迟到超过 1 小时的事件,则需要更多; - 请注意,如果
timestamp.extractor
设置为Wallclock
,则情况并非如此,因为就 Kafka Connect 而言,几乎永远不会有延迟事件。
- 每小时所需的堆大小等于
- 那些缓冲区在 3 种情况下被刷新(即离开内存)
rotate.schedule.interval.ms
时间过去了- 此刷新条件总是触发。
rotate.interval.ms
时间已经过去timestamp.extractor
时间- 这意味着如果
timestamp.extractor
设置为Record
,10分钟的Record
时间可以少或多和10分钟的实际时间- 例如,在处理延迟数据时,10 分钟的数据将在几秒钟内处理完,如果
rotate.interval.ms
设置为 10 分钟,则此条件将每秒触发一次(应该如此) ; - 相反,如果事件流暂停,则此条件不会触发,直到它看到时间戳显示自条件上次触发以来已过去
rotate.interval.ms
的事件。
- 例如,在处理延迟数据时,10 分钟的数据将在几秒钟内处理完,如果
- 这意味着如果
flush.size
条消息已在不到min(rotate.schedule.interval.ms
、rotate.interval.ms)
的时间内阅读- 至于
rotate.interval.ms
,如果没有足够的消息,此条件可能永远不会触发。
- 至于
- 因此,您至少需要规划
Kafka partitions * s3.part.size
堆大小- 如果您使用
Record
时间戳进行分区,则应将其乘以max lateness in milliseconds / partition.duration.ms
- 这是最坏的情况,您在所有分区和
max lateness in milliseconds
的所有范围内不断有延迟事件。
- 这是最坏的情况,您在所有分区和
- 如果您使用
- 读取 20 个分区的示例,
- S3 连接器在从 Kafka 读取数据时还将为每个分区缓冲
consumer.max.partition.fetch.bytes
字节- 默认设置为 2.1 MB。
- 最后,你不应该考虑所有的Heap Size都可以用来缓冲Kafka消息,因为里面还有很多不同的对象
- 一个安全的考虑是确保 Kafka 消息的缓冲不超过总可用堆大小的 50%。
@raphael 完美地解释了工作原理。
粘贴我遇到过的类似问题的一个小变体(事件太少,无法处理,但跨越很多 hours/days)。
在我的例子中,我有大约 150 个连接器,其中 8 个因 OOM 而失败,因为它们必须处理大约 7 天的数据(我们在测试环境中的 kafka 停机了大约 2 周)
遵循的步骤:
- 所有连接器的
s3.part.size
从 25MB 减少到 5MB。 (在我们的场景中,rotate.interval
设置为 10 分钟,flush.size
设置为 10000。我们的大多数事件应该很容易适应这个限制。 - 此设置后,只有一个连接器仍然出现 OOM,并且这个连接器在启动后 5 秒 内进入 OOM(基于堆分析),堆利用率从 200MB-1.5GB 猛增。在查看 kafka 偏移延迟时,在所有 7 天 中只有 8K 个事件 需要处理。所以这不是因为要处理的事件太多,而是因为要处理的事件太少/flush。
- 由于我们使用的是每小时分区并且一个小时内几乎没有 100 个事件,因此这 7 天的所有缓冲区都是在没有刷新的情况下创建的(没有被释放到 JVM)-
7 * 24 * 5MB * 3 partitions = 2.5GB
(xmx- 1.5GB)
修复: 执行以下步骤之一,直到您的连接器赶上,然后恢复您的旧配置。 (推荐方法 - 1)
- 更新连接器配置以处理 100 或 1000 条记录
flush.size
(取决于您的数据结构)。
缺点: 如果实际事件超过 1000,一小时内会创建太多小文件。 - 将分区更改为每日,这样就只有每日分区了。
缺点: 现在您的 S3 中混合了每小时和每日分区。