Strategy to refresh/update SessionFactory in Spring Integration
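Hi, I use Spring Integration extensively in my project, and in the current case I create my FTP and SFTP adapters dynamically with Spring's dynamic flow registration. To supply the session factories, I also create them dynamically, based on the persisted configuration for each unique connection.
This works well, but sometimes I need to modify an existing session configuration at runtime, and in that case the session factory has to be refreshed with the new configuration. This can happen, for example, when credentials change dynamically.
To do this, I am looking at two approaches:
- Remove the dynamic flow via flowContext.remove(flowId). But this somehow does not kill the flow; I still see the old session factory and the flow running.
- If there were a way to associate a running adapter with a new SessionFactory dynamically, that would also work. But I have not found a way to do this yet.
Please help.
UPDATE
My dynamic registration code is below: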
CachingSessionFactory<FTPFile> csf = cache.get(feed.getConnectionId());
IntegrationFlow flow = IntegrationFlows
        .from(inboundAdapter(csf).preserveTimestamp(true)//
                .remoteDirectory(feed.getRemoteDirectory())//
                .regexFilter(feed.getRegexFilter())//
                .deleteRemoteFiles(feed.getDeleteRemoteFiles())
                .autoCreateLocalDirectory(feed.getAutoCreateLocalDirectory())
                .localFilenameExpression(feed.getLocalFilenameExpression())//
                .localFilter(localFileFilter)//
                .localDirectory(new File(feed.getLocalDirectory())),
                e -> e.id(inboundAdapter.get(feed.getId())).autoStartup(false)
                        .poller(Pollers//
                                .cron(feed.getPollingFreq())//
                                .maxMessagesPerPoll(1)//
                                .advice(retryAdvice)))
        .enrichHeaders(s -> s.header(HEADER.feed.name(), feed))//
        .filter(selector)//
        .handle(fcHandler)//
        .handle(fileValidationHandler)//
        .channel(ftbSubscriber)//
        .get();
this.flowContext.registration(flow).addBean(csf).//
        id(inboundFlow.get(feed.getId())).//
        autoStartup(false).register();
I am trying to remove it via
flowContext.remove(flowId);
After the removal, the poller and the adapter still appear to be active:
java.lang.IllegalStateException: failed to create FTPClient
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:275)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:200)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:62)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:134)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:224)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:65)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
at com.sun.proxy.$Proxy188.call(Unknown Source)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:353)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.run(ErrorHandlingTaskExecutor.java:55)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
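*After Gary's comment* I changed the order of the chain to match Gary's example and removed autoStartup from the flow context chain; the polling adapter now looks like it is getting removed. However, the bug still seems to be there if autoStartup is true.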
this.flowContext.registration(flow).//
        id(inboundFlow.get(feed.getId()))//
        .addBean(sessionFactory.get(feed.getId()), csf)//
        .register();
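*Researching more*
StandardIntegrationFlow.start() does start all the components inside the flow, irrespective of their autoStartup status. I think we need to check isAutoStartup for these as well, and only start them if autoStartup is true when the IntegrationFlow is started. The existing code from StandardIntegrationFlow is below. Is there a way for me to override this, or does it need a PR or a fix?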
if (!this.running) {
    ListIterator<Object> iterator = this.integrationComponents.listIterator(this.integrationComponents.size());
    this.lifecycles.clear();
    while (iterator.hasPrevious()) {
        Object component = iterator.previous();
        if (component instanceof SmartLifecycle) {
            this.lifecycles.add((SmartLifecycle) component);
            ((SmartLifecycle) component).start();
        }
    }
    this.running = true;
}
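To make the proposal concrete, here is a minimal stand-alone sketch of the check I have in mind. It is not the actual StandardIntegrationFlow code: LifecycleComponent is a hypothetical stand-in for SmartLifecycle, and RecordingComponent and AutoStartupAwareFlow are made-up names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

// Hypothetical stand-in for Spring's SmartLifecycle; not the real interface.
interface LifecycleComponent {
    boolean isAutoStartup();
    void start();
    boolean isRunning();
}

// Trivial component that only records whether start() was called.
class RecordingComponent implements LifecycleComponent {
    private final boolean autoStartup;
    private boolean running;

    RecordingComponent(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    @Override
    public boolean isAutoStartup() {
        return autoStartup;
    }

    @Override
    public void start() {
        running = true;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}

// Hypothetical flow that starts its components in reverse order,
// like the snippet above, but skips those with autoStartup == false.
class AutoStartupAwareFlow {
    private final List<LifecycleComponent> components;
    private final List<LifecycleComponent> lifecycles = new ArrayList<>();
    private boolean running;

    AutoStartupAwareFlow(List<LifecycleComponent> components) {
        this.components = components;
    }

    void start() {
        if (!running) {
            ListIterator<LifecycleComponent> iterator =
                    components.listIterator(components.size());
            lifecycles.clear();
            while (iterator.hasPrevious()) {
                LifecycleComponent component = iterator.previous();
                lifecycles.add(component);
                if (component.isAutoStartup()) { // the proposed extra check
                    component.start();
                }
            }
            running = true;
        }
    }
}
```

With one component created as new RecordingComponent(true) and another as new RecordingComponent(false), calling start() on the flow would start only the first one.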
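remove() should shut everything down. If you are using a CachingSessionFactory, we need to destroy() it so that it closes the cached sessions.
The flow will automatically destroy() the bean if you add it to the registration (using addBean()).
If you can edit your question to show your dynamic registration code, I can take a look.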
EDIT
It all works fine for me...
@SpringBootApplication
public class So43916317Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So43916317Application.class, args).close();
    }

    @Autowired
    private IntegrationFlowContext context;

    @Override
    public void run(String... args) throws Exception {
        CSF csf = new CSF(sf());
        IntegrationFlow flow = IntegrationFlows.from(Ftp.inboundAdapter(csf)
                    .localDirectory(new File("/tmp/foo"))
                    .remoteDirectory("bar"), e -> e.poller(Pollers.fixedDelay(1_000)))
                .handle(System.out::println)
                .get();
        this.context.registration(flow)
            .id("foo")
            .addBean(csf)
            .register();
        Thread.sleep(10_000);
        System.out.println("removing flow");
        this.context.remove("foo");
        System.out.println("destroying csf");
        csf.destroy();
        Thread.sleep(10_000);
        System.out.println("exiting");
        Assert.state(csf.destroyCalled, "destroy not called");
    }

    @Bean
    public DefaultFtpSessionFactory sf() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("10.0.0.3");
        sf.setUsername("ftptest");
        sf.setPassword("ftptest");
        return sf;
    }

    public static class CSF extends CachingSessionFactory<FTPFile> {

        private boolean destroyCalled;

        public CSF(SessionFactory<FTPFile> sessionFactory) {
            super(sessionFactory);
        }

        @Override
        public void destroy() {
            this.destroyCalled = true;
            super.destroy();
        }

    }

}
Log...
16:15:38.898 [task-scheduler-5] DEBUG o.s.i.f.i.FtpInboundFileSynchronizer - 0 files transferred
16:15:38.898 [task-scheduler-5] DEBUG o.s.i.e.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
16:15:39.900 [task-scheduler-3] DEBUG o.s.integration.util.SimplePool - Obtained org.springframework.integration.ftp.session.FtpSession@149a806 from pool.
16:15:39.903 [task-scheduler-3] DEBUG o.s.i.f.r.s.CachingSessionFactory - Releasing Session org.springframework.integration.ftp.session.FtpSession@149a806 back to the pool.
16:15:39.903 [task-scheduler-3] DEBUG o.s.integration.util.SimplePool - Releasing org.springframework.integration.ftp.session.FtpSession@149a806 back to the pool
16:15:39.903 [task-scheduler-3] DEBUG o.s.i.f.i.FtpInboundFileSynchronizer - 0 files transferred
16:15:39.903 [task-scheduler-3] DEBUG o.s.i.e.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
removing flow
16:15:40.756 [main] INFO o.s.i.e.SourcePollingChannelAdapter - stopped org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0
16:15:40.757 [main] INFO o.s.i.channel.DirectChannel - Channel 'application.foo.channel#0' has 0 subscriber(s).
16:15:40.757 [main] INFO o.s.i.endpoint.EventDrivenConsumer - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#0
16:15:40.757 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Retrieved dependent beans for bean 'foo': [org.springframework.integration.ftp.inbound.FtpInboundFileSynchronizer#0, org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0, org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0.source, foo.channel#0, com.example.So43916317Application$$Lambda/962287291#0, org.springframework.integration.config.ConsumerEndpointFactoryBean#0, foocom.example.So43916317Application$CSF#0]
destroying csf
16:15:40.757 [main] DEBUG o.s.integration.util.SimplePool - Removing org.springframework.integration.ftp.session.FtpSession@149a806 from the pool
exiting
16:15:50.761 [main] TRACE o.s.c.a.AnnotationConfigApplicationContext - Publishing event in org.springframework.context.annotation.AnnotationConfigApplicationContext@27c86f2d: org.springframework.boot.context.event.ApplicationReadyEvent[source=org.springframework.boot.SpringApplication@5c18016b]
As you can see, the polling stops after remove() and the session is closed by destroy().
EDIT2
If you have autoStartup turned off, you have to start via the registration...
IntegrationFlowRegistration registration = this.context.registration(flow)
    .id("foo")
    .addBean(csf)
    .autoStartup(false)
    .register();
...
registration.start();
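For the original question (refreshing a session factory after a credentials change), the remove-and-register-again approach can be sketched roughly as follows. This is a plain-Java model under stated assumptions, not Spring Integration code: DemoSessionFactory, DemoFlow and DemoFlowRegistry are hypothetical stand-ins for CachingSessionFactory, the registered flow and IntegrationFlowContext, and the model assumes that removing a flow also destroys the factory that was added to its registration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a CachingSessionFactory built from one configuration.
class DemoSessionFactory {
    final String password;
    boolean destroyed;

    DemoSessionFactory(String password) {
        this.password = password;
    }

    // Models closing the cached sessions.
    void destroy() {
        destroyed = true;
    }
}

// Hypothetical stand-in for a registered flow bound to one session factory.
class DemoFlow {
    final DemoSessionFactory sessionFactory;
    boolean running;

    DemoFlow(DemoSessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    void start() {
        running = true;
    }

    void stop() {
        running = false;
    }
}

// Models the strategy only; this is not the IntegrationFlowContext API.
class DemoFlowRegistry {
    private final Map<String, DemoFlow> flows = new HashMap<>();

    DemoFlow register(String id, DemoSessionFactory sessionFactory) {
        if (flows.containsKey(id)) {
            throw new IllegalStateException("flow already registered: " + id);
        }
        DemoFlow flow = new DemoFlow(sessionFactory);
        flow.start();
        flows.put(id, flow);
        return flow;
    }

    // Stops the flow and destroys the factory that was registered with it.
    void remove(String id) {
        DemoFlow flow = flows.remove(id);
        if (flow != null) {
            flow.stop();
            flow.sessionFactory.destroy();
        }
    }

    // Refresh = remove the old flow, then register a new one with the new factory.
    DemoFlow refresh(String id, DemoSessionFactory newSessionFactory) {
        remove(id);
        return register(id, newSessionFactory);
    }

    DemoFlow get(String id) {
        return flows.get(id);
    }
}
```

In a real application the same sequence would presumably be context.remove(id), then building a new session factory from the updated configuration, then registering a new flow under the same id with addBean() for the new factory.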