Spring 集成:轮询器表现怪异

Spring Integration: Poller acting weird

我有一个配置可以使用 jdbc:inbound-channel-adapter 从数据库中读取数据。配置:

   <int-jdbc:inbound-channel-adapter query="SELECT * FROM requests WHERE processed_status = '' OR processed_status IS NULL LIMIT 5" channel="requestsJdbcChannel"
    data-source="dataSource" update="UPDATE requests  SET processed_status = 'INPROGRESS', date_processed = NOW() WHERE id IN (:id)" >
          <int:poller fixed-rate="30000" />
    </int-jdbc:inbound-channel-adapter>

    <int:splitter input-channel="requestsJdbcChannel" output-channel="requestsQueueChannel"/>

    <int:channel id="requestsQueueChannel">
          <int:queue capacity="1000"/>
    </int:channel>

    <int:chain id="requestsChain" input-channel="requestsQueueChannel" output-channel="requestsApiChannel"> 
          <int:poller  max-messages-per-poll="1" fixed-rate="1000" />
    .
    .
    </int:chain>

在上面的配置中,我定义了 jdbc 轮询器 fixed-rate 为 30 秒。当有直接通道而不是 requestsQueueChannel 时,select 查询仅获取 5 行(因为我在 select 查询中使用限制行)并等待下一次轮询的另一个 30 秒。

但是在我使用队列引入 requestsQueueChannel 并在 requestsChain 中添加轮询器之后,jdbc-inbound 无法按预期工作。它不会再等待 30 秒进行下一次轮询。有时它会连续两次(在一秒钟内)轮询数据库,就好像有 2 个线程在运行,并从数据库中获取两组行。但是,除了上面提到的这些之外,没有异步切换。

我的理解是,即使有 requestsQueueChannel,一旦它执行了 select 查询,它应该再等待 30 秒来轮询数据库。我有什么想念的吗?我只想了解此配置的行为。

使用 DirectChannel 时,在当前投票结束之前不会考虑下一次投票。

当使用 QueueChannel(或任务执行器)时,轮询器可以再次 运行。

入站适配器 max-messages-per-poll 默认设置为 1,因此您的配置应该按预期工作。你能 post 某处的 DEBUG 日志吗?

Spring 集成轮询器激活两次的问题,就好像它们是 2 个线程一样,与我在这里遇到的基本相同的问题,文件系统轮询器:

How to prevent duplicate Spring Integration service activations when polling directory

显然这是一个相对常见的错误配置,其中 Spring root 和 servlet 上下文都加载了 Spring 集成配置。结果,确实有两个线程,可以看到轮询器在轮询周期内激活了两次。通常彼此相隔几秒,因为每个都将在其上下文加载时启动。

我确保 Spring 集成配置仅在单个上下文中加载的方法是构建项目包以确保分离。

首先定义一个 web 配置,它只在 "web" 包下选择 类。

@Configuration
@ComponentScan(basePackages = { "com.myapp.web" })
@EnableWebMvc
public class WebConfig extends WebMvcConfigurerAdapter {
    @Override
    public void configureDefaultServletHandling(
            DefaultServletHandlerConfigurer configurer) {
        configurer.enable();
    }
}

创建单独的根配置 类 以加载不属于 servlet 上下文的服务和存储库等 bean。其中之一应该加载 Spring 集成配置。即:

@Configuration
@ComponentScan(basePackages = { "com.myapp.eip" })
@ImportResource(value = { "classpath:META-INF/spring/integration-context.xml"     })
public class EipConfig {
}

配置中的另一个因素需要花一些时间才能解决,那就是我的 servlet 过滤器和网络安全配置需要在根上下文而不是 servlet 上下文中。