如何配置 Spring Cloud StreamBridge 来生成 Avro?

How to Configure Spring Cloud StreamBridge to produce Avro?

所以我正在尝试使用 StreamBridge 将消息动态发送到不同的主题。如果我的输出是 Message,但不是 Message

,那么我这样做就成功了

代码示例:

@StreamListener(Sink.INPUT)
public void process(@Payload GenericRecord messageValue,
                    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) GenericRecord messageKey,
                    @Header("Type") String type) {
    log.info("Processing Event --> " + messageValue);

    // Code...

    // convert to Message<GenericRecord>
    Message<GenericRecord> message = ...

    streamBridge.send(type, message);

    log.info("Processed Event --> " + messageValue);
}

我得到的错误是 Caused by: org.springframework.messaging.converter.MessageConversionException: Could not write JSON: Not a map:,我猜这是因为 streamBridge acceptedOutputTypes = application/json

2020-06-28 04:42:55.670  INFO 54347 --- [container-0-C-1] o.s.c.f.c.c.SimpleFunctionRegistry       : Looking up function 'streamBridge' with acceptedOutputTypes: [application/json]

我尝试通过在我的属性中设置以下内容来将接受的输出类型修改为 avro,但这没有用。

spring.cloud.stream.function.definition=streamBridge
spring.kafka.producer.key-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.bindings.streamBridge-out-0.content-type=application/*+avro
spring.cloud.stream.bindings.streamBridge-out-0.producer.use-native-encoding=true

关于如何将 StreamBridge 配置为 avro 的任何想法?

编辑:我也试过 streamBridge.send(type, message, MimeType.valueOf("application/*+avro")) 但也有转换错误。

您需要 useNativeEncoding 生产者 属性 才能使用自定义序列化程序。

我无法让 StreamBridge 动态工作,所以我改用函数:

@Bean
public Function<Message<GenericRecord>, Message<GenericRecord>> process() {
    return message -> {

        // Code...

        String topic = message.getHeaders().get("type");

        // convert to Message<GenericRecord>
        Message<GenericRecord> message = MessageBuilder...
            .setHeader("spring.cloud.stream.sendto.destination", topic)
            .build();
        

        return outgoingMessage;
    };
}

属性文件是:

spring.cloud.function.definition=process
spring.cloud.stream.bindings.process-in-0.destination=${consumer_topic}
spring.cloud.stream.bindings.process-in-0.group=${spring.application.name}

spring.cloud.stream.bindings.process-out-0.content-type=application/*+avro
spring.cloud.stream.bindings.process-out-0.producer.use-native-encoding=true

编辑:Streambridge 已修复以支持此功能:https://github.com/spring-cloud/spring-cloud-stream/issues/2007