如何以及在何处使用 spring-integration 为 jms 或 activemq 调用 HeaderValueRouter 和 HeaderEnricher

How and where to invoke HeaderValueRouter and HeaderEnricher for jms or activemq with spring-integration

在我的项目中,我们使用spring-integration来实现队列。 Header-Value-Router 的配置在 xml 中,如下所示:

<header-value-router input-channel="jmsUVMasterMessageChannel"
    header-name="METADATA_TYPE" default-output-channel="nullChannel"
    resolution-required="false">
    <mapping value="ASSEMBLY" channel="downloadImageToS3Channel" />
    <mapping value="THIRDPARTY" channel="invokeUVMasterAPIChannel" />
</header-value-router>

<header-enricher input-channel="JuiceVendorMessageChannel"
    output-channel="JuiceHeaderEnricherChannel">
    <header name="sourceName" value="JUICE" />
</header-enricher>

这是网关配置:

我想知道如何在 java 代码中实现这个配置?我搜索了 google 并找到了以下代码:

@Bean
public HeaderValueRouter headerRouter(String gatewayPrefix) {
  HeaderValueRouter router = new HeaderValueRouter("METADATA_TYPE");
  router.setChannelMapping("ASSEMBLY", "downloadImageToS3Channel");
  router.setChannelMapping("THIRDPARTY", "invokeUVMasterAPIChannel");
  router.setDefaultOutputChannel(new NullChannel());
  router.setResolutionRequired(false);
  return router;
}

@Bean
@Transformer(inputChannel="sdiVenderMessageChannel", outputChannel="sdiHeaderEnricherChannel")
public HeaderEnricher enrichHeaders() {
    Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<String, HeaderValueMessageProcessor<?>>();
    // TODO 
    //headersToAdd.put("sourceName",new StaticHeaderValueMessageProcessor<String>("SDI"));
    headersToAdd.put("sourceName", null);
    HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
    if(logger.isDebugEnabled()){
        logger.debug("HeaderEnricher bean initial!");
    }
    return enricher;
}

但这是我的问题:

1) 如何在 java 配置中配置 'input-channel'?

2) 我应该在哪里调用这个函数?当我启动应用程序时,代码不会 运行 进入此功能。我知道 headerValueRouter 用于将消息从输入通道调度到输出通道,但我可以调用此路由器的确切位置是什么?在 ConnectionFactory 或 ListenerContainer 中?

这是我定义的网关:

public void registerDynamicInboundGateway(DefaultListableBeanFactory beanFactory) {
    this.beanFactory = beanFactory;

    BeanDefinitionBuilder builder = BeanDefinitionBuilder
            .genericBeanDefinition("org.springframework.integration.jms.JmsMessageDrivenEndpoint");
    String jndiFactoryName = this.registerJndiObjectFactoryBean(gatewayPrefix, jmsQueueValue);
    this.destination = jndiFactoryName;
    String containerBeanName = this.registerMessageListenerContainer();
    String listenerBeanName = this.registerMessageListener();
    builder.addConstructorArgReference(containerBeanName);
    builder.addConstructorArgReference(listenerBeanName);
    setValueIfAttributeDefined(builder, autoStartup, "auto-startup");

    String beanName = null;
    if(gatewayPrefix.equals(HCC_GATEWAY_PREFIX)){
        beanName = gatewayPrefix + DynamicInboundGateway.HCC_GATEWAY_SUFIX;
    } else if (gatewayPrefix.equals(UVMASTER_GATEWAY_PREFIX)) {
        beanName = gatewayPrefix + DynamicInboundGateway.UVMASTER_GATEWAY_PREFIX;
    } else {
        beanName = gatewayPrefix + DynamicInboundGateway.GATEWAY_SUFIX;
    }

    this.beanFactory.registerBeanDefinition(
            beanName, builder.getBeanDefinition());
}

我感到很困惑,没有找到任何示例代码。 非常感谢!

当我根据spring集成网站中的示例代码定义HeaderEnricher时,如下所示:

