jdbc 网关中的批处理
Batch processing in jdbc gateway
我的设置(为清楚起见进行了简化)如下:
<int:inbound-channel-adapter channel="in" expression="0">
<int:poller cron="0 0 * * * *"/>
<int:header name="snapshot_date" expression="new java.util.Date()"/>
<int:header name="correlationId" expression="T(java.util.UUID).randomUUID()"/>
<!-- more here -->
</int:inbound-channel-adapter>
<int:recipient-list-router input-channel="in" apply-sequence="true">
<int:recipient channel="data.source.1"/>
<int:recipient channel="data.source.2"/>
<!-- more here -->
</int:recipient-list-router>
<int:chain input-channel="data.source.1" output-channel="save">
<int-jdbc:outbound-gateway data-source="db1" max-rows-per-poll="0">
<int-jdbc:query>
select * from large_dataset
</int-jdbc:query>
</int-jdbc:outbound-gateway>
<int:header-enricher>
<int:header name="source" value="data.source.1"/>
</int:header-enricher>
</int:chain>
<int:chain input-channel="data.source.2" output-channel="save">
<int-jdbc:outbound-gateway data-source="db1" max-rows-per-poll="0">
<int-jdbc:query>
select * from another_large_dataset
</int-jdbc:query>
</int-jdbc:outbound-gateway>
<int:header-enricher>
<int:header name="source" value="data.source.2"/>
</int:header-enricher>
</int:chain>
<int:chain input-channel="save" output-channel="process">
<int:splitter expression="T(com.google.common.collect.Lists).partition(payload, 1000)"/>
<int:transformer>
<int-groovy:script location="transform.groovy"/>
</int:transformer>
<int:service-activator expression="@db2.insertData(payload, headers)"/>
<int:aggregator/>
</int:chain>
<int:chain input-channel="process" output-channel="nullChannel">
<int:aggregator/>
<int:service-activator expression="@finalProcessing.doSomething()"/>
</int:chain>
让我稍微解释一下步骤:
- poller 由 cron 触发。消息中包含有关此 运行.
的一些信息
- 消息被发送到多个 data-source 链。
- 每个链从大型数据集(100+k 行)中提取数据。结果集消息标有来源 header.
- 结果集被拆分成更小的块,转换并插入 db2。
- 轮询所有数据源后,使用有关 运行 的信息启动一些复杂的处理。
到目前为止,此配置可以完成工作,但不可扩展。主要问题是我必须先将完整的数据集加载到内存中并沿着管道传递,这可能会导致内存问题。
我的问题是 - 从 db1 中提取结果集、通过管道推送并小批量插入 db2 的最简单方法是什么?
首先自版本 4.0.4 Spring Integration 的 <splitter>
支持 Iterator
作为 payload
以避免内存开销。
我们有 JDBC 的 test-case,它显示了该行为。但如您所见,它基于 Spring Integration Java DSL 和 Java 8 Lamdas。 (是的,即使是没有 Lamdas 的旧 Java 版本也可以这样做)。即使这种情况适合您,您的 <aggregator>
也不应该是 in-memory,因为它会将所有消息收集到 MessageStore
.
这是第一种情况。
另一个选项基于 paging
算法,当您的 SELECT
在您的数据库方言中接受一对 WHERE
参数时。对于 Oracle,它可以像:Paging with Oracle。
其中 pageNumber
是一些消息 header
- :headers[pageNumber]
之后,你用 <recipient-list-router>
做了一些技巧,将 SELECT
结果发送到 save
通道和其他一些递增的通道 pageNumber
header 值并向 data.source.1
频道发送消息,依此类推。当 pageNumber
超出数据范围时,<int-jdbc:outbound-gateway>
停止产生结果。
类似的东西。
我没说这很容易,但至少对你来说应该是一个起点。
我的设置(为清楚起见进行了简化)如下:
<int:inbound-channel-adapter channel="in" expression="0">
<int:poller cron="0 0 * * * *"/>
<int:header name="snapshot_date" expression="new java.util.Date()"/>
<int:header name="correlationId" expression="T(java.util.UUID).randomUUID()"/>
<!-- more here -->
</int:inbound-channel-adapter>
<int:recipient-list-router input-channel="in" apply-sequence="true">
<int:recipient channel="data.source.1"/>
<int:recipient channel="data.source.2"/>
<!-- more here -->
</int:recipient-list-router>
<int:chain input-channel="data.source.1" output-channel="save">
<int-jdbc:outbound-gateway data-source="db1" max-rows-per-poll="0">
<int-jdbc:query>
select * from large_dataset
</int-jdbc:query>
</int-jdbc:outbound-gateway>
<int:header-enricher>
<int:header name="source" value="data.source.1"/>
</int:header-enricher>
</int:chain>
<int:chain input-channel="data.source.2" output-channel="save">
<int-jdbc:outbound-gateway data-source="db1" max-rows-per-poll="0">
<int-jdbc:query>
select * from another_large_dataset
</int-jdbc:query>
</int-jdbc:outbound-gateway>
<int:header-enricher>
<int:header name="source" value="data.source.2"/>
</int:header-enricher>
</int:chain>
<int:chain input-channel="save" output-channel="process">
<int:splitter expression="T(com.google.common.collect.Lists).partition(payload, 1000)"/>
<int:transformer>
<int-groovy:script location="transform.groovy"/>
</int:transformer>
<int:service-activator expression="@db2.insertData(payload, headers)"/>
<int:aggregator/>
</int:chain>
<int:chain input-channel="process" output-channel="nullChannel">
<int:aggregator/>
<int:service-activator expression="@finalProcessing.doSomething()"/>
</int:chain>
让我稍微解释一下步骤:
- poller 由 cron 触发。消息中包含有关此 运行. 的一些信息
- 消息被发送到多个 data-source 链。
- 每个链从大型数据集(100+k 行)中提取数据。结果集消息标有来源 header.
- 结果集被拆分成更小的块,转换并插入 db2。
- 轮询所有数据源后,使用有关 运行 的信息启动一些复杂的处理。
到目前为止,此配置可以完成工作,但不可扩展。主要问题是我必须先将完整的数据集加载到内存中并沿着管道传递,这可能会导致内存问题。
我的问题是 - 从 db1 中提取结果集、通过管道推送并小批量插入 db2 的最简单方法是什么?
首先自版本 4.0.4 Spring Integration 的 <splitter>
支持 Iterator
作为 payload
以避免内存开销。
我们有 JDBC 的 test-case,它显示了该行为。但如您所见,它基于 Spring Integration Java DSL 和 Java 8 Lamdas。 (是的,即使是没有 Lamdas 的旧 Java 版本也可以这样做)。即使这种情况适合您,您的 <aggregator>
也不应该是 in-memory,因为它会将所有消息收集到 MessageStore
.
这是第一种情况。
另一个选项基于 paging
算法,当您的 SELECT
在您的数据库方言中接受一对 WHERE
参数时。对于 Oracle,它可以像:Paging with Oracle。
其中 pageNumber
是一些消息 header
- :headers[pageNumber]
之后,你用 <recipient-list-router>
做了一些技巧,将 SELECT
结果发送到 save
通道和其他一些递增的通道 pageNumber
header 值并向 data.source.1
频道发送消息,依此类推。当 pageNumber
超出数据范围时,<int-jdbc:outbound-gateway>
停止产生结果。
类似的东西。
我没说这很容易,但至少对你来说应该是一个起点。