如何摆脱 "Closing the Kafka producer with timeoutMillis = ..."
How to get rid of "Closing the Kafka producer with timeoutMillis = ..."
我正在通过以下代码将 Apache Avro 格式的消息发送到 Kafka 代理实例:
ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(kafkaTopic.getTopicName(), null, null,
avroConverter.getSchemaId().toString(), convertRecordToByteArray(kafkaRecordToSend));
String avroSchemaName = null;
// some of my AVRO schemas are unions, some are simple:
if (_avroSchema.getTypes().size() == 1) {
avroSchemaName = _avroSchema.getTypes().get(0).getName();
} else if (_avroSchema.getTypes().size() == 2) {
avroSchemaName = _avroSchema.getTypes().get(1).getName();
}
// some custom header items...
producerRecord.headers().add(MessageHeaders.MESSAGE_ID.getText(), messageID.getBytes());
producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_REGISTRY_SUBJECT.getText(),
avroSchemaName.getBytes());
producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_REGISTRY_SCHEMA_ID.getText(),
avroConverter.getSchemaId().toString().getBytes());
if (multiline) {
producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_MULTILINE_RECORD_NAME.getText(),
MULTILINE_RECORD_NAME.getBytes());
}
try {
Future<RecordMetadata> result = kafkaProducer.send(producerRecord);
RecordMetadata sendResult = result.get();
MessageLogger.logResourceBundleMessage(_messages, "JAPCTOAVROKAFKAPRODUCER:DEBUG0002",
sendResult.offset());
} catch (Exception e) {
MessageLogger.logError(e);
throw e;
}
代码运行良好,消息最终进入 Kafka 并经过处理最终进入 InfluxDB。问题是每次发送操作都会产生很多INFO消息(客户端ID号就是一个例子):
- [Producer clientId=producer-27902] 关闭 timeoutMillis = 10000 ms 的 Kafka 生产者。
- [Producer clientId=producer-27902] 关闭 Kafka producer timeoutMillis = 9223372036854775807 ms.
- Kafka startTimeMs: ...
- Kafka commitId:...
- [Producer clientId=producer-27902]集群ID:
哪个垃圾邮件我们的 Graylog。
我使用类似的代码发送 String 格式的消息。此代码在不产生 INFO 消息的情况下执行...
ProducerRecord<String, String> recordToSend = new ProducerRecord<>(queueName, messageText);
recordToSend.headers().add("messageID", messageID.getBytes());
Future<RecordMetadata> result = _producerConnection.send(recordToSend);
我知道 INFO 消息是从 class org.apache.kafka.clients.producer.KafkaProducer
记录的。我需要删除这些消息,但我无权访问 logging.mxl 定义 Graylog 的记录器属性。
有没有办法通过 POM 条目或以编程方式摆脱这些消息?
代码行为的原因是设计缺陷:
上面 post 中的代码被放置在一个方法中,该方法被调用以将消息发送到 Kafka。 KafkaProducer
class 在该方法中以及每次调用该方法时实例化。令人惊讶的是, KafkaProducer
不仅在调用代码的显式 close() 时发出 Closing the KafkaProducer with timeoutMillis =
,而且在对实例的强引用丢失时(在我的例子中,当代码离开方法时) ,在后一种情况下,timeoutMillis
设置为9223372036854775807(最大长数)。
为了摆脱许多消息,我将 KafkaProducer
实例化移出了方法,并使实例变量成为 class 属性,并且我不调用显式 close()
在 send(...)
之后。
此外,我将 class 实例化 KafkaProducer
的实例更改为强引用 class 成员。
通过这样做,我在实例化时收到 KafkaProducer
的一些消息,然后就没有声音了。
我正在通过以下代码将 Apache Avro 格式的消息发送到 Kafka 代理实例:
ProducerRecord<String, byte[]> producerRecord = new ProducerRecord<>(kafkaTopic.getTopicName(), null, null,
avroConverter.getSchemaId().toString(), convertRecordToByteArray(kafkaRecordToSend));
String avroSchemaName = null;
// some of my AVRO schemas are unions, some are simple:
if (_avroSchema.getTypes().size() == 1) {
avroSchemaName = _avroSchema.getTypes().get(0).getName();
} else if (_avroSchema.getTypes().size() == 2) {
avroSchemaName = _avroSchema.getTypes().get(1).getName();
}
// some custom header items...
producerRecord.headers().add(MessageHeaders.MESSAGE_ID.getText(), messageID.getBytes());
producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_REGISTRY_SUBJECT.getText(),
avroSchemaName.getBytes());
producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_REGISTRY_SCHEMA_ID.getText(),
avroConverter.getSchemaId().toString().getBytes());
if (multiline) {
producerRecord.headers().add(MessageHeaders.AVRO_SCHEMA_MULTILINE_RECORD_NAME.getText(),
MULTILINE_RECORD_NAME.getBytes());
}
try {
Future<RecordMetadata> result = kafkaProducer.send(producerRecord);
RecordMetadata sendResult = result.get();
MessageLogger.logResourceBundleMessage(_messages, "JAPCTOAVROKAFKAPRODUCER:DEBUG0002",
sendResult.offset());
} catch (Exception e) {
MessageLogger.logError(e);
throw e;
}
代码运行良好,消息最终进入 Kafka 并经过处理最终进入 InfluxDB。问题是每次发送操作都会产生很多INFO消息(客户端ID号就是一个例子):
- [Producer clientId=producer-27902] 关闭 timeoutMillis = 10000 ms 的 Kafka 生产者。
- [Producer clientId=producer-27902] 关闭 Kafka producer timeoutMillis = 9223372036854775807 ms.
- Kafka startTimeMs: ...
- Kafka commitId:...
- [Producer clientId=producer-27902]集群ID:
哪个垃圾邮件我们的 Graylog。
我使用类似的代码发送 String 格式的消息。此代码在不产生 INFO 消息的情况下执行...
ProducerRecord<String, String> recordToSend = new ProducerRecord<>(queueName, messageText);
recordToSend.headers().add("messageID", messageID.getBytes());
Future<RecordMetadata> result = _producerConnection.send(recordToSend);
我知道 INFO 消息是从 class org.apache.kafka.clients.producer.KafkaProducer
记录的。我需要删除这些消息,但我无权访问 logging.mxl 定义 Graylog 的记录器属性。
有没有办法通过 POM 条目或以编程方式摆脱这些消息?
代码行为的原因是设计缺陷:
上面 post 中的代码被放置在一个方法中,该方法被调用以将消息发送到 Kafka。 KafkaProducer
class 在该方法中以及每次调用该方法时实例化。令人惊讶的是, KafkaProducer
不仅在调用代码的显式 close() 时发出 Closing the KafkaProducer with timeoutMillis =
,而且在对实例的强引用丢失时(在我的例子中,当代码离开方法时) ,在后一种情况下,timeoutMillis
设置为9223372036854775807(最大长数)。
为了摆脱许多消息,我将 KafkaProducer
实例化移出了方法,并使实例变量成为 class 属性,并且我不调用显式 close()
在 send(...)
之后。
此外,我将 class 实例化 KafkaProducer
的实例更改为强引用 class 成员。
通过这样做,我在实例化时收到 KafkaProducer
的一些消息,然后就没有声音了。