ReplyingKafkaTemplate - sendAndReceive序列化问题

ReplyingKafkaTemplate - sendAndReceive serialisation issue

我正在尝试使用 ReplyingKafkaTemplatesendAndReceive message/response。

在执行的服务中,我使用 ProducerFactoryConcurrentKafkaListenerContainerFactory 的默认实例来创建 ReplyingKafkaTemplate 的新实例,如下所示:

@Service
public class MyService {

  private final ReplyingKafkaTemplate<String, MyCommand, MyResult> kafkaTemplate;

  public MyService(ProducerFactory<String, MyCommand> pf,
                    ConcurrentKafkaListenerContainerFactory<String, MyResult> containerFactory) {
    var container = containerFactory.createContainer("results topic name");
    this.kafkaTemplate = new ReplyingKafkaTemplate<>(pf, container);
    kafkaTemplate.start();
  }

   //... other methods omitted as irrelevant...

  private Mono<MyResult> sendMessageAndWaitForResult(ProducerRecord<String, MyCommand> producerRecord) {
    var requestReplyFuture = kafkaTemplate.sendAndReceive(producerRecord);
    return Mono
      .fromFuture(requestReplyFuture.completable())
      .map(ConsumerRecord::value);
  }

  private ProducerRecord<String, MyCommand> createProducerRecord(Iterable<RecordHeader> recordHeaders, MyCommand myCommand) {
        return new ProducerRecord(
                "commands topic name",
                null,
                "some string id",
                myCommand,
                recordHeaders);
    }
}

MyCommandMyResult 是在 Lombok 的帮助下定义的 POJO。

我得到的错误是:

KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class MyCommand to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer.

我知道我没有定义序列化程序,但通过 Kafka 中的示例,他们只使用 ReplyingKafkaTemplate<String, String, String> 示例,当然他们不需要序列化程序设置。但是我做了但找不到在哪里以及如何做?

谢谢!

编辑 1:多亏了 Garry Russell 的提示,我才开始为反序列化器创建一个 Bean,但不知何故,在遇到一些错误后,我设法让它以这种方式工作:

public DefaultKafkaProducerFactory<String, MyCommand> producerFactory(
  KafkaProperties properties,
  AvroConverter avroConverter) {
        Map<String, Object> props = properties
  .buildProducerProperties();
        DefaultKafkaProducerFactory pf = 
  new DefaultKafkaProducerFactory(
    props,                                                                 
    new StringSerializer(),
    avroConverter.valueSerde().serializer());
        return pf;
}

上面应该是一个@Bean但是当我这样做的时候,我得到了下面评论中提到的错误,一些bean的依赖关系形成了一个周期。

所以这个producerFactory我在创建的时候就通过了ReplyingKafkaTemplate

令人费解的是,我不必创建相同的东西来反序列化 MyResponse?

AvroConverter 是 class 包装 SpecificAvroSerde.

的创建

我假设您正在使用 Spring Boot;它默认配置 StringSerializer

KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.SerializationException: Can't convert value of class MyCommand to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer.

由于您发送的是 POJO,而不是字符串,因此您必须使用与 StringSerializer 不同的 Serializer,例如 JsonSerializer.

Serialization, Deserialization, and Message Conversion

使用 Spring 引导,您可以将序列化程序指定为 属性。

https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html#application-properties.integration.spring.kafka.producer.value-serializer

spring:
  kafka:
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

另请参阅示例

https://github.com/spring-projects/spring-kafka/tree/main/samples