Spring 集成 DSL JDBC 入站通道适配器
Spring Integration DSL JDBC inbound channel adapter
我使用 spring 集成从数据库中读取数据。
现在我使用轮询适配器
@Bean
public MessageSource<Object> jdbcMessageSource() {
JdbcPollingChannelAdapter a = new JdbcPollingChannelAdapter(dataSource(), "SELECT id, clientName FROM client");
return a;
}
流量:
@Bean
public IntegrationFlow pollingFlow() throws Exception {
return IntegrationFlows.from(jdbcMessageSource(),
c -> c.poller(Pollers.fixedRate(30000).maxMessagesPerPoll(1)))
.channel(channel1())
.handle(handler())
.get();
}
但我想从其他系统安排我的流程。
有人知道怎么做吗?
schedule my flow from other system
从您的流程角度来看,这听起来像 event driven action
。为此,您应该使用 JdbcOutboundGateway
和相同的 SELECT
.
当然,您应该找到该外部系统的挂钩,以触发您的流输入通道的事件。这可能是任何 Inbound Channel Adapter 或 Message Driven Adapter,例如JMS、AMQP、HTTP 等等。取决于您的中间件中已有什么,以及您的应用程序可以从中向外部系统公开什么。
我想我用自定义触发器解决了这个问题:
public Trigger onlyOnceTrigger() {
return new Trigger() {
private final AtomicBoolean invoked = new AtomicBoolean();
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}
};
}
还有我的流量:
public IntegrationFlow pollingFlow() throws Exception {
return IntegrationFlows.from(jdbcMessageSource(),
c -> c.poller(Pollers.trigger(onlyOnceTrigger()).maxMessagesPerPoll(1)))
.channel(channel1())
.handle(handler())
.get();
}
我使用 spring 集成从数据库中读取数据。 现在我使用轮询适配器
@Bean
public MessageSource<Object> jdbcMessageSource() {
JdbcPollingChannelAdapter a = new JdbcPollingChannelAdapter(dataSource(), "SELECT id, clientName FROM client");
return a;
}
流量:
@Bean
public IntegrationFlow pollingFlow() throws Exception {
return IntegrationFlows.from(jdbcMessageSource(),
c -> c.poller(Pollers.fixedRate(30000).maxMessagesPerPoll(1)))
.channel(channel1())
.handle(handler())
.get();
}
但我想从其他系统安排我的流程。 有人知道怎么做吗?
schedule my flow from other system
从您的流程角度来看,这听起来像 event driven action
。为此,您应该使用 JdbcOutboundGateway
和相同的 SELECT
.
当然,您应该找到该外部系统的挂钩,以触发您的流输入通道的事件。这可能是任何 Inbound Channel Adapter 或 Message Driven Adapter,例如JMS、AMQP、HTTP 等等。取决于您的中间件中已有什么,以及您的应用程序可以从中向外部系统公开什么。
我想我用自定义触发器解决了这个问题:
public Trigger onlyOnceTrigger() {
return new Trigger() {
private final AtomicBoolean invoked = new AtomicBoolean();
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}
};
}
还有我的流量:
public IntegrationFlow pollingFlow() throws Exception {
return IntegrationFlows.from(jdbcMessageSource(),
c -> c.poller(Pollers.trigger(onlyOnceTrigger()).maxMessagesPerPoll(1)))
.channel(channel1())
.handle(handler())
.get();
}