Struggling with Spring Integration DSL

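I'm trying to set up a small Spring Boot application with Spring Integration. All it needs to do is pull a message off a JMS queue, unmarshal the request back into an object, and route it to a specific bean for persistence. I have tested the routing part and can confirm that it works.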

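In my test I have an embedded ActiveMQ broker, and I can send a message through Spring's JmsTemplate, but the flow doesn't seem to unmarshal the XML payload and route the message. This is what I see in the logs: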

16:42:09.285 [main] INFO  c.m.z.v.o.VitelAsyncPersisterApplicationTests - Sending message
16:42:09.289 [main] DEBUG o.s.j.c.CachingConnectionFactory - Registering cached JMS Session for mode 1: ActiveMQSession {id=ID:theblacklodge-59640-1460558526948-4:1:2,started=false} java.lang.Object@52d97ab6
16:42:09.289 [main] DEBUG o.s.j.c.JmsTemplate - Executing callback on JMS Session: Cached JMS Session: ActiveMQSession {id=ID:theblacklodge-59640-1460558526948-4:1:2,started=false} java.lang.Object@52d97ab6
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:59466@61616] DEBUG o.a.a.b.TransportConnector - Publishing: tcp://localhost:61616 for broker transport URI: tcp://localhost:61616
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:59466@61616] DEBUG o.a.a.b.TransportConnector - Publishing: tcp://localhost:61616 for broker transport URI: tcp://localhost:61616
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:59466@61616] DEBUG o.a.a.b.r.AbstractRegion - test_broker adding destination: topic://ActiveMQ.Advisory.Producer.Queue.jms/test
16:42:09.298 [main] DEBUG o.s.j.c.CachingConnectionFactory - Registering cached JMS MessageProducer for destination [queue://jms/test]: ActiveMQMessageProducer { value=ID:theblacklodge-59640-1460558526948-4:1:2:1 }
16:42:09.301 [main] DEBUG o.s.j.c.JmsTemplate - Sending created message: ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = <?xml version="1.0" encoding="UTF-8" standalo...uteLogEvent>}
16:42:09.305 [ActiveMQ Transport: tcp:///127.0.0.1:59466@61616] DEBUG o.a.a.b.r.Queue - test_broker Message ID:theblacklodge-59640-1460558526948-4:1:2:1:1 sent to queue://jms/test
16:42:09.306 [ActiveMQ BrokerService[test_broker] Task-2] DEBUG o.a.a.b.r.Queue - queue://jms/test, subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 1, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1, dequeueCount: 0, memUsage:1332
16:42:09.314 [main] INFO  c.m.z.v.o.VitelAsyncPersisterApplicationTests - Message sent

Spring Integration logging:

13:43:48.996 [main] INFO  o.s.i.j.JmsMessageDrivenEndpoint - started org.springframework.integration.jms.JmsMessageDrivenEndpoint@71904469
13:43:48.996 [main] INFO  o.s.i.d.j.JmsInboundGateway - started org.springframework.integration.dsl.jms.JmsInboundGateway#0
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.dsl.jms.JmsInboundGateway#0'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO  o.s.i.e.EventDrivenConsumer - Adding {xml:unmarshalling-transformer} as a subscriber to the 'buildReceiverFlow.channel#0' channel
13:43:48.996 [main] INFO  o.s.i.c.DirectChannel - Channel 'application:test:-1.buildReceiverFlow.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO  o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO  o.s.i.e.EventDrivenConsumer - Adding {router} as a subscriber to the 'msg.router' channel
13:43:48.996 [main] INFO  o.s.i.c.DirectChannel - Channel 'application:test:-1.msg.router' has 1 subscriber(s).
13:43:48.996 [main] INFO  o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO  o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#0.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO  o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#2
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO  o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#1.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO  o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#3
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO  o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#2.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO  o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#4
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO  o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#3.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO  o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#5
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5'
13:43:48.996 [main] INFO  o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 2147483647
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.jms.config.internalJmsListenerEndpointRegistry' of type [class org.springframework.jms.config.JmsListenerEndpointRegistry]
13:43:48.997 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.jms.config.internalJmsListenerEndpointRegistry'
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor'
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.context.annotation.internalScheduledAnnotationProcessor'
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
13:43:49.017 [main] DEBUG o.s.b.a.l.AutoConfigurationReportLoggingInitializer - 

I'm not sure what I have missed or misconfigured:

Test case:

@Test
public void jmsIntegrationTest() {
    RouteLogEvent log = new RouteLogEvent();
    log.setAgentId(8888);
    log.setInteracitonId(95634);
    log.setMax(5);
    log.setQueueTime(1256L);
    log.setRouteTime(96541L);
    log.setScore(8);

    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(500);

    marshaller.marshal(log, new StreamResult(bytesOut));

    final String xmlPayload = new String(bytesOut.toByteArray());

    LOG.info("Sending message");

    jmsTemplate.send(jmsQueue, (s) -> {
        return s.createTextMessage(xmlPayload);
    });

    LOG.info("Message sent");

    List<RouteLogEvent> events = testDao.findAllRouteLogs();
    assertNotNull(events);
    assertFalse(events.isEmpty());

    List<RouteLogEvent> filtered = events.stream().filter(val -> val.getAgentId() == 8888).collect(Collectors.toList());
    assertNotNull(filtered);
    assertFalse(filtered.isEmpty());
}

Spring Integration configuration:

@SpringBootApplication
@EnableIntegration
public class VitelAsyncPersisterApplication {

    private static final Map<Class, String> ROUTING_EVENTS = new HashMap<>();

    private static final String CHANNEL_RECORDING = "channel-recording";
    private static final String CHANNEL_INTERACTION_STATE = "channel-interaction-state";
    private static final String CHANNEL_AGENT_STATE = "channel-agent-state";
    private static final String CHANNEL_ROUTE_LOG = "channel-route";

    static {
        ROUTING_EVENTS.put(AgentStateChangeEvent.class, CHANNEL_AGENT_STATE);
        ROUTING_EVENTS.put(InteractionStateChangeEvent.class, CHANNEL_INTERACTION_STATE);
        ROUTING_EVENTS.put(Recording.class, CHANNEL_RECORDING);
        ROUTING_EVENTS.put(RouteLogEvent.class, CHANNEL_ROUTE_LOG);
    }

    @Value("${jms.queue.entity.persist}")
    private String jmsQueueName;

    @Value("${jms.broker.url}")
    private String jmsBrokerUrl;

    @Autowired
    private EventDao eventDao;

    @Bean
    public Jaxb2Marshaller xmlMarshaller() {
        Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
        marshaller.setSchema(new ClassPathResource("entities.xsd"));
        marshaller.setPackagesToScan("com.mhgad.za.vitel.persister.entities");

        return marshaller;
    }

    @Bean
    public ConnectionFactory jmsConnFactory() {
        ActiveMQConnectionFactory activeMq = new ActiveMQConnectionFactory(jmsBrokerUrl);

        CachingConnectionFactory cachingConnFactory = new CachingConnectionFactory();
        cachingConnFactory.setTargetConnectionFactory(activeMq);

        return cachingConnFactory;
    }

    @Bean
    public IntegrationFlow buildReceiverFlow(ConnectionFactory jmsConnectionFactory, Jaxb2Marshaller marshaller) {
        UnmarshallingTransformer xmlToObjTransformer = Transformers.unmarshaller(marshaller);

        JmsInboundGatewaySpec jmsSpec = Jms.inboundGateway(jmsConnectionFactory).destination(jmsQueueName);

        return IntegrationFlows.from(jmsSpec).transform(xmlToObjTransformer).channel("msg.router").get();
    }

    @Bean
    public IntegrationFlow buildRouterFlow() {

        Function router = (p) -> {
            if (ROUTING_EVENTS.containsKey(p.getClass())) {
                return ROUTING_EVENTS.get(p.getClass());
            } else {
                return null;
            }
        };

        return IntegrationFlows.from("msg.router").route(router, m -> m
                .subFlowMapping(CHANNEL_AGENT_STATE, sf -> sf.handle((p) -> eventDao.save((AgentStateChangeEvent) p.getPayload())))
                .subFlowMapping(CHANNEL_INTERACTION_STATE, sf -> sf.handle((p) -> eventDao.save((InteractionStateChangeEvent) p.getPayload())))
                .subFlowMapping(CHANNEL_RECORDING, sf -> sf.handle((p) -> eventDao.save((Recording) p.getPayload())))
                .subFlowMapping(CHANNEL_ROUTE_LOG, sf -> sf.handle((p) -> eventDao.save((RouteLogEvent) p.getPayload())))).get();
    }

    public static void main(String[] args) {
        SpringApplication.run(VitelAsyncPersisterApplication.class, args);
    }
}

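According to your logs, I really don't see any Spring Integration infrastructure.

So maybe you have just missed @EnableIntegration.

Your test is also a bit odd: it sends a message to JMS and then checks the result in the DB, but we can't see how you start that configuration for the integration.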

Since you are only going to listen for messages and store them in the DB, consider using the one-way JMS component, Jms.messageDrivenChannelAdapter(), instead of the request/reply gateway.
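A minimal sketch of what the one-way variant of the receiver flow might look like. It assumes the Spring Integration Java DSL 1.x package layout (the same org.springframework.integration.dsl.jms package that appears in the logs above) and reuses the bean names, the property key and the "msg.router" channel from the question. The class name ReceiverFlowSketch is hypothetical, and on newer Spring Integration versions the imports would differ.

```java
import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.jms.Jms;
import org.springframework.integration.dsl.support.Transformers;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;

// Hypothetical configuration class, shown only to illustrate the suggestion.
@Configuration
public class ReceiverFlowSketch {

    @Bean
    public IntegrationFlow receiverFlow(
            ConnectionFactory jmsConnFactory,
            Jaxb2Marshaller xmlMarshaller,
            @Value("${jms.queue.entity.persist}") String jmsQueueName) {

        return IntegrationFlows
                // One-way inbound adapter: consumes from the queue and
                // does not wait for or send a JMS reply.
                .from(Jms.messageDrivenChannelAdapter(jmsConnFactory)
                        .destination(jmsQueueName))
                // Turn the XML text payload back into an entity object.
                .transform(Transformers.unmarshaller(xmlMarshaller))
                // Hand over to the existing router flow from the question.
                .channel("msg.router")
                .get();
    }
}
```

Because the adapter never produces a reply, the flow can simply end in the persistence handlers of the router flow; with the inbound gateway, the framework would expect a reply message to send back to the JMS sender.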