@Bean
@Transformer(inputChannel="sdiVenderMessageChannel", outputChannel="sdiHeaderEnricherChannel")
public HeaderEnricher enrichHeaders() {
    Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<String, HeaderValueMessageProcessor<?>>();
    // TODO 
    headersToAdd.put("sourceName",new StaticHeaderValueMessageProcessor<String>("SDI"));
    //headersToAdd.put("sourceName", null);
    HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
    if(logger.isDebugEnabled()){
        logger.debug("HeaderEnricher bean initial!");
    }
    return enricher;
}

这发生了一些错误:

StaticHeaderValueMessageProcessor cannot be resolved to a type

但是我已经用 maven 导入了所有需要的包。我不明白为什么。

希望这个简单的 Spring 启动应用程序能让事情变得清晰...

@SpringBootApplication
public class So40585409Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So40585409Application.class, args);
        JmsTemplate template = context.getBean(JmsTemplate.class);
        template.convertAndSend("foo", "message1", m -> {
            m.setStringProperty("foo", "bar");
            return m;
        });
        template.convertAndSend("foo", "message2", m -> {
            m.setStringProperty("foo", "baz");
            return m;
        });
        Thread.sleep(5000);
        context.close();
    }

    @Bean
    public JmsMessageDrivenEndpoint inbound(ConnectionFactory jmsConnectionFactory) {
        return new JmsMessageDrivenEndpoint(container(jmsConnectionFactory), listener());
    }

    @Bean
    public DefaultMessageListenerContainer container(ConnectionFactory jmsConnectionFactory) {
        DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
        container.setConnectionFactory(jmsConnectionFactory);
        container.setDestinationName("foo");
        return container;
    }

    @Bean
    public ChannelPublishingJmsMessageListener listener() {
        ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
        listener.setRequestChannelName("toRouter");
        return listener;
    }

    @Bean
    @Router(inputChannel="toRouter")
    public HeaderValueRouter router() {
        HeaderValueRouter router = new HeaderValueRouter("foo");
        router.setChannelMapping("bar", "barChannel");
        router.setChannelMapping("baz", "bazChannel");
        return router;
    }

    @ServiceActivator(inputChannel = "barChannel")
    public void bar(String in) {
        System.out.println("Received via barChannel: " + in);
    }

    @ServiceActivator(inputChannel = "bazChannel")
    public void baz(String in) {
        System.out.println("Received via bazChannel: " + in);
    }

}

结果:

2016-11-14 09:17:56.798  INFO 65082 --- [           main] com.example.So40585409Application        : Started So40585409Application in 8.842 seconds (JVM running for 10.819)
Received via barChannel: message1
Received via bazChannel: message2
2016-11-14 09:18:02.038  INFO 65082 --- [           main] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@2b546384: startup date [Mon Nov 14 09:17:49 EST 2016]; root of context hierarchy

编辑 - Header Enricher

我刚刚添加了你的 bean,没有任何问题...

    @ServiceActivator(inputChannel = "barChannel2")
    public void bar(String in, @Header("sourceName") String sourceName) {
        System.out.println("Received via barChannel: " + in + " " + sourceName);
    }

    @ServiceActivator(inputChannel = "bazChannel")
    public void baz(String in) {
        System.out.println("Received via bazChannel: " + in);
    }

    @Bean
    @Transformer(inputChannel = "barChannel", outputChannel = "barChannel2")
    public HeaderEnricher enrichHeaders() {
        Map<String, HeaderValueMessageProcessor<?>> headersToAdd = new HashMap<String, HeaderValueMessageProcessor<?>>();
        headersToAdd.put("sourceName", new StaticHeaderValueMessageProcessor<String>("SDI"));
        HeaderEnricher enricher = new HeaderEnricher(headersToAdd);
        return enricher;
    }

结果:

2016-11-15 09:00:48.383  INFO 50137 --- [           main] com.example.So40585409Application        : Started So40585409Application in 1.814 seconds (JVM running for 2.528)
Received via barChannel: message1 SDI
Received via bazChannel: message2
2016-11-15 09:00:53.424  INFO 50137 --- [           main] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@2c34f934: startup date [Tue Nov 15 09:00:46 EST 2016]; root of context hierarchy