如何摆脱 "Closing the Kafka producer with timeoutMillis = ..."

How to get rid of "Closing the Kafka producer with timeoutMillis = ..."

我正在通过以下代码将 Apache Avro 格式的消息发送到 Kafka 代理实例:

        ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(kafkaTopic.getTopicName(), null, null,
                avroConverter.getSchemaId().toString(), convertRecordToByteArray(kafkaRecordToSend));

        String avroSchemaName = null;

        // some of my AVRO schemas are unions, some are simple:
        if (_avroSchema.getTypes().size() == 1) {
            avroSchemaName = _avroSchema.getTypes().get(0).getName();
        } else if (_avroSchema.getTypes().size() == 2) {
            avroSchemaName = _avroSchema.getTypes().get(1).getName();
        }

        // some custom header items...
        producerRecord.headers().add(MessageHeaders.MESSAGE_ID.getText(), messageID.getBytes());
        producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_REGISTRY_SUBJECT.getText(),
                avroSchemaName.getBytes());
        producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_REGISTRY_SCHEMA_ID.getText(),
                avroConverter.getSchemaId().toString().getBytes());
        if (multiline) {
            producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_MULTILINE_RECORD_NAME.getText(),
                    MULTILINE_RECORD_NAME.getBytes());
        }

        try {
            Future<RecordMetadata> result = kafkaProducer.send(producerRecord);
            RecordMetadata sendResult = result.get();
            MessageLogger.logResourceBundleMessage(_messages, "JAPCTOAVROKAFKAPRODUCER:DEBUG0002",
                    sendResult.offset());
        } catch (Exception e) {
            MessageLogger.logError(e);
            throw e;
        }

代码运行良好,消息最终进入 Kafka 并经过处理最终进入 InfluxDB。问题是每次发送操作都会产生很多INFO消息(客户端ID号就是一个例子):

哪个垃圾邮件我们的 Graylog。

我使用类似的代码发送 String 格式的消息。此代码在不产生 INFO 消息的情况下执行...

ProducerRecord<String, String> recordToSend = new ProducerRecord<>(queueName, messageText);
    recordToSend.headers().add("messageID", messageID.getBytes());

    Future<RecordMetadata> result = _producerConnection.send(recordToSend);
    

我知道 INFO 消息是从 class org.apache.kafka.clients.producer.KafkaProducer 记录的。我需要删除这些消息,但我无权访问 logging.mxl 定义 Graylog 的记录器属性。

有没有办法通过 POM 条目或以编程方式摆脱这些消息?

代码行为的原因是设计缺陷: 上面 post 中的代码被放置在一个方法中,该方法被调用以将消息发送到 Kafka。 KafkaProducer class 在该方法中以及每次调用该方法时实例化。令人惊讶的是, KafkaProducer 不仅在调用代码的显式 close() 时发出 Closing the KafkaProducer with timeoutMillis = ,而且在对实例的强引用丢失时(在我的例子中,当代码离开方法时) ,在后一种情况下,timeoutMillis设置为9223372036854775807(最大长数)。

为了摆脱许多消息,我将 KafkaProducer 实例化移出了方法,并使实例变量成为 class 属性,并且我不调用显式 close()send(...) 之后。

此外,我将 class 实例化 KafkaProducer 的实例更改为强引用 class 成员。

通过这样做,我在实例化时收到 KafkaProducer 的一些消息,然后就没有声音了。