Spring 与具有动态查询的 Jdbc 消息源的集成流
Spring Integration Flow with Jdbc Message source which has dynamic query
我正在尝试使用 spring 云数据流以 kafka 作为代理从 oracle DB 捕获更改数据。我正在为此使用轮询机制。我定期使用基本 select 查询轮询数据库以捕获任何更新的数据。为了更好的防故障系统,我在 oracle DB 中保留了上次轮询时间,并用它来获取上次轮询后更新的数据。
public MessageSource<Object> jdbcMessageSource() {
JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
new JdbcPollingChannelAdapter(this.dataSource, this.properties.getQuery());
jdbcPollingChannelAdapter.setUpdateSql(this.properties.getUpdate());
return jdbcPollingChannelAdapter;
}
@Bean
public IntegrationFlow pollingFlow() {
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(),spec -> spec.poller(Pollers.fixedDelay(3000)));
flowBuilder.channel(this.source.output());
flowBuilder.transform(trans,"transform");
return flowBuilder.get();
}
我在应用程序属性中的查询如下:
query: select * from kafka_test where LAST_UPDATE_TIME >(select LAST_POLL_TIME from poll_time)
update : UPDATE poll_time SET LAST_POLL_TIME = CURRENT_TIMESTAMP
这非常适合我。我可以通过这种方法从数据库中获取 CDC。
我现在正在查看的问题如下:
创建一个 table 只是为了维持轮询时间是一种负担。我正在寻找在 kafka 主题中维护最后一次投票时间,并在我进行下一次投票时从 kafka 主题中检索该时间。
我修改了 jdbcMessageSource
方法,如下所示:
public MessageSource<Object> jdbcMessageSource() {
String query = "select * from kafka_test where LAST_UPDATE_TIME > '"+<Last poll time value read from kafka comes here>+"'";
JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
new JdbcPollingChannelAdapter(this.dataSource, query);
return jdbcPollingChannelAdapter;
}
但是 Spring 数据流仅实例化 pollingFlow( )(请参阅上面的代码)bean 一次。因此, 运行 首先的查询将保持不变。我想为每个民意调查用新的民意调查时间更新查询。
有没有一种方法可以让我编写自定义 Integrationflow
以在每次进行投票时更新此查询?
我已经尝试 IntegrationFlowContext
但没有成功。
提前致谢!!!
参见 Artem 对标准适配器中动态查询机制的回答;然而,另一种方法是简单地将 JdbcTemplate
包装在 Bean 中并使用
调用它
IntegrationFlows.from(myPojo(), "runQuery", e -> ...)
...
甚至是一个简单的 lambda
.from(() -> jdbcTemplate...)
我们有这个测试配置(抱歉,它是 XML):
<inbound-channel-adapter query="select * from item where status=:status" channel="target"
data-source="dataSource" select-sql-parameter-source="parameterSource"
update="delete from item"/>
<beans:bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<beans:constructor-arg value=""/>
</beans:bean>
<beans:bean id="parameterSourceFactory"
class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<beans:property name="parameterExpressions">
<beans:map>
<beans:entry key="status" value="@statusBean.which()"/>
</beans:map>
</beans:property>
<beans:property name="sqlParameterTypes">
<beans:map>
<beans:entry key="status" value="#{ T(java.sql.Types).INTEGER}"/>
</beans:map>
</beans:property>
</beans:bean>
<beans:bean id="statusBean"
class="org.springframework.integration.jdbc.config.JdbcPollingChannelAdapterParserTests$Status"/>
注意 ExpressionEvaluatingSqlParameterSourceFactory
及其 createParameterSourceNoCache()
工厂。此结果可用于 select-sql-parameter-source
.
JdbcPollingChannelAdapter
对此事有 setSelectSqlParameterSource
。
因此,您配置一个 ExpressionEvaluatingSqlParameterSourceFactory
以便能够将某些查询参数解析为某些 bean 方法调用的表达式,以便从 Kafka 获取所需的值。然后 createParameterSourceNoCache()
将帮助您获得预期的 SqlParameterSource
.
文档中也有一些信息:https://docs.spring.io/spring-integration/docs/current/reference/html/#jdbc-inbound-channel-adapter
在上面两个答案的帮助下,我找到了方法。
编写一个 jdbc template
并将其包装为一个 bean 并将其用于 Integration Flow
.
@EnableBinding(Source.class)
@AllArgsConstructor
public class StockSource {
private DataSource dataSource;
@Autowired
private JdbcTemplate jdbcTemplate;
private MessageChannelFactory messageChannelFactory; // You can use normal message channel which is available in spring cloud data flow as well.
private List<String> findAll() {
jdbcTemplate = new JdbcTemplate(dataSource);
String time = "10/24/60" . (this means 10 seconds for oracle DB)
String query = << your query here like.. select * from test where (last_updated_time > time) >>;
return jdbcTemplate.query(query, new RowMapper<String>() {
@Override
public String mapRow(ResultSet rs, int rowNum) throws SQLException {
...
...
any row mapper operations that you want to do with you result after the poll.
...
...
...
// Change the time here for the next poll to the DB.
return result;
}
});
}
@Bean
public IntegrationFlow supplyPollingFlow() {
IntegrationFlowBuilder flowBuilder = IntegrationFlows
.from(this::findAll, spec -> {
spec.poller(Pollers.fixedDelay(5000));
});
flowBuilder.channel(<<Your message channel>>);
return flowBuilder.get();
}
}
在我们的用例中,我们将最后一次轮询时间保存在 kafka 主题中。这是为了减少应用程序状态。现在对数据库的每个新轮询都会在 where
条件下有一个新时间。
P.S:您的消息代理 (kafka/rabbit mq) 应该 运行 在您的本地,或者如果托管在不同的平台上,则连接到它们。
神速!!!
我正在尝试使用 spring 云数据流以 kafka 作为代理从 oracle DB 捕获更改数据。我正在为此使用轮询机制。我定期使用基本 select 查询轮询数据库以捕获任何更新的数据。为了更好的防故障系统,我在 oracle DB 中保留了上次轮询时间,并用它来获取上次轮询后更新的数据。
public MessageSource<Object> jdbcMessageSource() {
JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
new JdbcPollingChannelAdapter(this.dataSource, this.properties.getQuery());
jdbcPollingChannelAdapter.setUpdateSql(this.properties.getUpdate());
return jdbcPollingChannelAdapter;
}
@Bean
public IntegrationFlow pollingFlow() {
IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(jdbcMessageSource(),spec -> spec.poller(Pollers.fixedDelay(3000)));
flowBuilder.channel(this.source.output());
flowBuilder.transform(trans,"transform");
return flowBuilder.get();
}
我在应用程序属性中的查询如下:
query: select * from kafka_test where LAST_UPDATE_TIME >(select LAST_POLL_TIME from poll_time)
update : UPDATE poll_time SET LAST_POLL_TIME = CURRENT_TIMESTAMP
这非常适合我。我可以通过这种方法从数据库中获取 CDC。
我现在正在查看的问题如下:
创建一个 table 只是为了维持轮询时间是一种负担。我正在寻找在 kafka 主题中维护最后一次投票时间,并在我进行下一次投票时从 kafka 主题中检索该时间。
我修改了 jdbcMessageSource
方法,如下所示:
public MessageSource<Object> jdbcMessageSource() {
String query = "select * from kafka_test where LAST_UPDATE_TIME > '"+<Last poll time value read from kafka comes here>+"'";
JdbcPollingChannelAdapter jdbcPollingChannelAdapter =
new JdbcPollingChannelAdapter(this.dataSource, query);
return jdbcPollingChannelAdapter;
}
但是 Spring 数据流仅实例化 pollingFlow( )(请参阅上面的代码)bean 一次。因此, 运行 首先的查询将保持不变。我想为每个民意调查用新的民意调查时间更新查询。
有没有一种方法可以让我编写自定义 Integrationflow
以在每次进行投票时更新此查询?
我已经尝试 IntegrationFlowContext
但没有成功。
提前致谢!!!
参见 Artem 对标准适配器中动态查询机制的回答;然而,另一种方法是简单地将 JdbcTemplate
包装在 Bean 中并使用
IntegrationFlows.from(myPojo(), "runQuery", e -> ...)
...
甚至是一个简单的 lambda
.from(() -> jdbcTemplate...)
我们有这个测试配置(抱歉,它是 XML):
<inbound-channel-adapter query="select * from item where status=:status" channel="target"
data-source="dataSource" select-sql-parameter-source="parameterSource"
update="delete from item"/>
<beans:bean id="parameterSource" factory-bean="parameterSourceFactory"
factory-method="createParameterSourceNoCache">
<beans:constructor-arg value=""/>
</beans:bean>
<beans:bean id="parameterSourceFactory"
class="org.springframework.integration.jdbc.ExpressionEvaluatingSqlParameterSourceFactory">
<beans:property name="parameterExpressions">
<beans:map>
<beans:entry key="status" value="@statusBean.which()"/>
</beans:map>
</beans:property>
<beans:property name="sqlParameterTypes">
<beans:map>
<beans:entry key="status" value="#{ T(java.sql.Types).INTEGER}"/>
</beans:map>
</beans:property>
</beans:bean>
<beans:bean id="statusBean"
class="org.springframework.integration.jdbc.config.JdbcPollingChannelAdapterParserTests$Status"/>
注意 ExpressionEvaluatingSqlParameterSourceFactory
及其 createParameterSourceNoCache()
工厂。此结果可用于 select-sql-parameter-source
.
JdbcPollingChannelAdapter
对此事有 setSelectSqlParameterSource
。
因此,您配置一个 ExpressionEvaluatingSqlParameterSourceFactory
以便能够将某些查询参数解析为某些 bean 方法调用的表达式,以便从 Kafka 获取所需的值。然后 createParameterSourceNoCache()
将帮助您获得预期的 SqlParameterSource
.
文档中也有一些信息:https://docs.spring.io/spring-integration/docs/current/reference/html/#jdbc-inbound-channel-adapter
在上面两个答案的帮助下,我找到了方法。
编写一个 jdbc template
并将其包装为一个 bean 并将其用于 Integration Flow
.
@EnableBinding(Source.class)
@AllArgsConstructor
public class StockSource {
private DataSource dataSource;
@Autowired
private JdbcTemplate jdbcTemplate;
private MessageChannelFactory messageChannelFactory; // You can use normal message channel which is available in spring cloud data flow as well.
private List<String> findAll() {
jdbcTemplate = new JdbcTemplate(dataSource);
String time = "10/24/60" . (this means 10 seconds for oracle DB)
String query = << your query here like.. select * from test where (last_updated_time > time) >>;
return jdbcTemplate.query(query, new RowMapper<String>() {
@Override
public String mapRow(ResultSet rs, int rowNum) throws SQLException {
...
...
any row mapper operations that you want to do with you result after the poll.
...
...
...
// Change the time here for the next poll to the DB.
return result;
}
});
}
@Bean
public IntegrationFlow supplyPollingFlow() {
IntegrationFlowBuilder flowBuilder = IntegrationFlows
.from(this::findAll, spec -> {
spec.poller(Pollers.fixedDelay(5000));
});
flowBuilder.channel(<<Your message channel>>);
return flowBuilder.get();
}
}
在我们的用例中,我们将最后一次轮询时间保存在 kafka 主题中。这是为了减少应用程序状态。现在对数据库的每个新轮询都会在 where
条件下有一个新时间。
P.S:您的消息代理 (kafka/rabbit mq) 应该 运行 在您的本地,或者如果托管在不同的平台上,则连接到它们。
神速!!!