Spring-集成在 Java 配置中

Spring-Integration in Java Config

我必须将使用 Spring 集成在 xml 中的项目转换为 Java 配置,并在下面进行了尝试,但出现异常。

我想要实现的是,创建一个消息侦听器,它侦听队列目标上的消息,将其放入一个通道,该通道由放入下一个通道的 xsl 转换器读取和处理,即再次由 JaxbUnmarshaller 读取和处理以创建 Java 对象并将其放入第三个通道,该通道将由 Java 消费者使用。如果出现错误,消息将被放入错误通道,该通道又是处理它的 Java 消费者。

下面是Spring配置

  <int:channel id="channel_1"/>
  <int:channel id="channel_2"/>
  <int:channel id="channel_3"/>
  <int:channel id="channel_error"/>

  <int-jms:message-driven-channel-adapter connection-factory="factory" destination-name="destName" 
       pub-sub-domain="false" channel="channel_1" error-channel="channel_error"/>

  <int-xml:xslt-transformer input-channel="channel_1" output-channel="channel_2" xsl-resource="classpath:sample.xsl"/>

  <int-xml:unmarshalling-transformer input-channel="channel_2" output-channel="channel_3" unmarshaller="unmarshaller"/>

  <oxm:jaxb2-marshaller id="unmarshaller">
    <oxm:class-to-be-bound name="sample.example.MyData"/>
  </oxm:jaxb2-marshaller>

  <int:outbound-channel-adapter channel="channel_3" ref="consumer" method="handleMessage"/>

  <bean id="consumer" class="sample.example.MyConsumer"></bean>

  <int:outbound-channel-adapter channel="channel_error" ref="consumer" method="handleError"/>

下面是我的 Java 配置

  @Bean
  public MessageChannel channel_1() {
    return new DirectChannel();
  }
  @Bean
  public MessageChannel channel_2() {
    return new DirectChannel();
  }
  @Bean
  public MessageChannel channel_3() {
    return new DirectChannel();
  }
  @Bean
  public MessageChannel channel_error() {
    return new DirectChannel();
  }
  @Bean
  @Autowired
  public DefaultMessageListenerContainer container(String destinationQueueName) {
    DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
    container.setConnectionFactory(factory());
    container.setDestinationName(destinationQueueName);
    container.setPubSubDomain(false);
    container.setMessageListener(messageListener());
    return container;
  }

  @Bean
  public ChannelPublishingJmsMessageListener messageListener(){
    ChannelPublishingJmsMessageListener adapter = new ChannelPublishingJmsMessageListener();
    adapter.setRequestChannel(channel_1());
    adapter.setErrorChannel(channel_error());
    return adapter;
  }

  @Bean
  @Transformer(inputChannel = "channel_1", outputChannel = "channel_2")
  public XsltPayloadTransformer transformer() {
    return new XsltPayloadTransformer(new ClassPathResource("sample.xsl"));
  }

  @Bean
  @Transformer(inputChannel = "channel_2", outputChannel = "channel_3")
  public UnmarshallingTransformer unmarshallingTransformer() {
    return new UnmarshallingTransformer(unmarshaller());
  }

  @Bean
  public Jaxb2Marshaller unmarshaller() {
    Jaxb2Marshaller unmarshaller = new Jaxb2Marshaller();
    unmarshaller.setClassesToBeBound(MyData.class);
    return unmarshaller;
  }

  @Bean
  @ServiceActivator(inputChannel = "channel_3")
  public MessageHandler messageHandler() {
    return new MethodInvokingMessageHandler(consumer(), "handleMessage");
  }

  @Bean
  @ServiceActivator(inputChannel = "channel_error")
  public MessageHandler errorHandler() {
    return new MethodInvokingMessageHandler(consumer(), "handleError");
  }

我得到以下异常,当我提到这个异常时,它说服务激活器配置不正确。我不确定我缺少什么,有人可以帮忙吗。

  org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'org.springframework.web.context.WebApplicationContext:/v1.channel_error'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:281)
    at org.springframework.integration.jms.ChannelPublishingJmsMessageListener$GatewayDelegate.send(ChannelPublishingJmsMessageListener.java:479)
    at org.springframework.integration.jms.ChannelPublishingJmsMessageListener.onMessage(ChannelPublishingJmsMessageListener.java:322)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:674)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:634)
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:605)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:308)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1144)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033)
    at java.lang.Thread.run(Thread.java:745)
  Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:107)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    ... 16 more

启动时登录:

2016-03-30 13:06:46,423 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [servletConfigInitParams] PropertySource with lowest search precedence
2016-03-30 13:06:46,403 DEBUG com.amazonaws.jmx.MBeans.registerMBean[58] - Failed to register mbean com.amazonaws.management:type=AwsSdkMetrics
javax.management.InstanceAlreadyExistsException: com.amazonaws.management:type=AwsSdkMetrics
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
    at com.amazonaws.jmx.MBeans.registerMBean(MBeans.java:52)
    at com.amazonaws.jmx.SdkMBeanRegistrySupport.registerMetricAdminMBean(SdkMBeanRegistrySupport.java:27)
    at com.amazonaws.metrics.AwsSdkMetrics.registerMetricAdminMBean(AwsSdkMetrics.java:355)
    at com.amazonaws.metrics.AwsSdkMetrics.<clinit>(AwsSdkMetrics.java:316)
    at com.amazonaws.AmazonWebServiceClient.requestMetricCollector(AmazonWebServiceClient.java:629)
    at com.amazonaws.AmazonWebServiceClient.isRMCEnabledAtClientOrSdkLevel(AmazonWebServiceClient.java:570)
    at com.amazonaws.AmazonWebServiceClient.isRequestMetricsEnabled(AmazonWebServiceClient.java:562)
    at com.amazonaws.AmazonWebServiceClient.createExecutionContext(AmazonWebServiceClient.java:523)
    at com.amazonaws.services.sqs.AmazonSQSClient.getQueueUrl(AmazonSQSClient.java:525)
    at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.getQueueUrl(AmazonSQSMessagingClientWrapper.java:260)
    at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.getQueueUrl(AmazonSQSMessagingClientWrapper.java:230)
    at com.amazon.sqs.javamessaging.SQSSession.createQueue(SQSSession.java:576)
    at org.springframework.jms.support.destination.DynamicDestinationResolver.resolveQueue(DynamicDestinationResolver.java:84)
    at org.springframework.jms.support.destination.DynamicDestinationResolver.resolveDestinationName(DynamicDestinationResolver.java:58)
    at org.springframework.jms.support.destination.JmsDestinationAccessor.resolveDestinationName(JmsDestinationAccessor.java:98)
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.createListenerConsumer(AbstractPollingMessageListenerContainer.java:204)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.initResourcesIfNecessary(DefaultMessageListenerContainer.java:1167)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1143)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136)
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033)
    at java.lang.Thread.run(Thread.java:745)
