jdbc 网关中的批处理

Batch processing in jdbc gateway

我的设置(为清楚起见进行了简化)如下:

<int:inbound-channel-adapter channel="in" expression="0">
    <int:poller cron="0 0 * * * *"/>
    <int:header name="snapshot_date" expression="new java.util.Date()"/>
    <int:header name="correlationId" expression="T(java.util.UUID).randomUUID()"/>
    <!-- more here -->
</int:inbound-channel-adapter>

<int:recipient-list-router input-channel="in" apply-sequence="true">
    <int:recipient channel="data.source.1"/>
    <int:recipient channel="data.source.2"/>
    <!-- more here -->
</int:recipient-list-router>

<int:chain input-channel="data.source.1" output-channel="save">
    <int-jdbc:outbound-gateway data-source="db1" max-rows-per-poll="0">
        <int-jdbc:query>
            select * from large_dataset
        </int-jdbc:query>
    </int-jdbc:outbound-gateway>
    <int:header-enricher>
        <int:header name="source" value="data.source.1"/>
    </int:header-enricher>
</int:chain>

<int:chain input-channel="data.source.2" output-channel="save">
    <int-jdbc:outbound-gateway data-source="db1" max-rows-per-poll="0">
        <int-jdbc:query>
            select * from another_large_dataset
        </int-jdbc:query>
    </int-jdbc:outbound-gateway>
    <int:header-enricher>
        <int:header name="source" value="data.source.2"/>
    </int:header-enricher>
</int:chain>

<int:chain input-channel="save" output-channel="process">
    <int:splitter expression="T(com.google.common.collect.Lists).partition(payload, 1000)"/>
    <int:transformer>
        <int-groovy:script location="transform.groovy"/>
    </int:transformer>
    <int:service-activator expression="@db2.insertData(payload, headers)"/>
    <int:aggregator/>
</int:chain>

<int:chain input-channel="process" output-channel="nullChannel">
    <int:aggregator/>
    <int:service-activator expression="@finalProcessing.doSomething()"/>
</int:chain>

让我稍微解释一下步骤:

  1. poller 由 cron 触发。消息中包含有关此 运行.
  2. 的一些信息
  3. 消息被发送到多个 data-source 链。
  4. 每个链从大型数据集(100+k 行)中提取数据。结果集消息标有来源 header.
  5. 结果集被拆分成更小的块,转换并插入 db2。
  6. 轮询所有数据源后,使用有关 运行 的信息启动一些复杂的处理。

到目前为止,此配置可以完成工作,但不可扩展。主要问题是我必须先将完整的数据集加载到内存中并沿着管道传递,这可能会导致内存问题。

我的问题是 - 从 db1 中提取结果集、通过管道推送并小批量插入 db2 的最简单方法是什么?

首先自版本 4.0.4 Spring Integration 的 <splitter> 支持 Iterator 作为 payload 以避免内存开销。

我们有 JDBC 的 test-case,它显示了该行为。但如您所见,它基于 Spring Integration Java DSL 和 Java 8 Lamdas。 (是的,即使是没有 Lamdas 的旧 Java 版本也可以这样做)。即使这种情况适合您,您的 <aggregator> 也不应该是 in-memory,因为它会将所有消息收集到 MessageStore.

这是第一种情况。

另一个选项基于 paging 算法,当您的 SELECT 在您的数据库方言中接受一对 WHERE 参数时。对于 Oracle,它可以像:Paging with Oracle。 其中 pageNumber 是一些消息 header - :headers[pageNumber]

之后,你用 <recipient-list-router> 做了一些技巧,将 SELECT 结果发送到 save 通道和其他一些递增的通道 pageNumber header 值并向 data.source.1 频道发送消息,依此类推。当 pageNumber 超出数据范围时,<int-jdbc:outbound-gateway> 停止产生结果。

类似的东西。

我没说这很容易,但至少对你来说应该是一个起点。