Spring批处理:转换为多线程时出现问题(混合数据)
Spring Batch: problems (mix data) when converting to multithread
也许这是一个反复出现的问题,但我需要根据我的上下文进行一些自定义。
我正在使用 Spring Batch 3.0。1.RELEASE
我有一个简单的工作,需要一些步骤。一步是这样的一大块:
<tasklet transaction-manager="myTransactionManager">
<batch:chunk reader="myReader" processor="myProcessor" writer="myWriter" commit-interval="${commit.interval}">
</batch:chunk>
<bean id="myProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor" scope="step">
<property name="delegates">
<list>
<bean class="...MyFirstProcessor">
</bean>
<bean class="...MySecondProcessor">
</bean>
</list>
</property>
- Reader: JdbcCursorItemReader
- Processor:CompositeProcessor 与我的代表
- Writer:CompositeWriter 与我的代表
有了这个配置,我的工作就完美了。
现在,我想将其转换为多线程作业。
在 documentation to basic multi-thread jobs 之后,我在 tasklet 中包含了一个 SympleAsyncTaskExecutor,但它失败了。
我读过 JdbcCursorItemReader 在多线程执行时不能正常工作(是吗?)。我已经将 reader 更改为 JdbcPagingItemReader,这是一场噩梦:作业没有失败,写入过程正常,但数据在线程之间混合,客户数据不正确并且一致(客户 从 其他人 那里获得了 服务、地址等)。
那么,为什么会这样呢?我怎样才能更改为多线程作业?
- 复合处理器和写入器适合多线程吗?
- 如何制作自定义线程安全复合处理器?
- 可能是JDBC reader: 有没有线程安全的多线程JDBC reader?
我对此很困惑,非常感谢任何帮助。
非常感谢。
[编辑 - 已解决]
好吧,解决我的问题的正确和合适的方法是从一开始就为多线程和线程安全执行设计作业。习惯先练习单线程一步执行,理解和知道Spring批处理概念;但是如果你认为你要离开这个阶段,那么必须考虑不可变对象、线程安全列表、映射等......必须提出。
我的问题当前状态的当前修复是我稍后描述的下一个。在测试了 Martin 的建议并考虑了 Michael 的指导方针之后,我终于尽可能地解决了我的问题。接下来的步骤不是很好的做法,但我无法从头开始重建我的工作:
- 将 itemReader 更改为 JdbcPagingItemReader 并将 setState 设置为 false。
- 通过 CopyOnWriteArrayList 更改列表。
- 用 ConcurrentHashMap 改变 HashMap。
- 在每个委托处理器中,通过传递上下文(实现 ApplicationContextAware)并获取 bean 的唯一实例(配置每个注入的 bean 作为 scope="prototype").
所以,如果委托的 bean 是:
<bean class="...MyProcessor">
<property name="otherBean" ref="otherBeanID" />
更改为:
<bean class="...MyProcessor">
<property name="otherBean" value="otherBeanID" />
并且,在 MyProcessor 中,从上下文中获取 otherBeanID 的单个实例; otherBeanID必须配置scope="protoype".
正如我之前所说,它们不是好的样式,但这是我最好的选择,我可以断言每个线程都有自己的和不同的项目实例和其他 bean 实例。
这证明某些 类 没有为正确的多线程执行设计好。
马丁,迈克尔,感谢您的支持。
希望对大家有所帮助。
你的问题问了很多(以后请把这类问题分解成多个更具体的问题)。但是,逐项:
JdbcCursorItemReader
是线程安全的吗?
正如documentation states,它不是。这样做的原因是 JdbcCursorItemReader
包装了一个非线程安全的 ResultSet
。
复合处理器和编写器是否适合多线程?
Spring Batch 提供的 CompositeItemProcessor
被认为是线程安全的,只要委托 ItemProcessor
实现也是线程安全的。您没有提供与您的实现或其配置相关的代码,因此我无法验证它们的线程安全性。但是,鉴于您描述的症状,我的直觉是您的代码中存在某种形式的线程安全问题。
您也不知道您正在使用什么 ItemWriter
实现或它们的配置,因此那里也可能存在与线程相关的问题。
如果您使用有关您的实施和配置的更多信息更新您的问题,我们可以提供更多见解。
如何制作自定义线程安全复合处理器?
实施任何 ItemProcessor
时需要考虑两件事:
- 使其成为线程安全的:遵循基本的线程安全规则(阅读有关该主题的圣经Java Concurrency In Practice一书)将允许您仅通过以下方式扩展组件添加任务执行器。
- 使其幂等: 在 skip/retry 处理期间,项目可能会被重新处理。通过使您的
ItemProcessor
实现幂等,这将防止多次通过处理器的行程产生副作用。
可能是JDBC reader: 有没有线程安全的多线程JDBC reader?
正如您所注意到的,JdbcPaginingItemReader
是线程安全的,并且在 documentation 中也是如此。当使用多线程时,每个块都在它自己的线程中执行。如果您已配置页面大小以匹配提交间隔,则意味着每个页面都在同一线程中处理。
缩放单个步骤的其他选项
当您沿着实现单个多线程步骤的道路前进时,可能会有更好的选择。 Spring Batch 提供 5 个核心缩放选项:
- 多线程步骤 - 正如您现在正在尝试的那样。
- 并行步骤 - 使用 Spring 批处理的拆分功能,您可以并行执行多个步骤。鉴于您在同一步骤中使用复合
ItemProcessor
和复合 ItemWriter
,这可能值得探索(将您当前的复合场景分解为多个并行步骤)。
- Async
ItemProcessor
/ItemWriters
- 此选项允许您在不同的线程中执行处理器逻辑。处理器关闭线程,returns 一个 Future
到 AsyncItemWriter
,它将阻塞直到 Future
returns 被写入。
- 分区 - 这是将数据划分为称为分区的块,这些块由子步骤并行处理。每个分区都由一个实际的、独立的步骤处理,因此使用步骤范围的组件可以防止线程安全问题(每个步骤都有自己的实例)。分区处理可以通过线程在本地执行,也可以跨多个 JVM 远程执行。
- Remote Chunking - 此选项将处理器逻辑分配给其他 JVM 进程。它真的只应该在
ItemProcessor
逻辑是流程中的瓶颈时使用。
您可以在此处阅读 Spring 批处理文档中的所有这些选项:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html
线程安全是一个复杂的问题。只需向过去在单线程环境中工作的代码添加多个线程,通常就能发现代码中的问题。
也许这是一个反复出现的问题,但我需要根据我的上下文进行一些自定义。
我正在使用 Spring Batch 3.0。1.RELEASE
我有一个简单的工作,需要一些步骤。一步是这样的一大块:
<tasklet transaction-manager="myTransactionManager">
<batch:chunk reader="myReader" processor="myProcessor" writer="myWriter" commit-interval="${commit.interval}">
</batch:chunk>
<bean id="myProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor" scope="step">
<property name="delegates">
<list>
<bean class="...MyFirstProcessor">
</bean>
<bean class="...MySecondProcessor">
</bean>
</list>
</property>
- Reader: JdbcCursorItemReader
- Processor:CompositeProcessor 与我的代表
- Writer:CompositeWriter 与我的代表
有了这个配置,我的工作就完美了。
现在,我想将其转换为多线程作业。 在 documentation to basic multi-thread jobs 之后,我在 tasklet 中包含了一个 SympleAsyncTaskExecutor,但它失败了。
我读过 JdbcCursorItemReader 在多线程执行时不能正常工作(是吗?)。我已经将 reader 更改为 JdbcPagingItemReader,这是一场噩梦:作业没有失败,写入过程正常,但数据在线程之间混合,客户数据不正确并且一致(客户 从 其他人 那里获得了 服务、地址等)。
那么,为什么会这样呢?我怎样才能更改为多线程作业?
- 复合处理器和写入器适合多线程吗?
- 如何制作自定义线程安全复合处理器?
- 可能是JDBC reader: 有没有线程安全的多线程JDBC reader?
我对此很困惑,非常感谢任何帮助。 非常感谢。
[编辑 - 已解决]
好吧,解决我的问题的正确和合适的方法是从一开始就为多线程和线程安全执行设计作业。习惯先练习单线程一步执行,理解和知道Spring批处理概念;但是如果你认为你要离开这个阶段,那么必须考虑不可变对象、线程安全列表、映射等......必须提出。
我的问题当前状态的当前修复是我稍后描述的下一个。在测试了 Martin 的建议并考虑了 Michael 的指导方针之后,我终于尽可能地解决了我的问题。接下来的步骤不是很好的做法,但我无法从头开始重建我的工作:
- 将 itemReader 更改为 JdbcPagingItemReader 并将 setState 设置为 false。
- 通过 CopyOnWriteArrayList 更改列表。
- 用 ConcurrentHashMap 改变 HashMap。
- 在每个委托处理器中,通过传递上下文(实现 ApplicationContextAware)并获取 bean 的唯一实例(配置每个注入的 bean 作为 scope="prototype").
所以,如果委托的 bean 是:
<bean class="...MyProcessor">
<property name="otherBean" ref="otherBeanID" />
更改为:
<bean class="...MyProcessor">
<property name="otherBean" value="otherBeanID" />
并且,在 MyProcessor 中,从上下文中获取 otherBeanID 的单个实例; otherBeanID必须配置scope="protoype".
正如我之前所说,它们不是好的样式,但这是我最好的选择,我可以断言每个线程都有自己的和不同的项目实例和其他 bean 实例。
这证明某些 类 没有为正确的多线程执行设计好。
马丁,迈克尔,感谢您的支持。
希望对大家有所帮助。
你的问题问了很多(以后请把这类问题分解成多个更具体的问题)。但是,逐项:
JdbcCursorItemReader
是线程安全的吗?
正如documentation states,它不是。这样做的原因是 JdbcCursorItemReader
包装了一个非线程安全的 ResultSet
。
复合处理器和编写器是否适合多线程?
Spring Batch 提供的 CompositeItemProcessor
被认为是线程安全的,只要委托 ItemProcessor
实现也是线程安全的。您没有提供与您的实现或其配置相关的代码,因此我无法验证它们的线程安全性。但是,鉴于您描述的症状,我的直觉是您的代码中存在某种形式的线程安全问题。
您也不知道您正在使用什么 ItemWriter
实现或它们的配置,因此那里也可能存在与线程相关的问题。
如果您使用有关您的实施和配置的更多信息更新您的问题,我们可以提供更多见解。
如何制作自定义线程安全复合处理器?
实施任何 ItemProcessor
时需要考虑两件事:
- 使其成为线程安全的:遵循基本的线程安全规则(阅读有关该主题的圣经Java Concurrency In Practice一书)将允许您仅通过以下方式扩展组件添加任务执行器。
- 使其幂等: 在 skip/retry 处理期间,项目可能会被重新处理。通过使您的
ItemProcessor
实现幂等,这将防止多次通过处理器的行程产生副作用。
可能是JDBC reader: 有没有线程安全的多线程JDBC reader?
正如您所注意到的,JdbcPaginingItemReader
是线程安全的,并且在 documentation 中也是如此。当使用多线程时,每个块都在它自己的线程中执行。如果您已配置页面大小以匹配提交间隔,则意味着每个页面都在同一线程中处理。
缩放单个步骤的其他选项
当您沿着实现单个多线程步骤的道路前进时,可能会有更好的选择。 Spring Batch 提供 5 个核心缩放选项:
- 多线程步骤 - 正如您现在正在尝试的那样。
- 并行步骤 - 使用 Spring 批处理的拆分功能,您可以并行执行多个步骤。鉴于您在同一步骤中使用复合
ItemProcessor
和复合ItemWriter
,这可能值得探索(将您当前的复合场景分解为多个并行步骤)。 - Async
ItemProcessor
/ItemWriters
- 此选项允许您在不同的线程中执行处理器逻辑。处理器关闭线程,returns 一个Future
到AsyncItemWriter
,它将阻塞直到Future
returns 被写入。 - 分区 - 这是将数据划分为称为分区的块,这些块由子步骤并行处理。每个分区都由一个实际的、独立的步骤处理,因此使用步骤范围的组件可以防止线程安全问题(每个步骤都有自己的实例)。分区处理可以通过线程在本地执行,也可以跨多个 JVM 远程执行。
- Remote Chunking - 此选项将处理器逻辑分配给其他 JVM 进程。它真的只应该在
ItemProcessor
逻辑是流程中的瓶颈时使用。
您可以在此处阅读 Spring 批处理文档中的所有这些选项:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html
线程安全是一个复杂的问题。只需向过去在单线程环境中工作的代码添加多个线程,通常就能发现代码中的问题。