2016-03-30 13:06:46,423 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [servletConfigInitParams] PropertySource with lowest search precedence
2016-03-30 13:06:46,427 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [servletContextInitParams] PropertySource with lowest search precedence
2016-03-30 13:06:46,427 DEBUG com.amazonaws.metrics.AwsSdkMetrics.registerMetricAdminMBean[369] - Admin mbean registered under com.amazonaws.management:type=AwsSdkMetrics/4
2016-03-30 13:06:46,427 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [servletContextInitParams] PropertySource with lowest search precedence
2016-03-30 13:06:46,428 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [jndiProperties] PropertySource with lowest search precedence
2016-03-30 13:06:46,428 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [jndiProperties] PropertySource with lowest search precedence
2016-03-30 13:06:46,428 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [systemProperties] PropertySource with lowest search precedence
2016-03-30 13:06:46,428 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [systemProperties] PropertySource with lowest search precedence
2016-03-30 13:06:46,428 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [systemEnvironment] PropertySource with lowest search precedence
2016-03-30 13:06:46,428 DEBUG org.springframework.web.context.support.StandardServletEnvironment.addLast[109] - Adding [systemEnvironment] PropertySource with lowest search precedence
2016-03-30 13:06:46,429 DEBUG org.springframework.web.context.support.StandardServletEnvironment.<init>[126] - Initialized StandardServletEnvironment with PropertySources [servletConfigInitParams,servletContextInitParams,jndiProperties,systemProperties,systemEnvironment]
2016-03-30 13:06:46,429 DEBUG org.springframework.web.context.support.StandardServletEnvironment.<init>[126] - Initialized StandardServletEnvironment with PropertySources [servletConfigInitParams,servletContextInitParams,jndiProperties,systemProperties,systemEnvironment]
2016-03-30 13:06:46,432 DEBUG com.choiceedge.library.common.service.v1.endpoint.AbstractEndpoint$RequestContextAwareFilter.init[177] - Initializing filter 'RequestContextAwareFilter'
2016-03-30 13:06:46,436 DEBUG com.choiceedge.library.common.service.v1.endpoint.AbstractEndpoint$RequestContextAwareFilter.init[202] - Filter 'RequestContextAwareFilter' configured successfully
2016-03-30 13:06:46,437 DEBUG com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials[105] - Loading credentials from EnvironmentVariableCredentialsProvider
2016-03-30 13:06:46,438 DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory.doGetBean[248] - Returning cached instance of singleton bean 'cxf'
2016-03-30 13:06:46,438 DEBUG org.springframework.beans.factory.support.DefaultListableBeanFactory.doGetBean[248] - Returning cached instance of singleton bean 'cxf'

这就是我在 WebApplicationInitializer 中注册 AppConfig 的方式

  @Override
  public void onStartup(final ServletContext servletContext) throws ServletException {

    rootContext = new AnnotationConfigWebApplicationContext();
    rootContext.getEnvironment().setActiveProfiles(System.getenv(ENVIRONMENT));
    LOGGER.info(System.getenv(ENVIRONMENT));
    System.out.println("Starting application in " + System.getenv(ENVIRONMENT));
    rootContext.register(AppConfig.class);
    rootContext.setDisplayName(DISPLAY_NAME);

    // Manage the lifecycle of the root application context
    servletContext.addListener(new ContextLoaderListener(rootContext));
.....

在我的 AppConfig 中我有

@Configuration
@ImportResource("...") - this is for another custom library
@ComponentScan(...)
@EnableIntegration
public class AppConfig {

你的 @Configuration 没有错过 @EnableIntegration 吗?

Starting with version 4.0, the @EnableIntegration annotation has been introduced, to allow the registration of Spring Integration infrastructure beans (see JavaDocs). This annotation is required when only Java & Annotation configuration is used, e.g. with Spring Boot and/or Spring Integration Messaging Annotation support and Spring Integration Java DSL with no XML integration configuration.

http://docs.spring.io/spring-integration/docs/4.3.0.BUILD-SNAPSHOT/reference/html/overview.html#configuration-enable-integration

已解决:解决问题的更改是将通道自动连接到侦听器。并将 PropertyPlaceHolderConfigurer bean 设为静态

@Bean
@Autowired
public ChannelPublishingJmsMessageListener messageListener(MessageChannel channel_1, MessageChannel channel_error){
    ChannelPublishingJmsMessageListener adapter = new ChannelPublishingJmsMessageListener();
     adapter.setRequestChannel(channel_1);
     adapter.setErrorChannel(channel_error);
     return adapter;
}