Strategy to refresh/update SessionFactory in Spring Integration

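Hi, I use Spring Integration extensively in my project. In the current case I create my FTP and SFTP adapters dynamically using Spring's dynamic flow registration. To supply the session factories, I also create them dynamically, based on the persisted configuration for each unique connection.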

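This works well, but sometimes I need to modify an existing session configuration at runtime, and in that case I need the session factory to be refreshed with the new configuration. This can happen, for example, when the credentials change dynamically.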

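To achieve this, I am looking at two approaches:

  1. Remove the dynamic flow via flowContext.remove(flowId). But this somehow does not kill the flow; I still see the old session factory and the flow running.
  2. If there is a way to associate a running adapter with a new SessionFactory dynamically, that would also work. But I have not found a way to accomplish this yet.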
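
To make approach 2 more concrete, this is roughly the shape I have in mind: a wrapper that the adapter keeps a reference to, while the target behind it is swapped. This is only a plain-Java sketch; `SwappableFactory` is a hypothetical name, `Supplier` stands in for a session factory, and nothing here is an existing Spring Integration API.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical wrapper: callers keep one reference while the target behind it can change.
class SwappableFactory<S> implements Supplier<S> {

    private final AtomicReference<Supplier<S>> delegate;

    SwappableFactory(Supplier<S> initial) {
        this.delegate = new AtomicReference<>(initial);
    }

    // Every call goes to whichever delegate is current at that moment.
    @Override
    public S get() {
        return this.delegate.get().get();
    }

    // Atomically switch to a new delegate and hand back the previous one,
    // so the caller can release whatever resources it holds.
    Supplier<S> swap(Supplier<S> replacement) {
        return this.delegate.getAndSet(replacement);
    }
}
```

The previous delegate is returned rather than dropped because, with a caching factory, something would still have to close the sessions it holds.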

Please help.

UPDATE

Below is my dynamic registration code:

    CachingSessionFactory<FTPFile> csf = cache.get(feed.getConnectionId());
    IntegrationFlow flow = IntegrationFlows
            .from(inboundAdapter(csf)
                            .preserveTimestamp(true)
                            .remoteDirectory(feed.getRemoteDirectory())
                            .regexFilter(feed.getRegexFilter())
                            .deleteRemoteFiles(feed.getDeleteRemoteFiles())
                            .autoCreateLocalDirectory(feed.getAutoCreateLocalDirectory())
                            .localFilenameExpression(feed.getLocalFilenameExpression())
                            .localFilter(localFileFilter)
                            .localDirectory(new File(feed.getLocalDirectory())),
                    e -> e.id(inboundAdapter.get(feed.getId()))
                            .autoStartup(false)
                            .poller(Pollers
                                    .cron(feed.getPollingFreq())
                                    .maxMessagesPerPoll(1)
                                    .advice(retryAdvice)))
            .enrichHeaders(s -> s.header(HEADER.feed.name(), feed))
            .filter(selector)
            .handle(fcHandler)
            .handle(fileValidationHandler)
            .channel(ftbSubscriber)
            .get();

    this.flowContext.registration(flow)
            .addBean(csf)
            .id(inboundFlow.get(feed.getId()))
            .autoStartup(false)
            .register();

I am trying to remove the flow with

flowContext.remove(flowId);

After the removal, the poller and adapter still appear to be active:

java.lang.IllegalStateException: failed to create FTPClient
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:275)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:200)
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizingMessageSource.doReceive(AbstractInboundFileSynchronizingMessageSource.java:62)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:134)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:224)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice.invoke(AbstractRequestHandlerAdvice.java:65)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
at com.sun.proxy.$Proxy188.call(Unknown Source)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:353)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.run(ErrorHandlingTaskExecutor.java:55)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

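*After Gary's comment* I changed the order of the chain and removed the autoStartup setting, as in his example, and now the polling adapter appears to be removed.

I changed the order to match Gary's and removed autoStartup from the flow context chain. However, it looks like the error is still there if autoStartup is true.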

   this.flowContext.registration(flow).//
        id(inboundFlow.get(feed.getId()))//
        .addBean(sessionFactory.get(feed.getId()), csf)//
        .register();

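*After more research* StandardIntegrationFlow.start() does start all components in the flow, regardless of their autoStartup status. I think we also need to check isAutoStartup() for these components and start them only if autoStartup is true when the IntegrationFlow is started. The existing code from StandardIntegrationFlow is below. Is there a way for me to override this, or does it need a PR or a fix?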

if (!this.running) {
            ListIterator<Object> iterator = this.integrationComponents.listIterator(this.integrationComponents.size());
            this.lifecycles.clear();
            while (iterator.hasPrevious()) {
                Object component = iterator.previous();
                if (component instanceof SmartLifecycle) {
                    this.lifecycles.add((SmartLifecycle) component);
                    ((SmartLifecycle) component).start();
                }
            }
            this.running = true;
        }
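
To make the idea concrete, here is a minimal sketch of the check I have in mind. It deliberately does not use the Spring classes: `Startable` is a hypothetical stand-in for SmartLifecycle, `RecordingComponent` is a hypothetical test component, and `AutoStartupAwareFlow` is a hypothetical stand-in for StandardIntegrationFlow. It only illustrates the proposed condition and is not the library's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

// Hypothetical stand-in for Spring's SmartLifecycle (only the methods used here).
interface Startable {
    void start();
    boolean isRunning();
    boolean isAutoStartup();
}

// Hypothetical component that just records whether it was started.
class RecordingComponent implements Startable {
    private final boolean autoStartup;
    private boolean running;

    RecordingComponent(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    @Override public void start() { this.running = true; }
    @Override public boolean isRunning() { return this.running; }
    @Override public boolean isAutoStartup() { return this.autoStartup; }
}

// Hypothetical stand-in for StandardIntegrationFlow with the proposed check added.
class AutoStartupAwareFlow {
    private final List<Object> integrationComponents;
    private final List<Startable> lifecycles = new ArrayList<>();
    private boolean running;

    AutoStartupAwareFlow(List<Object> integrationComponents) {
        this.integrationComponents = integrationComponents;
    }

    void start() {
        if (!this.running) {
            // Walk the components in reverse order, as the existing code does.
            ListIterator<Object> iterator =
                    this.integrationComponents.listIterator(this.integrationComponents.size());
            this.lifecycles.clear();
            while (iterator.hasPrevious()) {
                Object component = iterator.previous();
                if (component instanceof Startable) {
                    Startable lifecycle = (Startable) component;
                    this.lifecycles.add(lifecycle);
                    // Proposed change: skip components that opted out of auto-startup.
                    if (lifecycle.isAutoStartup()) {
                        lifecycle.start();
                    }
                }
            }
            this.running = true;
        }
    }
}
```

With this check, a component configured with autoStartup(false) would stay stopped until it is started explicitly, while everything else in the flow starts as before.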

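remove() should shut everything down. If you are using a CachingSessionFactory, it needs to be destroy()ed so that it closes the cached sessions.

The flow will automatically destroy() the bean if you add it to the registration (using addBean()).

If you can edit your question to show your dynamic registration code, I can take a look.

EDIT

It all works fine for me...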

@SpringBootApplication
public class So43916317Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(So43916317Application.class, args).close();
    }

    @Autowired
    private IntegrationFlowContext context;

    @Override
    public void run(String... args) throws Exception {
        CSF csf = new CSF(sf());
        IntegrationFlow flow = IntegrationFlows.from(Ftp.inboundAdapter(csf)
                    .localDirectory(new File("/tmp/foo"))
                    .remoteDirectory("bar"), e -> e.poller(Pollers.fixedDelay(1_000)))
                .handle(System.out::println)
                .get();
        this.context.registration(flow)
            .id("foo")
            .addBean(csf)
            .register();
        Thread.sleep(10_000);
        System.out.println("removing flow");
        this.context.remove("foo");
        System.out.println("destroying csf");
        csf.destroy();
        Thread.sleep(10_000);
        System.out.println("exiting");
        Assert.state(csf.destroyCalled, "destroy not called");
    }

    @Bean
    public DefaultFtpSessionFactory sf() {
        DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
        sf.setHost("10.0.0.3");
        sf.setUsername("ftptest");
        sf.setPassword("ftptest");
        return sf;
    }

    public static class CSF extends CachingSessionFactory<FTPFile> {

        private boolean destroyCalled;

        public CSF(SessionFactory<FTPFile> sessionFactory) {
            super(sessionFactory);
        }

        @Override
        public void destroy() {
            this.destroyCalled = true;
            super.destroy();
        }

    }

}

Logs...

16:15:38.898 [task-scheduler-5] DEBUG o.s.i.f.i.FtpInboundFileSynchronizer - 0 files transferred
16:15:38.898 [task-scheduler-5] DEBUG o.s.i.e.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
16:15:39.900 [task-scheduler-3] DEBUG o.s.integration.util.SimplePool - Obtained org.springframework.integration.ftp.session.FtpSession@149a806 from pool.
16:15:39.903 [task-scheduler-3] DEBUG o.s.i.f.r.s.CachingSessionFactory - Releasing Session org.springframework.integration.ftp.session.FtpSession@149a806 back to the pool.
16:15:39.903 [task-scheduler-3] DEBUG o.s.integration.util.SimplePool - Releasing org.springframework.integration.ftp.session.FtpSession@149a806 back to the pool
16:15:39.903 [task-scheduler-3] DEBUG o.s.i.f.i.FtpInboundFileSynchronizer - 0 files transferred
16:15:39.903 [task-scheduler-3] DEBUG o.s.i.e.SourcePollingChannelAdapter - Received no Message during the poll, returning 'false'
removing flow
16:15:40.756 [main] INFO  o.s.i.e.SourcePollingChannelAdapter - stopped org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0
16:15:40.757 [main] INFO  o.s.i.channel.DirectChannel - Channel 'application.foo.channel#0' has 0 subscriber(s).
16:15:40.757 [main] INFO  o.s.i.endpoint.EventDrivenConsumer - stopped org.springframework.integration.config.ConsumerEndpointFactoryBean#0
16:15:40.757 [main] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Retrieved dependent beans for bean 'foo': [org.springframework.integration.ftp.inbound.FtpInboundFileSynchronizer#0, org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0, org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0.source, foo.channel#0, com.example.So43916317Application$$Lambda/962287291#0, org.springframework.integration.config.ConsumerEndpointFactoryBean#0, foocom.example.So43916317Application$CSF#0]
destroying csf
16:15:40.757 [main] DEBUG o.s.integration.util.SimplePool - Removing org.springframework.integration.ftp.session.FtpSession@149a806 from the pool
exiting
16:15:50.761 [main] TRACE o.s.c.a.AnnotationConfigApplicationContext - Publishing event in org.springframework.context.annotation.AnnotationConfigApplicationContext@27c86f2d: org.springframework.boot.context.event.ApplicationReadyEvent[source=org.springframework.boot.SpringApplication@5c18016b]

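As you can see, polling stops after the remove() and the session is closed by the destroy().

EDIT2

If you have autoStartup turned off, you have to start via the registration...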

IntegrationFlowRegistration registration = this.context.registration(flow)
    .id("foo")
    .addBean(csf)
    .autoStartup(false)
    .register();
...
registration.start();
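
Putting the pieces together for the original question, one possible refresh sequence is: remove the flow, destroy the old factory, then register a new flow built on a new factory. The sketch below shows only that ordering in plain Java. `ClosableFactory`, `FlowRegistry`, `InMemoryRegistry` and `FlowRefresher` are hypothetical stand-ins (for the CachingSessionFactory and for the IntegrationFlowContext calls shown above), not Spring Integration types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a CachingSessionFactory built from one connection's settings.
class ClosableFactory {
    final String credentials;
    boolean destroyed;

    ClosableFactory(String credentials) {
        this.credentials = credentials;
    }

    void destroy() {
        this.destroyed = true; // a real factory would close its cached sessions here
    }
}

// Hypothetical stand-in for the register and remove operations used above.
interface FlowRegistry {
    void register(String flowId, ClosableFactory factory);
    void remove(String flowId);
}

// Hypothetical in-memory registry, present only so the sketch can run.
class InMemoryRegistry implements FlowRegistry {
    final Map<String, ClosableFactory> flows = new HashMap<>();

    @Override
    public void register(String flowId, ClosableFactory factory) {
        this.flows.put(flowId, factory);
    }

    @Override
    public void remove(String flowId) {
        this.flows.remove(flowId);
    }
}

// Hypothetical helper that replaces the factory behind a flow id.
class FlowRefresher {
    private final FlowRegistry registry;
    private final Map<String, ClosableFactory> current = new HashMap<>();

    FlowRefresher(FlowRegistry registry) {
        this.registry = registry;
    }

    ClosableFactory refresh(String flowId, String newCredentials) {
        ClosableFactory old = this.current.remove(flowId);
        if (old != null) {
            this.registry.remove(flowId); // step 1: stop and unregister the old flow
            old.destroy();                // step 2: release the old factory
        }
        ClosableFactory fresh = new ClosableFactory(newCredentials);
        this.registry.register(flowId, fresh); // step 3: register a flow using the new factory
        this.current.put(flowId, fresh);
        return fresh;
    }
}
```

The old flow is removed before its factory is destroyed so that no poll is still using the factory while its sessions are being closed.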