spring cloud dataflow kafka binder 发送时出错 - Magic v1 不支持记录头

spring cloud dataflow kafka binder error on sending - Magic v1 does not support record headers

尝试让 spring 云数据流在 Kubernetes 中工作,当 Source 尝试向 kafka 发送消息时出现以下错误。当我使用 Rabbit MQ.Kafka 服务器版本 Kafka 2.1.0 时,相同的代码工作正常。我在其他帖子中读到,这可能是由于 kafka 版本与与较低 kafka 版本捆绑的客户端不兼容。想知道我怎样才能让它工作。

用于向 KAFKA 频道发送消息的代码

   public  void sendToChannelNew(List<String> list,String name)  {        
        MessageBuilder builder=MessageBuilder.withPayload(list.toString());
        builder=builder.setHeader("SOURCE",name);
        source.output().send(builder.build());
    }

POM

 <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
    </dependency>

    <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>2.1.4.RELEASE</version>
    </dependency>

堆栈跟踪

KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5a4a522a]; nested exception is java.lang.IllegalArgumentException: Magic v1 does not support record headers, failedMessage=GenericMessage [payload=byte[271962], headers={SOURCE=CLIENT
entType=application/json, timestamp=1574199924374}]' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5
ArgumentException: Magic v1 does not support record headers, failedMessage=GenericMessage [payload=byte[271962], headers={SOURCE=ACCOUNT_CLIENT, id=2dcee73c-9da0-72fe-d5f7-07a2dba5b099, contentType=application/json, timestamp=1574199924374}]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5a4a522a]; nested exception is java.lang.Il
rt record headers
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1077)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at com.jpmorgan.awm.wm.edpipoc.EdpiSource.sendToChannelNew(EdpiSource.java:73)
        at com.jpmorgan.awm.wm.edpipoc.EdpiSource.lambda$null(EdpiSource.java:134)
        at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73)
        at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32)
        at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:225)
        at reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:47)
        at reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:52)
        at reactor.core.publisher.Mono.subscribe(Mono.java:3694)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:442)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:244)
        at reactor.core.publisher.FluxBuffer$BufferExactSubscriber.onComplete(FluxBuffer.java:183)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:248)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:144)
        at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:122)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:262)
        at reactor.core.publisher.DrainUtils.postCompleteDrain(DrainUtils.java:131)
        at reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:186)
        at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onComplete(FluxBufferPredicate.java:285)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:325)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:638)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onComplete(FluxFlattenIterable.java:259)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
        at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:372)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:196)
        at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:337)
        at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:334)
        at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:381)
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:522)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1436)
        at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1203)
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1247)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
        at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:905)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Magic v1 does not support record headers
        at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:410)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:449)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:506)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:529)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.tryAppend(ProducerBatch.java:107)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:223)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:864)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:372)
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:209)
        at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)

问题是使用的 kafka broker 是旧的 (0.10.0) 不支持 headers。切换到更高版本并解决了这个问题