Kafka Connector Record Writer 因缺少内存分配而卡在 S3OutputStream 中,但不会在数小时内保持空闲状态

Kafka Connector Record Writer gets stuck in S3OutputStream for lack of memory to allocate but does not fail staying idle for hours

我有一个我不知道如何修改的行为。 我正在测试 s3 Kafka 同步连接器,我的主题中的数据很少。

目前我可以使用 Kafka 管理器看到主题中有数据,但我的连接器读取数据并且从不移动偏移量,也从不将其推送到 Kafka。 这在其他主题中有效,但在这个特定主题中无效。 我认为与超时有关,但我找不到要设置的正确配置 属性,以便刷新速度更快一些。

这是我的配置:

      curl -X PUT -s -o /dev/null -H ""Content-Type:application/json""
      http://localhost:$$CONNECT_REST_PORT/connectors/s3_connector_doc_cmg/config
      \
        -d '{
          ""connector.class"": ""io.confluent.connect.s3.S3SinkConnector"",
          ""storage.class"": ""io.confluent.connect.s3.storage.S3Storage"",
          ""s3.region"": ""us-east-1"",
          ""s3.bucket.name"": ""confluent-pipeline"",
          ""topics.dir"": ""topics"",
          ""topics"": ""com.acp.bde.doc_cmg"",
          ""flush.size"": ""25"",
          ""rotate.interval.ms"": ""5000"",
          ""auto.register.schemas"": ""false"",
          ""tasks.max"": ""1"",
          ""s3.part.size"": ""5242880"",
          ""timezone"": ""UTC"",
          ""parquet.codec"": ""snappy"",
          ""offset.flush.interval.ms"": ""5000"",
          ""offset.flush.timeout.ms"": ""1000"",
          ""s3.credentials.provider.class"": ""com.amazonaws.auth.DefaultAWSCredentialsProviderChain"",
          ""format.class"": ""io.confluent.connect.s3.format.avro.AvroFormat"",
          ""value.converter"": ""com.insight.connect.protobuf.ProtobufConverter"",
          ""key.converter"": ""org.apache.kafka.connect.storage.StringConverter"",
          ""partitioner.class"": ""io.confluent.connect.storage.partitioner.DailyPartitioner"",
          ""locale"": ""de-CH"",
          ""timezone"": ""Europe/Zurich"",
          ""store.url"": ""http://minio-server-svc:9000/""
        }'"

这是我在日志中看到的:

[2020-10-23 10:35:47,594] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+1+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 10:35:48,017] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+3+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 10:35:48,075] INFO Opening record writer for: topics/com.acp.bde.doc_cmg/year=2020/month=10/day=23/com.acp.bde.doc_cmg+2+0000000000.avro (io.confluent.connect.s3.format.avro.AvroRecordWriterProvider)
[2020-10-23 11:35:37,989] INFO [Worker clientId=connect-1, groupId=kafka-connect-01] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)

所以它们已经打开了将近 1 小时,但实际上什么也没发生,我想知道我的配置是否完全糟糕,或者我需要一些 属性 和配置,所以这种数据推送是一个快一点。

更新: 我仍然没有正确修复这个问题,但实际上它似乎是内存不足的问题。

程序卡在这一行 this.buffer = ByteBuffer.allocate(this.partSize);

https://github.com/confluentinc/kafka-connect-storage-cloud/blob/master/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3OutputStream.java#L85

困扰我的部分是它根本不抱怨,只是停留在那里。它不应该因内存不足问题而崩溃吗? 还是不应该更快地释放内存? 它几乎可以在那个通话中停留超过 3 或 4 小时而没有任何反馈。

我仍然认为我的配置可能有问题,但我不知道我应该查看什么或在哪里。

您的分区程序是基于时间的。所以这可能是由于 rotate.schedule.interval.ms 参数不存在时的行为造成的。看看下面的话题