Spring: 使用循环发送多条消息
Spring: Send multiple messages using a loop
假设我在 Spring XD 中有一个自定义模块(我正在使用 Spring XD + Spring Integration + Hibernate)。该模块基本上从数据库中获取一些数据(假设我使用一个 Hibernate 实体存储它,所以我使用一个名为 "DataFromDB" 的对象)。DataFromDB 是一个列表,我从列表中取出每个元素,并想用类似下面的方式发送它:
String payload = convertDataFromDBToJson(record);
return MessageBuilder.createMessage(payload, message.getHeaders());
- 问题是每次我必须发送一条消息时,我都必须 return 一条消息。
- 所以我想遍历 DataFromDB 列表并将每个元素作为消息发送。
有没有办法发送多条消息?
编辑:
我刚刚根据试图复制场景的评论创建了一个小示例。这是我的:
我的转换器(Transformer)类:
public class TransformerClass {
public Collection<Message<?>> transformerMethod(Message<?> message) {
List<Message<?>> messages = new ArrayList<Message<?>>();
messages.add(new GenericMessage<>("foo"));
messages.add(new GenericMessage<>("bar"));
return messages;
}
我的xml配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
<!-- Module configuration: one transformer wired between the "input" and "output" channels. -->
<!-- Turns on annotation-driven transaction support. -->
<tx:annotation-driven />
<!-- Channel the transformer below consumes from. -->
<int:channel id="input" />
<!-- The object whose transformerMethod is invoked by the transformer below. -->
<bean id="transFormerClass" class="myModule.TransformerClass">
</bean>
<!-- Hands each message from "input" to transFormerClass.transformerMethod and sends the result to "output". -->
<int:transformer input-channel="input" output-channel="output" ref="transFormerClass" method="transformerMethod"/>
<!-- Channel the transformer's result is sent to. -->
<int:channel id="output"/>
</beans>
我的测试class:
/**
 * Sends one message through the "someModule" processor chain and prints
 * whatever comes out of it.
 */
@ModuleName(value = "someModule", type = ModuleType.processor)
public class TransformerClassTest extends xDTest {

    private String streamName = "myStream";

    private String chainTest = "someModule";

    /**
     * Pushes a single "hello world" message into the chain, waits up to
     * 5000 ms for one reply and prints it.
     */
    @SuppressWarnings("unchecked")
    @Test
    public void testPartnerNotification() throws IOException {
        this.chain = SingleNodeProcessingChainSupport.chain(application, streamName, chainTest);

        // The payload is arbitrary; the module only needs some input to be triggered.
        this.chain.send(MessageBuilder.createMessage("hello world", buildHeaders()));

        // Only one receive call is made, so at most one message is read back.
        Message<String> reply = (Message<String>) chain.receive(5000);
        System.out.println("Result: " + reply);
    }

    /**
     * Creates the headers attached to the test message: a single
     * "test" = "testing" entry.
     */
    private MessageHeaders buildHeaders() {
        Map<String, Object> headerValues = new HashMap<String, Object>();
        headerValues.put("test", "testing");
        return new MessageHeaders(headerValues);
    }
}
输出为:
Result: GenericMessage [payload=[GenericMessage [payload=foo, headers={timestamp=1475072951345, id=7b6c79a2-db85-563e-c238-262a31141456}], GenericMessage [payload=bar, headers={timestamp=1475072951345, id=31c8ef0e-3513-b95e-3a25-4fd3550f2fea}]], headers={timestamp=1475072951347, id=f90d94c4-e323-70ed-62ee-4b8bce64814d, test=testing}]
我正在使用 Spring Integration 4.2.2.RELEASE。
如果您返回 Collection<Message<?>>,它们将作为单独的消息发送(注意:下面的示例使用的是 service activator,而不是 transformer)。
编辑
/**
 * Minimal configuration: two direct channels ("input" and "output") and a
 * service activator between them that replies with a collection of messages.
 */
@EnableIntegration
@Configuration
public class So39708474Application {

    /** Channel the service activator listens on. */
    @Bean
    public DirectChannel input() {
        DirectChannel channel = new DirectChannel();
        return channel;
    }

    /** Channel the service activator's replies are sent to. */
    @Bean
    public DirectChannel output() {
        DirectChannel channel = new DirectChannel();
        return channel;
    }

    /** Registers the handler below as a bean. */
    @Bean
    public Foo foo() {
        Foo handler = new Foo();
        return handler;
    }

    /** Handler that answers every request with the same two messages. */
    public static class Foo {

        /**
         * Ignores the request and returns two messages, "foo" then "bar".
         *
         * @param in the incoming message; not used
         * @return the two reply messages, in that order
         */
        @ServiceActivator(inputChannel = "input", outputChannel = "output")
        public Collection<Message<?>> handle(Message<?> in) {
            List<Message<?>> replies = new ArrayList<>();
            replies.add(new GenericMessage<>("foo"));
            replies.add(new GenericMessage<>("bar"));
            return replies;
        }
    }
}
和
/**
 * Checks that one message sent to "input" results in two messages being
 * delivered on "output".
 */
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = So39708474Application.class)
public class So39708474ApplicationTests {

    @Autowired
    private MessageChannel input;

    @Autowired
    private SubscribableChannel output;

