删除动态生成的目标中的嵌入 Header
Removing Embedded Header in Dynamically Generated Destinations
我想在使用动态生成的目标时禁用消息中的嵌入 header,类似于 here 中的示例(比如输出主题 = dyntopic1,dyntopic2,...)。
我已经设置了如下属性,但我仍然得到 header,如果我遗漏了什么,有什么建议吗?
spring.cloud.stream.bindings.output.group=test-ogroup
spring.cloud.stream.bindings.output.binder=kafka
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.output.content-type=text/plain
Kafka = v0.10
spring-cloud-dependencies.version = Edgware.M1
Spring Cloud Stream 1.3 或更低版本无法为动态目标设置生产者属性。
如果提前知道属性,可以在属性中设置...
spring.cloud.stream.bindings.dyntopic1.producer.headerMode=raw
该功能已 added to master 并将在 2.0 版本中提供。
编辑
只要不介意使用反射重置标志,Edgware 就可以做到。您必须替换通道解析器 bean。
我用 Edgware.SR1 对此进行了测试 - 你真的不应该再使用 M1,那是一个预发布里程碑。
我不能保证这适用于较新的版本,因为它会扰乱框架内部。
@SpringBootApplication
@EnableBinding(Sink.class)
public class So48543143Application {
public static void main(String[] args) {
SpringApplication.run(So48543143Application.class, args);
}
@Bean
public ApplicationRunner runner(MessageChannel routeChannel) {
return args -> {
routeChannel.send(new GenericMessage<>("foo"));
};
}
@ServiceActivator(inputChannel = "routeChannel")
@Bean
public AbstractMappingMessageRouter router(MyBinderAwareChannelResolver resolver) {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new LiteralExpression("foo"));
router.setDefaultOutputChannelName("default");
router.setChannelResolver(resolver);
return router;
}
@Bean
public MyBinderAwareChannelResolver binderAwareChannelResolver(BindingService bindingService,
AbstractBindingTargetFactory<? extends MessageChannel> bindingTargetFactory,
DynamicDestinationsBindable dynamicDestinationsBindable) {
return new MyBinderAwareChannelResolver(bindingService, bindingTargetFactory, dynamicDestinationsBindable);
}
public static class MyBinderAwareChannelResolver extends BinderAwareChannelResolver {
public MyBinderAwareChannelResolver(BindingService bindingService,
AbstractBindingTargetFactory<? extends MessageChannel> bindingTargetFactory,
DynamicDestinationsBindable dynamicDestinationsBindable) {
super(bindingService, bindingTargetFactory, dynamicDestinationsBindable);
}
@Override
public MessageChannel resolveDestination(String channelName) {
MessageChannel channel = super.resolveDestination(channelName);
DirectFieldAccessor dfa = new DirectFieldAccessor(channel);
AbstractDispatcher dispatcher = (AbstractDispatcher) dfa.getPropertyValue("dispatcher");
dfa = new DirectFieldAccessor(dispatcher);
@SuppressWarnings("unchecked")
Set<MessageHandler> handlers = (Set<MessageHandler>) dfa.getPropertyValue("handlers");
// there should be exactly one handler
MessageHandler handler = handlers.iterator().next();
dfa = new DirectFieldAccessor(handler);
dfa.setPropertyValue("embedHeaders", false);
return channel;
}
}
}
我想在使用动态生成的目标时禁用消息中的嵌入 header,类似于 here 中的示例(比如输出主题 = dyntopic1,dyntopic2,...)。
我已经设置了如下属性,但我仍然得到 header,如果我遗漏了什么,有什么建议吗?
spring.cloud.stream.bindings.output.group=test-ogroup
spring.cloud.stream.bindings.output.binder=kafka
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.output.content-type=text/plain
Kafka = v0.10
spring-cloud-dependencies.version = Edgware.M1
Spring Cloud Stream 1.3 或更低版本无法为动态目标设置生产者属性。
如果提前知道属性,可以在属性中设置...
spring.cloud.stream.bindings.dyntopic1.producer.headerMode=raw
该功能已 added to master 并将在 2.0 版本中提供。
编辑
只要不介意使用反射重置标志,Edgware 就可以做到。您必须替换通道解析器 bean。
我用 Edgware.SR1 对此进行了测试 - 你真的不应该再使用 M1,那是一个预发布里程碑。
我不能保证这适用于较新的版本,因为它会扰乱框架内部。
@SpringBootApplication
@EnableBinding(Sink.class)
public class So48543143Application {
public static void main(String[] args) {
SpringApplication.run(So48543143Application.class, args);
}
@Bean
public ApplicationRunner runner(MessageChannel routeChannel) {
return args -> {
routeChannel.send(new GenericMessage<>("foo"));
};
}
@ServiceActivator(inputChannel = "routeChannel")
@Bean
public AbstractMappingMessageRouter router(MyBinderAwareChannelResolver resolver) {
ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new LiteralExpression("foo"));
router.setDefaultOutputChannelName("default");
router.setChannelResolver(resolver);
return router;
}
@Bean
public MyBinderAwareChannelResolver binderAwareChannelResolver(BindingService bindingService,
AbstractBindingTargetFactory<? extends MessageChannel> bindingTargetFactory,
DynamicDestinationsBindable dynamicDestinationsBindable) {
return new MyBinderAwareChannelResolver(bindingService, bindingTargetFactory, dynamicDestinationsBindable);
}
public static class MyBinderAwareChannelResolver extends BinderAwareChannelResolver {
public MyBinderAwareChannelResolver(BindingService bindingService,
AbstractBindingTargetFactory<? extends MessageChannel> bindingTargetFactory,
DynamicDestinationsBindable dynamicDestinationsBindable) {
super(bindingService, bindingTargetFactory, dynamicDestinationsBindable);
}
@Override
public MessageChannel resolveDestination(String channelName) {
MessageChannel channel = super.resolveDestination(channelName);
DirectFieldAccessor dfa = new DirectFieldAccessor(channel);
AbstractDispatcher dispatcher = (AbstractDispatcher) dfa.getPropertyValue("dispatcher");
dfa = new DirectFieldAccessor(dispatcher);
@SuppressWarnings("unchecked")
Set<MessageHandler> handlers = (Set<MessageHandler>) dfa.getPropertyValue("handlers");
// there should be exactly one handler
MessageHandler handler = handlers.iterator().next();
dfa = new DirectFieldAccessor(handler);
dfa.setPropertyValue("embedHeaders", false);
return channel;
}
}
}