Spring 集成 DSL - JdbcPollingChannelAdapter 结果未排队
Spring Integration DSL - JdbcPollingChannelAdapter results not queueing
我发誓我有这个工作,但当我在几个月后恢复它时(并升级到 Boot 1.5.9),我遇到了问题。
我设置了一个 JdbcPollingChannelAdapter,我可以很好地执行 receive(),但是当我将适配器放入一个仅对适配器结果进行排队的流中时,运行.receive在队列上总是 returns 一个 null(我可以在控制台日志中看到适配器的 SQL 正在执行)。
测试如下。为什么我可以从适配器获取结果,但不能对结果进行排队?提前感谢您的帮助。
@RunWith(SpringRunner.class)
@SpringBootTest
@AutoConfigureTestDatabase
@JdbcTest
public class JdbcpollingchanneladapterdemoTests {
@Autowired
@Qualifier("dataSource")
DataSource dataSource;
private static PollableChannel outputQueue;
@BeforeClass
public static void setupClass() {
outputQueue = MessageChannels.queue().get();
return;
}
@Test
public void Should_HaveQueue() {
assertThat(outputQueue, instanceOf(QueueChannel.class));
}
@Test
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Create Table DEMO (CODE VARCHAR(5));")
@Sql(executionPhase = ExecutionPhase.AFTER_TEST_METHOD,
statements = "Drop Table DEMO ;")
public void Should_Not_HaveMessageOnTheQueue_When_No_DemosAreInTheDatabase() {
Message<?> message = outputQueue.receive(5000);
assertThat(message, nullValue()) ;
}
@Test
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Create Table DEMO (CODE VARCHAR(5));")
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Insert into DEMO (CODE) VALUES ('12345');")
@Sql(executionPhase = ExecutionPhase.AFTER_TEST_METHOD,
statements = "Drop Table DEMO ;")
public void Should_HaveMessageOnTheQueue_When_DemosIsInTheDatabase() {
assertThat(outputQueue, instanceOf(QueueChannel.class));
Message<?> message = outputQueue.receive(5000);
assertThat(message, notNullValue());
assertThat(message.getPayload().toString(), equalTo("15317")) ;
}
@Test
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Create Table DEMO (CODE VARCHAR(5));")
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Insert into DEMO (CODE) VALUES ('12345');")
@Sql(executionPhase = ExecutionPhase.AFTER_TEST_METHOD,
statements = "Drop Table DEMO ;")
public void get_message_directly_from_adapter() {
JdbcPollingChannelAdapter adapter =
new JdbcPollingChannelAdapter(dataSource, "SELECT CODE FROM DEMO");
adapter.setRowMapper(new DemoRowMapper());
adapter.setMaxRowsPerPoll(1);
Message<?> message = adapter.receive();
assertThat(message, notNullValue());
}
private static class Demo {
private String demo;
String getDemo() {
return demo;
}
void setDemo(String value) {
this.demo = value;
}
@Override
public String toString() {
return "Demo [value=" + this.demo + "]";
}
}
public static class DemoRowMapper implements RowMapper<Demo> {
@Override
public Demo mapRow(ResultSet rs, int rowNum) throws SQLException {
Demo demo = new Demo();
demo.setDemo(rs.getString("CODE"));
return demo;
}
}
@Component
public static class MyFlowAdapter extends IntegrationFlowAdapter {
@Autowired
@Qualifier("dataSource")
DataSource dataSource;
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
JdbcPollingChannelAdapter adapter =
new JdbcPollingChannelAdapter(dataSource, "SELECT CODE FROM DEMO");
adapter.setRowMapper(new DemoRowMapper());
adapter.setMaxRowsPerPoll(1);
return from(adapter,
c -> c.poller(Pollers.fixedRate(1000L, 2000L)
.maxMessagesPerPoll(1)
.get()))
.channel(outputQueue);
}
}
}
EDIT 我已经尽可能地简化它,重构为下面的代码。该测试通过了具有通用消息源的流,但在具有 JdbcPollingChannelAdapter 消息源的流上失败。我只是不清楚应该如何配置第二个消息源,以便它像第一个消息源一样成功。
@Test
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Create Table DEMO (CODE VARCHAR(5));")
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Insert into DEMO (CODE) VALUES ('12345');")
public void Should_HaveMessageOnTheQueue_When_UnsentDemosIsInTheDatabase() {
this.genericFlowContext.registration(new GenericFlowAdapter()).register();
PollableChannel genericChannel = this.beanFactory.getBean("GenericFlowAdapterOutput",
PollableChannel.class);
this.jdbcPollingFlowContext.registration(new JdbcPollingFlowAdapter()).register();
PollableChannel jdbcPollingChannel = this.beanFactory.getBean("JdbcPollingFlowAdapterOutput",
PollableChannel.class);
assertThat(genericChannel.receive(5000).getPayload(), equalTo("15317"));
assertThat(jdbcPollingChannel.receive(5000).getPayload(), equalTo("15317"));
}
private static class GenericFlowAdapter extends IntegrationFlowAdapter {
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(getObjectMessageSource(),
e -> e.poller(Pollers.fixedRate(100)))
.channel(c -> c.queue("GenericFlowAdapterOutput"));
}
private MessageSource<Object> getObjectMessageSource() {
return () -> new GenericMessage<>("15317");
}
}
private static class JdbcPollingFlowAdapter extends IntegrationFlowAdapter {
@Autowired
@Qualifier("dataSource")
DataSource dataSource;
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(getObjectMessageSource(),
e -> e.poller(Pollers.fixedRate(100)))
.channel(c -> c.queue("JdbcPollingFlowAdapterOutput"));
}
private MessageSource<Object> getObjectMessageSource() {
JdbcPollingChannelAdapter adapter =
new JdbcPollingChannelAdapter(dataSource, "SELECT CODE FROM DEMO");
adapter.setRowMapper(new DemoRowMapper());
adapter.setMaxRowsPerPoll(1);
return adapter;
}
}
看来您需要将 @EnableIntegration
添加到您的测试配置中。
当您使用 Spring 引导片进行测试时,并非所有自动配置都被加载:
https://docs.spring.io/spring-boot/docs/1.5.9.RELEASE/reference/htmlsingle/#test-auto-configuration
更新
JdbcPollingChannelAdapter
的问题是 运行 在单独的计划线程中,已经超出了围绕测试方法的原始事务,其中执行了那些 @Sql
。
你的修复方法是这样的:
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Insert into DEMO (CODE) VALUES ('12345');",
config = @SqlConfig(transactionMode = SqlConfig.TransactionMode.ISOLATED))
注意那个SqlConfig.TransactionMode.ISOLATED
。这样 INSERT
事务被提交并且数据可用于 JdbcPollingChannelAdapter
.
的单独轮询线程
还要注意这个JdbcPollingChannelAdapter
总是returns一条List
条记录。因此,即使 table.
中只有一条记录,您的 assertThat(jdbcPollingChannel.receive(5000).getPayload(), ...);
也应该反对 List<String>
我发誓我有这个工作,但当我在几个月后恢复它时(并升级到 Boot 1.5.9),我遇到了问题。
我设置了一个 JdbcPollingChannelAdapter,我可以很好地执行 receive(),但是当我将适配器放入一个仅对适配器结果进行排队的流中时,运行.receive在队列上总是 returns 一个 null(我可以在控制台日志中看到适配器的 SQL 正在执行)。
测试如下。为什么我可以从适配器获取结果,但不能对结果进行排队?提前感谢您的帮助。
@RunWith(SpringRunner.class)
@SpringBootTest
@AutoConfigureTestDatabase
@JdbcTest
public class JdbcpollingchanneladapterdemoTests {
@Autowired
@Qualifier("dataSource")
DataSource dataSource;
private static PollableChannel outputQueue;
@BeforeClass
public static void setupClass() {
outputQueue = MessageChannels.queue().get();
return;
}
@Test
public void Should_HaveQueue() {
assertThat(outputQueue, instanceOf(QueueChannel.class));
}
@Test
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Create Table DEMO (CODE VARCHAR(5));")
@Sql(executionPhase = ExecutionPhase.AFTER_TEST_METHOD,
statements = "Drop Table DEMO ;")
public void Should_Not_HaveMessageOnTheQueue_When_No_DemosAreInTheDatabase() {
Message<?> message = outputQueue.receive(5000);
assertThat(message, nullValue()) ;
}
@Test
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Create Table DEMO (CODE VARCHAR(5));")
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Insert into DEMO (CODE) VALUES ('12345');")
@Sql(executionPhase = ExecutionPhase.AFTER_TEST_METHOD,
statements = "Drop Table DEMO ;")
public void Should_HaveMessageOnTheQueue_When_DemosIsInTheDatabase() {
assertThat(outputQueue, instanceOf(QueueChannel.class));
Message<?> message = outputQueue.receive(5000);
assertThat(message, notNullValue());
assertThat(message.getPayload().toString(), equalTo("15317")) ;
}
@Test
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Create Table DEMO (CODE VARCHAR(5));")
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Insert into DEMO (CODE) VALUES ('12345');")
@Sql(executionPhase = ExecutionPhase.AFTER_TEST_METHOD,
statements = "Drop Table DEMO ;")
public void get_message_directly_from_adapter() {
JdbcPollingChannelAdapter adapter =
new JdbcPollingChannelAdapter(dataSource, "SELECT CODE FROM DEMO");
adapter.setRowMapper(new DemoRowMapper());
adapter.setMaxRowsPerPoll(1);
Message<?> message = adapter.receive();
assertThat(message, notNullValue());
}
private static class Demo {
private String demo;
String getDemo() {
return demo;
}
void setDemo(String value) {
this.demo = value;
}
@Override
public String toString() {
return "Demo [value=" + this.demo + "]";
}
}
public static class DemoRowMapper implements RowMapper<Demo> {
@Override
public Demo mapRow(ResultSet rs, int rowNum) throws SQLException {
Demo demo = new Demo();
demo.setDemo(rs.getString("CODE"));
return demo;
}
}
@Component
public static class MyFlowAdapter extends IntegrationFlowAdapter {
@Autowired
@Qualifier("dataSource")
DataSource dataSource;
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
JdbcPollingChannelAdapter adapter =
new JdbcPollingChannelAdapter(dataSource, "SELECT CODE FROM DEMO");
adapter.setRowMapper(new DemoRowMapper());
adapter.setMaxRowsPerPoll(1);
return from(adapter,
c -> c.poller(Pollers.fixedRate(1000L, 2000L)
.maxMessagesPerPoll(1)
.get()))
.channel(outputQueue);
}
}
}
EDIT 我已经尽可能地简化它,重构为下面的代码。该测试通过了具有通用消息源的流,但在具有 JdbcPollingChannelAdapter 消息源的流上失败。我只是不清楚应该如何配置第二个消息源,以便它像第一个消息源一样成功。
@Test
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Create Table DEMO (CODE VARCHAR(5));")
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Insert into DEMO (CODE) VALUES ('12345');")
public void Should_HaveMessageOnTheQueue_When_UnsentDemosIsInTheDatabase() {
this.genericFlowContext.registration(new GenericFlowAdapter()).register();
PollableChannel genericChannel = this.beanFactory.getBean("GenericFlowAdapterOutput",
PollableChannel.class);
this.jdbcPollingFlowContext.registration(new JdbcPollingFlowAdapter()).register();
PollableChannel jdbcPollingChannel = this.beanFactory.getBean("JdbcPollingFlowAdapterOutput",
PollableChannel.class);
assertThat(genericChannel.receive(5000).getPayload(), equalTo("15317"));
assertThat(jdbcPollingChannel.receive(5000).getPayload(), equalTo("15317"));
}
private static class GenericFlowAdapter extends IntegrationFlowAdapter {
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(getObjectMessageSource(),
e -> e.poller(Pollers.fixedRate(100)))
.channel(c -> c.queue("GenericFlowAdapterOutput"));
}
private MessageSource<Object> getObjectMessageSource() {
return () -> new GenericMessage<>("15317");
}
}
private static class JdbcPollingFlowAdapter extends IntegrationFlowAdapter {
@Autowired
@Qualifier("dataSource")
DataSource dataSource;
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(getObjectMessageSource(),
e -> e.poller(Pollers.fixedRate(100)))
.channel(c -> c.queue("JdbcPollingFlowAdapterOutput"));
}
private MessageSource<Object> getObjectMessageSource() {
JdbcPollingChannelAdapter adapter =
new JdbcPollingChannelAdapter(dataSource, "SELECT CODE FROM DEMO");
adapter.setRowMapper(new DemoRowMapper());
adapter.setMaxRowsPerPoll(1);
return adapter;
}
}
看来您需要将 @EnableIntegration
添加到您的测试配置中。
当您使用 Spring 引导片进行测试时,并非所有自动配置都被加载:
https://docs.spring.io/spring-boot/docs/1.5.9.RELEASE/reference/htmlsingle/#test-auto-configuration
更新
JdbcPollingChannelAdapter
的问题是 运行 在单独的计划线程中,已经超出了围绕测试方法的原始事务,其中执行了那些 @Sql
。
你的修复方法是这样的:
@Sql(executionPhase = ExecutionPhase.BEFORE_TEST_METHOD,
statements = "Insert into DEMO (CODE) VALUES ('12345');",
config = @SqlConfig(transactionMode = SqlConfig.TransactionMode.ISOLATED))
注意那个SqlConfig.TransactionMode.ISOLATED
。这样 INSERT
事务被提交并且数据可用于 JdbcPollingChannelAdapter
.
还要注意这个JdbcPollingChannelAdapter
总是returns一条List
条记录。因此,即使 table.
assertThat(jdbcPollingChannel.receive(5000).getPayload(), ...);
也应该反对 List<String>