    /**
     * Subscribes a counting handler to {@code output}, sends a single "test"
     * message to {@code input} and expects the handler to have run twice.
     */
    @Test
    public void contextLoads() {
        AtomicInteger deliveries = new AtomicInteger();

        // Count and print every message that reaches the output channel.
        this.output.subscribe(delivered -> {
            deliveries.incrementAndGet();
            System.out.println(delivered);
        });

        GenericMessage<String> trigger = new GenericMessage<>("test");
        this.input.send(trigger);

        assertEquals(2, deliveries.get());
    }
}
假设我在 Spring XD 中有一个自定义模块(我正在使用 Spring XD + Spring Integration + Hibernate)。该模块基本上从数据库中获取一些数据(假设我使用一个 Hibernate 实体存储它,所以我使用一个名为 "DataFromDB" 的对象)。DataFromDB 是一个列表,我从列表中取出每个元素,并想用类似下面的方式发送它:
String payload = convertDataFromDBToJson(record);
return MessageBuilder.createMessage(payload, message.getHeaders());
- 问题是每次我必须发送一条消息时,我都必须 return 一条消息。
- 所以我想遍历 DataFromDB 列表并将每个元素作为消息发送。
有没有办法发送多条消息?
编辑:
我刚刚根据试图复制场景的评论创建了一个小示例。这是我的:
我的转换器(Transformer)类:
public class TransformerClass {
public Collection<Message<?>> transformerMethod(Message<?> message) {
List<Message<?>> messages = new ArrayList<Message<?>>();
messages.add(new GenericMessage<>("foo"));
messages.add(new GenericMessage<>("bar"));
return messages;
}
我的xml配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
<!-- Module configuration: one transformer wired between the "input" and "output" channels. -->
<!-- Turns on annotation-driven transaction support. -->
<tx:annotation-driven />
<!-- Channel the transformer below consumes from. -->
<int:channel id="input" />
<!-- The object whose transformerMethod is invoked by the transformer below. -->
<bean id="transFormerClass" class="myModule.TransformerClass">
</bean>
<!-- Hands each message from "input" to transFormerClass.transformerMethod and sends the result to "output". -->
<int:transformer input-channel="input" output-channel="output" ref="transFormerClass" method="transformerMethod"/>
<!-- Channel the transformer's result is sent to. -->
<int:channel id="output"/>
</beans>
我的测试class:
/**
 * Sends one message through the "someModule" processor chain and prints
 * whatever comes out of it.
 */
@ModuleName(value = "someModule", type = ModuleType.processor)
public class TransformerClassTest extends xDTest {

    private String streamName = "myStream";

    private String chainTest = "someModule";

    /**
     * Pushes a single "hello world" message into the chain, waits up to
     * 5000 ms for one reply and prints it.
     */
    @SuppressWarnings("unchecked")
    @Test
    public void testPartnerNotification() throws IOException {
        this.chain = SingleNodeProcessingChainSupport.chain(application, streamName, chainTest);

        // The payload is arbitrary; the module only needs some input to be triggered.
        this.chain.send(MessageBuilder.createMessage("hello world", buildHeaders()));

        // Only one receive call is made, so at most one message is read back.
        Message<String> reply = (Message<String>) chain.receive(5000);
        System.out.println("Result: " + reply);
    }

    /**
     * Creates the headers attached to the test message: a single
     * "test" = "testing" entry.
     */
    private MessageHeaders buildHeaders() {
        Map<String, Object> headerValues = new HashMap<String, Object>();
        headerValues.put("test", "testing");
        return new MessageHeaders(headerValues);
    }
}
输出为:
Result: GenericMessage [payload=[GenericMessage [payload=foo, headers={timestamp=1475072951345, id=7b6c79a2-db85-563e-c238-262a31141456}], GenericMessage [payload=bar, headers={timestamp=1475072951345, id=31c8ef0e-3513-b95e-3a25-4fd3550f2fea}]], headers={timestamp=1475072951347, id=f90d94c4-e323-70ed-62ee-4b8bce64814d, test=testing}]
我正在使用 Spring Integration 4.2.2.RELEASE。
如果您返回 Collection<Message<?>>,它们将作为单独的消息发送(注意:下面的示例使用的是 service activator,而不是 transformer)。
编辑
/**
 * Minimal configuration: two direct channels ("input" and "output") and a
 * service activator between them that replies with a collection of messages.
 */
@EnableIntegration
@Configuration
public class So39708474Application {

    /** Channel the service activator listens on. */
    @Bean
    public DirectChannel input() {
        DirectChannel channel = new DirectChannel();
        return channel;
    }

    /** Channel the service activator's replies are sent to. */
    @Bean
    public DirectChannel output() {
        DirectChannel channel = new DirectChannel();
        return channel;
    }

    /** Registers the handler below as a bean. */
    @Bean
    public Foo foo() {
        Foo handler = new Foo();
        return handler;
    }

    /** Handler that answers every request with the same two messages. */
    public static class Foo {

        /**
         * Ignores the request and returns two messages, "foo" then "bar".
         *
         * @param in the incoming message; not used
         * @return the two reply messages, in that order
         */
        @ServiceActivator(inputChannel = "input", outputChannel = "output")
        public Collection<Message<?>> handle(Message<?> in) {
            List<Message<?>> replies = new ArrayList<>();
            replies.add(new GenericMessage<>("foo"));
            replies.add(new GenericMessage<>("bar"));
            return replies;
        }
    }
}
和
/**
 * Checks that one message sent to "input" results in two messages being
 * delivered on "output".
 */
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = So39708474Application.class)
public class So39708474ApplicationTests {

    @Autowired
    private MessageChannel input;

    @Autowired
    private SubscribableChannel output;

    /**
     * Subscribes a counting handler to {@code output}, sends a single "test"
     * message to {@code input} and expects the handler to have run twice.
     */
    @Test
    public void contextLoads() {
        AtomicInteger deliveries = new AtomicInteger();

        // Count and print every message that reaches the output channel.
        this.output.subscribe(delivered -> {
            deliveries.incrementAndGet();
            System.out.println(delivered);
        });

        GenericMessage<String> trigger = new GenericMessage<>("test");
        this.input.send(trigger);

        assertEquals(2, deliveries.get());
    }
}