Removing Embedded Header in Dynamically Generated Destinations

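I would like to disable the embedded headers in messages when using dynamically generated destinations, similar to the example here (e.g. output topics = dyntopic1, dyntopic2, ...).

I have set the properties below, but I still get the headers. Any suggestions on what I am missing?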

spring.cloud.stream.bindings.output.group=test-ogroup
spring.cloud.stream.bindings.output.binder=kafka
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.output.content-type=text/plain

Kafka = v0.10

spring-cloud-dependencies.version = Edgware.M1

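With Spring Cloud Stream 1.3 or lower, it is not possible to set producer properties for dynamic destinations.

If you know the destination names in advance, you can set the properties for each one explicitly...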

spring.cloud.stream.bindings.dyntopic1.producer.headerMode=raw
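
If the list of known destinations is long, one option is to generate these per-binding properties in code rather than typing each line. The following is only a sketch: the class name `RawHeaderProps`, the helper `rawHeaderProps`, and the second topic name are illustrative assumptions; only the property key pattern comes from the line above.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RawHeaderProps {

    // Hypothetical helper: builds one "producer.headerMode=raw" entry
    // per destination name that is known ahead of time.
    static Map<String, Object> rawHeaderProps(List<String> destinations) {
        Map<String, Object> props = new LinkedHashMap<>();
        for (String destination : destinations) {
            props.put("spring.cloud.stream.bindings." + destination + ".producer.headerMode", "raw");
        }
        return props;
    }

    public static void main(String[] args) {
        // Print the entries in .properties form for the assumed topic names.
        rawHeaderProps(Arrays.asList("dyntopic1", "dyntopic2"))
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting map could, for example, be handed to `SpringApplication.setDefaultProperties(...)` before the application is run; I have not tested that wiring here, so treat it as a suggestion rather than a verified recipe.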

Support for setting producer properties on dynamic destinations has been added to master and will be available in 2.0.

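EDIT

You can do it with Edgware, as long as you don't mind using reflection to reset the flag. You have to replace the channel resolver bean.

I tested this with Edgware.SR1 - you really shouldn't be using M1 any more; it was a pre-release milestone.

I can't guarantee this will work with newer versions, since it messes with framework internals.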

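import java.util.Set;

import org.springframework.beans.DirectFieldAccessor;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.binding.AbstractBindingTargetFactory;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.cloud.stream.binding.BindingService;
import org.springframework.cloud.stream.binding.DynamicDestinationsBindable;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.expression.common.LiteralExpression;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.dispatcher.AbstractDispatcher;
import org.springframework.integration.router.AbstractMappingMessageRouter;
import org.springframework.integration.router.ExpressionEvaluatingRouter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.support.GenericMessage;

@SpringBootApplication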
@EnableBinding(Sink.class)
public class So48543143Application {

    public static void main(String[] args) {
        SpringApplication.run(So48543143Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(MessageChannel routeChannel) {
        return args -> {
            routeChannel.send(new GenericMessage<>("foo"));
        };
    }

    @ServiceActivator(inputChannel = "routeChannel")
    @Bean
    public AbstractMappingMessageRouter router(MyBinderAwareChannelResolver resolver) {
        ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new LiteralExpression("foo"));
        router.setDefaultOutputChannelName("default");
        router.setChannelResolver(resolver);
        return router;
    }

    @Bean
    public MyBinderAwareChannelResolver binderAwareChannelResolver(BindingService bindingService,
            AbstractBindingTargetFactory<? extends MessageChannel> bindingTargetFactory,
            DynamicDestinationsBindable dynamicDestinationsBindable) {
        return new MyBinderAwareChannelResolver(bindingService, bindingTargetFactory, dynamicDestinationsBindable);
    }

    public static class MyBinderAwareChannelResolver extends BinderAwareChannelResolver {

        public MyBinderAwareChannelResolver(BindingService bindingService,
                AbstractBindingTargetFactory<? extends MessageChannel> bindingTargetFactory,
                DynamicDestinationsBindable dynamicDestinationsBindable) {
            super(bindingService, bindingTargetFactory, dynamicDestinationsBindable);
        }

        @Override
        public MessageChannel resolveDestination(String channelName) {
            // Let the framework create and bind the dynamic destination as usual.
            MessageChannel channel = super.resolveDestination(channelName);
            // Reach into the channel's private dispatcher via reflection.
            DirectFieldAccessor dfa = new DirectFieldAccessor(channel);
            AbstractDispatcher dispatcher = (AbstractDispatcher) dfa.getPropertyValue("dispatcher");
            // The dispatcher holds the handlers subscribed to this channel.
            dfa = new DirectFieldAccessor(dispatcher);
            @SuppressWarnings("unchecked")
            Set<MessageHandler> handlers = (Set<MessageHandler>) dfa.getPropertyValue("handlers");
            // there should be exactly one handler
            MessageHandler handler = handlers.iterator().next();
            // Reset the handler's private embedHeaders flag so headers are no longer embedded.
            dfa = new DirectFieldAccessor(handler);
            dfa.setPropertyValue("embedHeaders", false);
            return channel;
        }

    }

}
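
To make the reflection step easier to follow in isolation, here is a minimal sketch of the same idea using only the JDK. `FakeHandler` is a hypothetical stand-in for the binder's internal handler (the real class and its field layout are framework internals that I am not reproducing here), and `setBooleanField` is an illustrative helper that does roughly what `DirectFieldAccessor.setPropertyValue` does for us above.

```java
import java.lang.reflect.Field;

public class ResetFlagDemo {

    // Hypothetical stand-in for the framework's internal sending handler.
    static class FakeHandler {
        private boolean embedHeaders;

        FakeHandler(boolean embedHeaders) {
            this.embedHeaders = embedHeaders;
        }

        boolean isEmbedHeaders() {
            return this.embedHeaders;
        }
    }

    // Overwrites a private boolean field by name, bypassing normal access checks.
    static void setBooleanField(Object target, String fieldName, boolean value) {
        try {
            Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            field.setBoolean(target, value);
        } catch (ReflectiveOperationException e) {
            // A renamed or removed field is exactly how this trick breaks on upgrade.
            throw new IllegalStateException("Cannot reset field " + fieldName, e);
        }
    }

    public static void main(String[] args) {
        FakeHandler handler = new FakeHandler(true);
        setBooleanField(handler, "embedHeaders", false);
        System.out.println(handler.isEmbedHeaders()); // prints false
    }
}
```

The catch block shows why this approach is fragile: if a later release renames or removes the field, the lookup fails at runtime rather than at compile time.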