Spring Kafka - 遇到 "Magic v0 does not support record headers" 错误
Spring Kafka - Encountering "Magic v0 does not support record headers" error
我是 运行 一个 Spring 引导应用程序,并且已经进入 compile('org.springframework.kafka:spring-kafka:2.1.5.RELEASE')
我正在尝试使用此版本反对 Cloudera 安装:
Cloudera Distribution of Apache Kafka Version 3.0.0-1.3.0.0.p0.40 Version 0.11.0+kafka3.0.0+50
我的 KafkaProducerConfig class 非常简单:
@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.template.default-topic}")
private String defaultTopicName;
@Value("${spring.kafka.producer.compression-type}")
private String producerCompressionType;
@Value("${spring.kafka.producer.client-id}")
private String producerClientId;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.producerCompressionType);
props.put(ProducerConfig.CLIENT_ID_CONFIG, this.producerClientId);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return props;
}
@Bean
public ProducerFactory<String, Pdid> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Pdid> kafkaTemplate() {
KafkaTemplate<String, Pdid> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setDefaultTopic(this.defaultTopicName);
return kafkaTemplate;
}
@PostConstruct
public void postConstruct() {
LOGGER.info("Kafka producer configuration: " + this.producerConfigs().toString());
LOGGER.info("Kafka topic name: " + this.defaultTopicName);
}
}
当我启动应用程序时,我收到:
2018-05-01 17:15:41.355 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2018-05-01 17:15:41.356 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
然后,我发送一个有效负载。它显示在 Kafka 工具中针对该主题。但是,在尝试摄取数据时,在 Kafka 端的日志中,我收到:
[KafkaApi-131] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=profiles-pdid,partitions=[{partition=0,fetch_offset=7,max_bytes=1048576}]}]}java.lang.IllegalArgumentException: Magic v0 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$$anonfun$apply.apply(KafkaApis.scala:523)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$$anonfun$apply.apply(KafkaApis.scala:521)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData.apply(KafkaApis.scala:521)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData.apply(KafkaApis.scala:511)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData(KafkaApis.scala:511)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse.apply(KafkaApis.scala:559)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse.apply(KafkaApis.scala:558)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse(KafkaApis.scala:558)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback.apply$mcVI$sp(KafkaApis.scala:579)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback(KafkaApis.scala:578)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback.apply$mcVI$sp(KafkaApis.scala:598)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback(KafkaApis.scala:597)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest.apply(KafkaApis.scala:614)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest.apply(KafkaApis.scala:614)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
at java.lang.Thread.run(Thread.java:748)
我已经从 Producer 应用程序端尝试了以下操作:
- 正在降级到 Spring Kafka 2.0.4。我希望降到 0.11.0 的 Kafka 版本可以帮助解决这个问题,但是没有效果。
- 验证节点都是相同的版本。据我的管理员说,他们是。
- 已与我的管理员确认我们没有混合安装。再一次,我被告知我们没有。
- 基于类似的 Stack Overflow 问题,我回到 2.1.5 并尝试将 JsonSerializer.ADD_TYPE_INFO_HEADERS 设为 false。我想也许它会删除日志所指的 headers 。再次失败,记录了同样的错误。
我希望我遗漏了一些明显的东西。是否还有其他 headers 我需要转 on/off 来帮助解决任何人都知道的 Magic v0 问题?
我们有其他应用程序可以成功写入同一环境中的其他主题,但它们是较旧的应用程序,它们是 hand-crafting 必要的 Spring bean。此外,这些应用程序还使用更旧的客户端 (0.8.2.2),并且它们使用 StringSerializer 作为 Producer 值而不是 JSON。我需要我的数据在 JSON 中,当我们在应该支持 0.11.x.
的系统上时,我真的不想降级到 0.8.2.2
but they are older applications that are hand-crafting the necessary Spring beans.
at org.apache.kafka.common.record.FileRecords. downConvert (FileRecords.java:245)
我不熟悉 kafka broker 的内部结构,但它“听起来”像是主题是用一个旧的 broker 创建的,它们的格式不支持 headers,而不是 broker 版本本身(提示: downConvert).
你用干净的经纪人试过这个吗?
1.0.x 客户端可以与较旧的经纪人(回到 0.10.2.x IIRC)交谈,只要您不尝试使用经纪人不支持的功能。您的代理是 0.11(确实支持 headers)这一事实进一步表明问题出在主题记录格式上。
我已经成功测试了 up/down broker/client/topic 个版本,只要您使用通用功能子集即可。
JsonSerializer.ADD_TYPE_INFO_HEADERS to false.
这应该可以防止框架设置任何 headers;您需要显示您的生产者代码(以及所有配置)。
您还可以在生产者配置中添加一个 ProducerInterceptor
并检查 onSend()
方法中的 ProducerRecord
headers
属性,以找出是什么在输出消息中设置 headers。
如果您正在使用 spring-messaging 消息(template.setn(Message<?> m)
、headers 将默认映射)。使用原始 template.send()
方法不会设置任何 headers(除非您发送 ProducerRecord
并填充 headers。
问题的解决方案是两件事的结合:
- 我需要按照 Gary Russell 的建议添加
JsonSerializer.ADD_TYPE_INFO_HEADERS to false
。
- 在将配置放入我的应用程序之前,我需要刷新已放入主题的所有记录。之前的记录有 headers,它正在破坏 Flume 消费者。
我已经将我的应用程序升级到 Spring boot 2x,但我遇到了一些与 kafka 客户端依赖项的兼容性问题(参见 Spring-boot and Spring-Kafka compatibility matrix),所以我也必须升级它。另一方面,我在服务器上有一个较旧的代理(kafka 0.10)运行,然后我无法向它发送消息。我还意识到,即使将 JsonSerializer.ADD_TYPE_INFO_HEADERS
设置为 false,kafka 生产者也在内部设置 headers,并且由于魔术是根据 kafka 的版本(在 RecordBatch
中)固定的,所以有在这种情况下没有办法不陷入 MemoryRecordsBuilder.appendWithOffset
的条件:if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0) throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
。
最后,我解决这个问题的唯一方法是升级我的 kafka 服务器。
我是 运行 一个 Spring 引导应用程序,并且已经进入 compile('org.springframework.kafka:spring-kafka:2.1.5.RELEASE')
我正在尝试使用此版本反对 Cloudera 安装:
Cloudera Distribution of Apache Kafka Version 3.0.0-1.3.0.0.p0.40 Version 0.11.0+kafka3.0.0+50
我的 KafkaProducerConfig class 非常简单:
@Configuration
public class KafkaProducerConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerConfig.class);
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.template.default-topic}")
private String defaultTopicName;
@Value("${spring.kafka.producer.compression-type}")
private String producerCompressionType;
@Value("${spring.kafka.producer.client-id}")
private String producerClientId;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, this.producerCompressionType);
props.put(ProducerConfig.CLIENT_ID_CONFIG, this.producerClientId);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
return props;
}
@Bean
public ProducerFactory<String, Pdid> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Pdid> kafkaTemplate() {
KafkaTemplate<String, Pdid> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setDefaultTopic(this.defaultTopicName);
return kafkaTemplate;
}
@PostConstruct
public void postConstruct() {
LOGGER.info("Kafka producer configuration: " + this.producerConfigs().toString());
LOGGER.info("Kafka topic name: " + this.defaultTopicName);
}
}
当我启动应用程序时,我收到:
2018-05-01 17:15:41.355 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.1
2018-05-01 17:15:41.356 INFO 54674 --- [nio-9000-exec-2] o.a.kafka.common.utils.AppInfoParser : Kafka commitId : c0518aa65f25317e
然后,我发送一个有效负载。它显示在 Kafka 工具中针对该主题。但是,在尝试摄取数据时,在 Kafka 端的日志中,我收到:
[KafkaApi-131] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,topics=[{topic=profiles-pdid,partitions=[{partition=0,fetch_offset=7,max_bytes=1048576}]}]}java.lang.IllegalArgumentException: Magic v0 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$$anonfun$apply.apply(KafkaApis.scala:523)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$$anonfun$apply.apply(KafkaApis.scala:521)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData.apply(KafkaApis.scala:521)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData.apply(KafkaApis.scala:511)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData(KafkaApis.scala:511)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse.apply(KafkaApis.scala:559)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse.apply(KafkaApis.scala:558)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$createResponse(KafkaApis.scala:558)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$fetchResponseCallback.apply$mcVI$sp(KafkaApis.scala:579)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.KafkaApis.sendResponseMaybeThrottle(KafkaApis.scala:2014)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$fetchResponseCallback(KafkaApis.scala:578)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$processResponseCallback.apply$mcVI$sp(KafkaApis.scala:598)
at kafka.server.ClientQuotaManager.recordAndThrottleOnQuotaViolation(ClientQuotaManager.scala:196)
at kafka.server.ClientQuotaManager.recordAndMaybeThrottle(ClientQuotaManager.scala:188)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$processResponseCallback(KafkaApis.scala:597)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest.apply(KafkaApis.scala:614)
at kafka.server.KafkaApis$$anonfun$handleFetchRequest.apply(KafkaApis.scala:614)
at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:639)
at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:606)
at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
at java.lang.Thread.run(Thread.java:748)
我已经从 Producer 应用程序端尝试了以下操作:
- 正在降级到 Spring Kafka 2.0.4。我希望降到 0.11.0 的 Kafka 版本可以帮助解决这个问题,但是没有效果。
- 验证节点都是相同的版本。据我的管理员说,他们是。
- 已与我的管理员确认我们没有混合安装。再一次,我被告知我们没有。
- 基于类似的 Stack Overflow 问题,我回到 2.1.5 并尝试将 JsonSerializer.ADD_TYPE_INFO_HEADERS 设为 false。我想也许它会删除日志所指的 headers 。再次失败,记录了同样的错误。
我希望我遗漏了一些明显的东西。是否还有其他 headers 我需要转 on/off 来帮助解决任何人都知道的 Magic v0 问题?
我们有其他应用程序可以成功写入同一环境中的其他主题,但它们是较旧的应用程序,它们是 hand-crafting 必要的 Spring bean。此外,这些应用程序还使用更旧的客户端 (0.8.2.2),并且它们使用 StringSerializer 作为 Producer 值而不是 JSON。我需要我的数据在 JSON 中,当我们在应该支持 0.11.x.
的系统上时,我真的不想降级到 0.8.2.2but they are older applications that are hand-crafting the necessary Spring beans.
at org.apache.kafka.common.record.FileRecords. downConvert (FileRecords.java:245)
我不熟悉 kafka broker 的内部结构,但它“听起来”像是主题是用一个旧的 broker 创建的,它们的格式不支持 headers,而不是 broker 版本本身(提示: downConvert).
你用干净的经纪人试过这个吗?
1.0.x 客户端可以与较旧的经纪人(回到 0.10.2.x IIRC)交谈,只要您不尝试使用经纪人不支持的功能。您的代理是 0.11(确实支持 headers)这一事实进一步表明问题出在主题记录格式上。
我已经成功测试了 up/down broker/client/topic 个版本,只要您使用通用功能子集即可。
JsonSerializer.ADD_TYPE_INFO_HEADERS to false.
这应该可以防止框架设置任何 headers;您需要显示您的生产者代码(以及所有配置)。
您还可以在生产者配置中添加一个 ProducerInterceptor
并检查 onSend()
方法中的 ProducerRecord
headers
属性,以找出是什么在输出消息中设置 headers。
如果您正在使用 spring-messaging 消息(template.setn(Message<?> m)
、headers 将默认映射)。使用原始 template.send()
方法不会设置任何 headers(除非您发送 ProducerRecord
并填充 headers。
问题的解决方案是两件事的结合:
- 我需要按照 Gary Russell 的建议添加
JsonSerializer.ADD_TYPE_INFO_HEADERS to false
。 - 在将配置放入我的应用程序之前,我需要刷新已放入主题的所有记录。之前的记录有 headers,它正在破坏 Flume 消费者。
我已经将我的应用程序升级到 Spring boot 2x,但我遇到了一些与 kafka 客户端依赖项的兼容性问题(参见 Spring-boot and Spring-Kafka compatibility matrix),所以我也必须升级它。另一方面,我在服务器上有一个较旧的代理(kafka 0.10)运行,然后我无法向它发送消息。我还意识到,即使将 JsonSerializer.ADD_TYPE_INFO_HEADERS
设置为 false,kafka 生产者也在内部设置 headers,并且由于魔术是根据 kafka 的版本(在 RecordBatch
中)固定的,所以有在这种情况下没有办法不陷入 MemoryRecordsBuilder.appendWithOffset
的条件:if (magic < RecordBatch.MAGIC_VALUE_V2 && headers != null && headers.length > 0) throw new IllegalArgumentException("Magic v" + magic + " does not support record headers");
。
最后,我解决这个问题的唯一方法是升级我的 kafka 服务器。