挣扎于 spring 集成 dsl
struggling with spring integration dsl
我正在尝试设置一个带有 spring 集成的小型 spring 启动应用程序。它需要做的就是从 jms 队列中拉出一条消息,将请求解组回一个对象并路由到一个特定的 bean 以进行持久化。我已经测试了路由部分,我可以确认它确实有效。
我在测试中有一个嵌入式 activemq 代理,我可以通过 spring JmsTemplate 发送消息,但它似乎无法解组 xml 有效负载并路由消息。我可以在日志中看到这个:
16:42:09.285 [main] INFO c.m.z.v.o.VitelAsyncPersisterApplicationTests - Sending message
16:42:09.289 [main] DEBUG o.s.j.c.CachingConnectionFactory - Registering cached JMS Session for mode 1: ActiveMQSession {id=ID:theblacklodge-59640-1460558526948-4:1:2,started=false} java.lang.Object@52d97ab6
16:42:09.289 [main] DEBUG o.s.j.c.JmsTemplate - Executing callback on JMS Session: Cached JMS Session: ActiveMQSession {id=ID:theblacklodge-59640-1460558526948-4:1:2,started=false} java.lang.Object@52d97ab6
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:59466@61616] DEBUG o.a.a.b.TransportConnector - Publishing: tcp://localhost:61616 for broker transport URI: tcp://localhost:61616
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:59466@61616] DEBUG o.a.a.b.TransportConnector - Publishing: tcp://localhost:61616 for broker transport URI: tcp://localhost:61616
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:59466@61616] DEBUG o.a.a.b.r.AbstractRegion - test_broker adding destination: topic://ActiveMQ.Advisory.Producer.Queue.jms/test
16:42:09.298 [main] DEBUG o.s.j.c.CachingConnectionFactory - Registering cached JMS MessageProducer for destination [queue://jms/test]: ActiveMQMessageProducer { value=ID:theblacklodge-59640-1460558526948-4:1:2:1 }
16:42:09.301 [main] DEBUG o.s.j.c.JmsTemplate - Sending created message: ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = <?xml version="1.0" encoding="UTF-8" standalo...uteLogEvent>}
16:42:09.305 [ActiveMQ Transport: tcp:///127.0.0.1:59466@61616] DEBUG o.a.a.b.r.Queue - test_broker Message ID:theblacklodge-59640-1460558526948-4:1:2:1:1 sent to queue://jms/test
16:42:09.306 [ActiveMQ BrokerService[test_broker] Task-2] DEBUG o.a.a.b.r.Queue - queue://jms/test, subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 1, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1, dequeueCount: 0, memUsage:1332
16:42:09.314 [main] INFO c.m.z.v.o.VitelAsyncPersisterApplicationTests - Message sent
Spring 集成日志记录:
13:43:48.996 [main] INFO o.s.i.j.JmsMessageDrivenEndpoint - started org.springframework.integration.jms.JmsMessageDrivenEndpoint@71904469
13:43:48.996 [main] INFO o.s.i.d.j.JmsInboundGateway - started org.springframework.integration.dsl.jms.JmsInboundGateway#0
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.dsl.jms.JmsInboundGateway#0'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - Adding {xml:unmarshalling-transformer} as a subscriber to the 'buildReceiverFlow.channel#0' channel
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildReceiverFlow.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - Adding {router} as a subscriber to the 'msg.router' channel
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.msg.router' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#0.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#2
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#1.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#3
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#2.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#4
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#3.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#5
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5'
13:43:48.996 [main] INFO o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 2147483647
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.jms.config.internalJmsListenerEndpointRegistry' of type [class org.springframework.jms.config.JmsListenerEndpointRegistry]
13:43:48.997 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.jms.config.internalJmsListenerEndpointRegistry'
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor'
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.context.annotation.internalScheduledAnnotationProcessor'
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
13:43:49.017 [main] DEBUG o.s.b.a.l.AutoConfigurationReportLoggingInitializer -
我不确定我遗漏或配置错误的内容:
测试用例:
@Test
public void jmsIntegrationTest() {
RouteLogEvent log = new RouteLogEvent();
log.setAgentId(8888);
log.setInteracitonId(95634);
log.setMax(5);
log.setQueueTime(1256L);
log.setRouteTime(96541L);
log.setScore(8);
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(500);
marshaller.marshal(log, new StreamResult(bytesOut));
final String xmlPayload = new String(bytesOut.toByteArray());
LOG.info("Sending message");
jmsTemplate.send(jmsQueue, (s) -> {
return s.createTextMessage(xmlPayload);
});
LOG.info("Message sent");
List<RouteLogEvent> events = testDao.findAllRouteLogs();
assertNotNull(events);
assertFalse(events.isEmpty());
List<RouteLogEvent> filtered = events.stream().filter(val -> val.getAgentId() == 8888).collect(Collectors.toList());
assertNotNull(filtered);
assertFalse(filtered.isEmpty());
}
Spring集成配置:
@SpringBootApplication
@EnableIntegration
public class VitelAsyncPersisterApplication {
private static final Map<Class, String> ROUTING_EVENTS = new HashMap<>();
private static final String CHANNEL_RECORDING = "channel-recording";
private static final String CHANNEL_INTERACTION_STATE = "channel-interaction-state";
private static final String CHANNEL_AGENT_STATE = "channel-agent-state";
private static final String CHANNEL_ROUTE_LOG = "channel-route";
static {
ROUTING_EVENTS.put(AgentStateChangeEvent.class, CHANNEL_AGENT_STATE);
ROUTING_EVENTS.put(InteractionStateChangeEvent.class, CHANNEL_INTERACTION_STATE);
ROUTING_EVENTS.put(Recording.class, CHANNEL_RECORDING);
ROUTING_EVENTS.put(RouteLogEvent.class, CHANNEL_ROUTE_LOG);
}
@Value("${jms.queue.entity.persist}")
private String jmsQueueName;
@Value("${jms.broker.url}")
private String jmsBrokerUrl;
@Autowired
private EventDao eventDao;
@Bean
public Jaxb2Marshaller xmlMarshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setSchema(new ClassPathResource("entities.xsd"));
marshaller.setPackagesToScan("com.mhgad.za.vitel.persister.entities");
return marshaller;
}
@Bean
public ConnectionFactory jmsConnFactory() {
ActiveMQConnectionFactory activeMq = new ActiveMQConnectionFactory(jmsBrokerUrl);
CachingConnectionFactory cachingConnFactory = new CachingConnectionFactory();
cachingConnFactory.setTargetConnectionFactory(activeMq);
return cachingConnFactory;
}
@Bean
public IntegrationFlow buildReceiverFlow(ConnectionFactory jmsConnectionFactory, Jaxb2Marshaller marshaller) {
UnmarshallingTransformer xmlToObjTransformer = Transformers.unmarshaller(marshaller);
JmsInboundGatewaySpec jmsSpec = Jms.inboundGateway(jmsConnectionFactory).destination(jmsQueueName);
return IntegrationFlows.from(jmsSpec).transform(xmlToObjTransformer).channel("msg.router").get();
}
@Bean
public IntegrationFlow buildRouterFlow() {
Function router = (p) -> {
if (ROUTING_EVENTS.containsKey(p.getClass())) {
return ROUTING_EVENTS.get(p.getClass());
} else {
return null;
}
};
return IntegrationFlows.from("msg.router").route(router, m -> m
.subFlowMapping(CHANNEL_AGENT_STATE, sf -> sf.handle((p) -> eventDao.save((AgentStateChangeEvent) p.getPayload())))
.subFlowMapping(CHANNEL_INTERACTION_STATE, sf -> sf.handle((p) -> eventDao.save((InteractionStateChangeEvent) p.getPayload())))
.subFlowMapping(CHANNEL_RECORDING, sf -> sf.handle((p) -> eventDao.save((Recording) p.getPayload())))
.subFlowMapping(CHANNEL_ROUTE_LOG, sf -> sf.handle((p) -> eventDao.save((RouteLogEvent) p.getPayload())))).get();
}
public static void main(String[] args) {
SpringApplication.run(VitelAsyncPersisterApplication.class, args);
}
}
根据您的日志,我确实没有看到任何 Spring 集成基础结构。
所以,也许您刚刚错过了 @EnableIntegration
?
而且你的测试有点奇怪。它向 JMS 发送消息并检查来自 DB 的结果。但是我们看不到您如何启动该配置以进行集成。
由于您将只监听消息并将它们存储在数据库中,请考虑使用单向 JMS 组件 - Jms.messageDriverChannelAdapter()
。而不是 request/reply 网关。
我正在尝试设置一个带有 spring 集成的小型 spring 启动应用程序。它需要做的就是从 jms 队列中拉出一条消息,将请求解组回一个对象并路由到一个特定的 bean 以进行持久化。我已经测试了路由部分,我可以确认它确实有效。
我在测试中有一个嵌入式 activemq 代理,我可以通过 spring JmsTemplate 发送消息,但它似乎无法解组 xml 有效负载并路由消息。我可以在日志中看到这个:
16:42:09.285 [main] INFO c.m.z.v.o.VitelAsyncPersisterApplicationTests - Sending message
16:42:09.289 [main] DEBUG o.s.j.c.CachingConnectionFactory - Registering cached JMS Session for mode 1: ActiveMQSession {id=ID:theblacklodge-59640-1460558526948-4:1:2,started=false} java.lang.Object@52d97ab6
16:42:09.289 [main] DEBUG o.s.j.c.JmsTemplate - Executing callback on JMS Session: Cached JMS Session: ActiveMQSession {id=ID:theblacklodge-59640-1460558526948-4:1:2,started=false} java.lang.Object@52d97ab6
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:59466@61616] DEBUG o.a.a.b.TransportConnector - Publishing: tcp://localhost:61616 for broker transport URI: tcp://localhost:61616
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:59466@61616] DEBUG o.a.a.b.TransportConnector - Publishing: tcp://localhost:61616 for broker transport URI: tcp://localhost:61616
16:42:09.295 [ActiveMQ Transport: tcp:///127.0.0.1:59466@61616] DEBUG o.a.a.b.r.AbstractRegion - test_broker adding destination: topic://ActiveMQ.Advisory.Producer.Queue.jms/test
16:42:09.298 [main] DEBUG o.s.j.c.CachingConnectionFactory - Registering cached JMS MessageProducer for destination [queue://jms/test]: ActiveMQMessageProducer { value=ID:theblacklodge-59640-1460558526948-4:1:2:1 }
16:42:09.301 [main] DEBUG o.s.j.c.JmsTemplate - Sending created message: ActiveMQTextMessage {commandId = 0, responseRequired = false, messageId = null, originalDestination = null, originalTransactionId = null, producerId = null, destination = null, transactionId = null, expiration = 0, timestamp = 0, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = false, type = null, priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false, text = <?xml version="1.0" encoding="UTF-8" standalo...uteLogEvent>}
16:42:09.305 [ActiveMQ Transport: tcp:///127.0.0.1:59466@61616] DEBUG o.a.a.b.r.Queue - test_broker Message ID:theblacklodge-59640-1460558526948-4:1:2:1:1 sent to queue://jms/test
16:42:09.306 [ActiveMQ BrokerService[test_broker] Task-2] DEBUG o.a.a.b.r.Queue - queue://jms/test, subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 1, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1, dequeueCount: 0, memUsage:1332
16:42:09.314 [main] INFO c.m.z.v.o.VitelAsyncPersisterApplicationTests - Message sent
Spring 集成日志记录:
13:43:48.996 [main] INFO o.s.i.j.JmsMessageDrivenEndpoint - started org.springframework.integration.jms.JmsMessageDrivenEndpoint@71904469
13:43:48.996 [main] INFO o.s.i.d.j.JmsInboundGateway - started org.springframework.integration.dsl.jms.JmsInboundGateway#0
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.dsl.jms.JmsInboundGateway#0'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - Adding {xml:unmarshalling-transformer} as a subscriber to the 'buildReceiverFlow.channel#0' channel
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildReceiverFlow.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#0
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - Adding {router} as a subscriber to the 'msg.router' channel
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.msg.router' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#1
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#0.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#2
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#2'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#1.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#3
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#3'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#2.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#4
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#4'
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5' of type [class org.springframework.integration.config.ConsumerEndpointFactoryBean]
13:43:48.996 [main] INFO o.s.i.c.DirectChannel - Channel 'application:test:-1.buildRouterFlow.subFlow#3.channel#0' has 1 subscriber(s).
13:43:48.996 [main] INFO o.s.i.e.EventDrivenConsumer - started org.springframework.integration.config.ConsumerEndpointFactoryBean#5
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.integration.config.ConsumerEndpointFactoryBean#5'
13:43:48.996 [main] INFO o.s.c.s.DefaultLifecycleProcessor - Starting beans in phase 2147483647
13:43:48.996 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Starting bean 'org.springframework.jms.config.internalJmsListenerEndpointRegistry' of type [class org.springframework.jms.config.JmsListenerEndpointRegistry]
13:43:48.997 [main] DEBUG o.s.c.s.DefaultLifecycleProcessor - Successfully started bean 'org.springframework.jms.config.internalJmsListenerEndpointRegistry'
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor'
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.context.annotation.internalScheduledAnnotationProcessor'
13:43:49.002 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
13:43:49.017 [main] DEBUG o.s.b.a.l.AutoConfigurationReportLoggingInitializer -
我不确定我遗漏或配置错误的内容:
测试用例:
@Test
public void jmsIntegrationTest() {
RouteLogEvent log = new RouteLogEvent();
log.setAgentId(8888);
log.setInteracitonId(95634);
log.setMax(5);
log.setQueueTime(1256L);
log.setRouteTime(96541L);
log.setScore(8);
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(500);
marshaller.marshal(log, new StreamResult(bytesOut));
final String xmlPayload = new String(bytesOut.toByteArray());
LOG.info("Sending message");
jmsTemplate.send(jmsQueue, (s) -> {
return s.createTextMessage(xmlPayload);
});
LOG.info("Message sent");
List<RouteLogEvent> events = testDao.findAllRouteLogs();
assertNotNull(events);
assertFalse(events.isEmpty());
List<RouteLogEvent> filtered = events.stream().filter(val -> val.getAgentId() == 8888).collect(Collectors.toList());
assertNotNull(filtered);
assertFalse(filtered.isEmpty());
}
Spring集成配置:
@SpringBootApplication
@EnableIntegration
public class VitelAsyncPersisterApplication {
private static final Map<Class, String> ROUTING_EVENTS = new HashMap<>();
private static final String CHANNEL_RECORDING = "channel-recording";
private static final String CHANNEL_INTERACTION_STATE = "channel-interaction-state";
private static final String CHANNEL_AGENT_STATE = "channel-agent-state";
private static final String CHANNEL_ROUTE_LOG = "channel-route";
static {
ROUTING_EVENTS.put(AgentStateChangeEvent.class, CHANNEL_AGENT_STATE);
ROUTING_EVENTS.put(InteractionStateChangeEvent.class, CHANNEL_INTERACTION_STATE);
ROUTING_EVENTS.put(Recording.class, CHANNEL_RECORDING);
ROUTING_EVENTS.put(RouteLogEvent.class, CHANNEL_ROUTE_LOG);
}
@Value("${jms.queue.entity.persist}")
private String jmsQueueName;
@Value("${jms.broker.url}")
private String jmsBrokerUrl;
@Autowired
private EventDao eventDao;
@Bean
public Jaxb2Marshaller xmlMarshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setSchema(new ClassPathResource("entities.xsd"));
marshaller.setPackagesToScan("com.mhgad.za.vitel.persister.entities");
return marshaller;
}
@Bean
public ConnectionFactory jmsConnFactory() {
ActiveMQConnectionFactory activeMq = new ActiveMQConnectionFactory(jmsBrokerUrl);
CachingConnectionFactory cachingConnFactory = new CachingConnectionFactory();
cachingConnFactory.setTargetConnectionFactory(activeMq);
return cachingConnFactory;
}
@Bean
public IntegrationFlow buildReceiverFlow(ConnectionFactory jmsConnectionFactory, Jaxb2Marshaller marshaller) {
UnmarshallingTransformer xmlToObjTransformer = Transformers.unmarshaller(marshaller);
JmsInboundGatewaySpec jmsSpec = Jms.inboundGateway(jmsConnectionFactory).destination(jmsQueueName);
return IntegrationFlows.from(jmsSpec).transform(xmlToObjTransformer).channel("msg.router").get();
}
@Bean
public IntegrationFlow buildRouterFlow() {
Function router = (p) -> {
if (ROUTING_EVENTS.containsKey(p.getClass())) {
return ROUTING_EVENTS.get(p.getClass());
} else {
return null;
}
};
return IntegrationFlows.from("msg.router").route(router, m -> m
.subFlowMapping(CHANNEL_AGENT_STATE, sf -> sf.handle((p) -> eventDao.save((AgentStateChangeEvent) p.getPayload())))
.subFlowMapping(CHANNEL_INTERACTION_STATE, sf -> sf.handle((p) -> eventDao.save((InteractionStateChangeEvent) p.getPayload())))
.subFlowMapping(CHANNEL_RECORDING, sf -> sf.handle((p) -> eventDao.save((Recording) p.getPayload())))
.subFlowMapping(CHANNEL_ROUTE_LOG, sf -> sf.handle((p) -> eventDao.save((RouteLogEvent) p.getPayload())))).get();
}
public static void main(String[] args) {
SpringApplication.run(VitelAsyncPersisterApplication.class, args);
}
}
根据您的日志,我确实没有看到任何 Spring 集成基础结构。
所以,也许您刚刚错过了 @EnableIntegration
?
而且你的测试有点奇怪。它向 JMS 发送消息并检查来自 DB 的结果。但是我们看不到您如何启动该配置以进行集成。
由于您将只监听消息并将它们存储在数据库中,请考虑使用单向 JMS 组件 - Jms.messageDriverChannelAdapter()
。而不是 request/reply 网关。