Spring 批处理 - 并非所有记录都来自 MQ 检索
Spring Batch - Not all records are being processed from MQ retrieval
我是 Spring 和 Spring Batch 的新手,所以如果您有任何需要澄清的问题,请随时提出。
我发现 Spring 批处理存在问题,无法在我们的测试或本地环境中重新创建。我们有一项日常工作,通过 JMS 连接到 Websphere MQ 并检索一组记录。此作业使用开箱即用的 JMS ItemReader。我们实现了我们自己的 ItemProcessor,但它除了记录之外没有做任何特别的事情。没有应影响传入记录的过滤器或处理。
问题是,在 MQ 上的 10,000 多条日常记录中,通常只有大约 700 条左右(每次准确的数字都不同)被记录到 ItemProcessor 中。所有记录都成功地从队列中拉出。每次记录的记录数都不同,似乎没有规律。通过将日志文件与 MQ 中的记录列表进行比较,我们可以看到我们的作业 "processed" 看似随机的记录子集。第一条记录可能会被拾取,然后跳过 50 条,然后连续 5 条,等等。每次作业运行时模式都不同。也没有记录异常。
当 运行 在本地主机中使用相同的应用程序并使用相同的数据集进行测试时,ItemProcessor 成功检索并记录了所有 10,000 多条记录。该作业在生产中运行 20 到 40 秒(也不是恒定的),但在测试和本地需要几分钟才能完成(这显然是有道理的,因为它要处理更多的记录)。
所以这是难以解决的问题之一,因为我们无法重新创建它。一个想法是实现我们自己的 ItemReader 并添加额外的日志记录,以便我们可以查看记录是否在 reader 之前或 reader 之后丢失 - 我们现在所知道的是只有一部分记录丢失了由 ItemProcessor 处理。但即使这样也不能解决我们的问题,考虑到它甚至不是解决方案,实施起来也有点及时。
还有其他人遇到过这样的问题吗?任何可能的想法或故障排除建议将不胜感激。下面是一些我们正在使用的jar版本号,供参考。
- Spring - 3.0.5.RELEASE
- Spring 集成 - 2.0.3.RELEASE
- Spring 批处理 - 2.1.7.RELEASE
- 活动 MQ - 5.4.2
- Websphere MQ - 7.0.1
提前感谢您的意见。
编辑:根据请求,处理器代码:
public SMSReminderRow process(Message message) throws Exception {
SMSReminderRow retVal = new SMSReminderRow();
LOGGER.debug("Converting JMS Message to ClaimNotification");
ClaimNotification notification = createClaimNotificationFromMessage(message);
retVal.setShortCode(BatchCommonUtils
.parseShortCodeFromCorpEntCode(notification.getCorpEntCode()));
retVal.setUuid(UUID.randomUUID().toString());
retVal.setPhoneNumber(notification.getPhoneNumber());
retVal.setMessageType(EventCode.SMS_CLAIMS_NOTIFY.toString());
DCRContent content = tsContentHelper.getTSContent(Calendar
.getInstance().getTime(),
BatchCommonConstants.TS_TAG_CLAIMS_NOTIFY,
BatchCommonConstants.TS_TAG_SMSTEXT_TYP);
String claimsNotificationMessage = formatMessageToSend(content.getContent(),
notification.getCorpEntCode());
retVal.setMessageToSend(claimsNotificationMessage);
retVal.setDateTimeToSend(TimeUtils
.getGMTDateTimeStringForDate(new Date()));
LOGGER.debug(
"Finished processing claim notification for {}. Writing row to file.",
notification.getPhoneNumber());
return retVal;
}
JMS 配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
<bean id="claimsQueueConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="jms/SMSClaimNotificationCF" />
<property name="lookupOnStartup" value="true" />
<property name="cache" value="true" />
<property name="proxyInterface" value="javax.jms.ConnectionFactory" />
</bean>
<bean id="jmsDestinationResolver"
class="org.springframework.jms.support.destination.DynamicDestinationResolver">
</bean>
<bean id="jmsJndiDestResolver"
class=" org.springframework.jms.support.destination.JndiDestinationResolver"/>
<bean id="claimsJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="claimsQueueConnectionFactory" />
<property name="defaultDestinationName" value="jms/SMSClaimNotificationQueue" />
<property name="destinationResolver" ref="jmsJndiDestResolver" />
<property name="pubSubDomain">
<value>false</value>
</property>
<property name="receiveTimeout">
<value>20000</value>
</property>
</bean>
通常,如果配置正确,MQ 不会丢失消息。那么问题是 "properly configured" 是什么样子的?
一般情况下,丢失消息是由非持久性或非事务性 GET 引起的。
如果非持久性消息正在遍历 QMgr 到 QMgr 通道并且设置了 NPMSPEED(FAST)
,那么 MQ 将不会记录丢失的错误。这就是这些选项的用途,因此不会出现错误。
修复:在 QMgr 到 QMgr 通道上设置 NPMSPEED(NORMAL)
或使消息持久化。
如果客户端在同步点之外获取消息,消息可能会丢失。这与 MQ 无关,这只是一般消息传递的工作方式。如果您告诉 MQ 破坏性地从队列中获取消息并且它无法将该消息传递给远程应用程序,那么 MQ 回滚它的唯一方法是在同步点下检索消息。
修复:使用事务处理会话。
有一些额外的笔记,源于经验。
- 每个人都发誓消息持久性设置为他们认为的样子。但是当我停止应用程序并手动检查消息时,它 very 通常是 not 所期望的。很容易验证,所以不要假设。
- 如果消息在队列中回滚,直到 MQ 或 TCP 超时孤立通道才会发生。这可能长达 2 小时,因此调整通道参数和 TCP Keepalive 以减少它。
- 检查 MQ 的错误日志(QMgr 而非客户端的错误日志)以查找有关事务回滚的消息。
- 如果您仍然无法确定邮件的去向,请尝试使用 SupportPac MA0W 进行跟踪。此跟踪作为出口运行,并且 极其 可配置。您可以跟踪单个队列上的所有
GET
操作,并且只能跟踪该队列。输出采用人类可读的形式。
参见http://activemq.apache.org/jmstemplate-gotchas.html。
使用 JMSTemplate 时出现问题。当我升级我的硬件并突然暴露出一个预先存在的竞争条件时,我才 运行 陷入这些问题。
简写形式是 JMS 模板根据设计和意图在每次调用时打开和关闭连接。它不会看到早于其创建的消息。在大容量and/or高吞吐量场景下,它会无法读取某些消息。
我是 Spring 和 Spring Batch 的新手,所以如果您有任何需要澄清的问题,请随时提出。
我发现 Spring 批处理存在问题,无法在我们的测试或本地环境中重新创建。我们有一项日常工作,通过 JMS 连接到 Websphere MQ 并检索一组记录。此作业使用开箱即用的 JMS ItemReader。我们实现了我们自己的 ItemProcessor,但它除了记录之外没有做任何特别的事情。没有应影响传入记录的过滤器或处理。
问题是,在 MQ 上的 10,000 多条日常记录中,通常只有大约 700 条左右(每次准确的数字都不同)被记录到 ItemProcessor 中。所有记录都成功地从队列中拉出。每次记录的记录数都不同,似乎没有规律。通过将日志文件与 MQ 中的记录列表进行比较,我们可以看到我们的作业 "processed" 看似随机的记录子集。第一条记录可能会被拾取,然后跳过 50 条,然后连续 5 条,等等。每次作业运行时模式都不同。也没有记录异常。
当 运行 在本地主机中使用相同的应用程序并使用相同的数据集进行测试时,ItemProcessor 成功检索并记录了所有 10,000 多条记录。该作业在生产中运行 20 到 40 秒(也不是恒定的),但在测试和本地需要几分钟才能完成(这显然是有道理的,因为它要处理更多的记录)。
所以这是难以解决的问题之一,因为我们无法重新创建它。一个想法是实现我们自己的 ItemReader 并添加额外的日志记录,以便我们可以查看记录是否在 reader 之前或 reader 之后丢失 - 我们现在所知道的是只有一部分记录丢失了由 ItemProcessor 处理。但即使这样也不能解决我们的问题,考虑到它甚至不是解决方案,实施起来也有点及时。
还有其他人遇到过这样的问题吗?任何可能的想法或故障排除建议将不胜感激。下面是一些我们正在使用的jar版本号,供参考。
- Spring - 3.0.5.RELEASE
- Spring 集成 - 2.0.3.RELEASE
- Spring 批处理 - 2.1.7.RELEASE
- 活动 MQ - 5.4.2
- Websphere MQ - 7.0.1
提前感谢您的意见。
编辑:根据请求,处理器代码:
public SMSReminderRow process(Message message) throws Exception {
SMSReminderRow retVal = new SMSReminderRow();
LOGGER.debug("Converting JMS Message to ClaimNotification");
ClaimNotification notification = createClaimNotificationFromMessage(message);
retVal.setShortCode(BatchCommonUtils
.parseShortCodeFromCorpEntCode(notification.getCorpEntCode()));
retVal.setUuid(UUID.randomUUID().toString());
retVal.setPhoneNumber(notification.getPhoneNumber());
retVal.setMessageType(EventCode.SMS_CLAIMS_NOTIFY.toString());
DCRContent content = tsContentHelper.getTSContent(Calendar
.getInstance().getTime(),
BatchCommonConstants.TS_TAG_CLAIMS_NOTIFY,
BatchCommonConstants.TS_TAG_SMSTEXT_TYP);
String claimsNotificationMessage = formatMessageToSend(content.getContent(),
notification.getCorpEntCode());
retVal.setMessageToSend(claimsNotificationMessage);
retVal.setDateTimeToSend(TimeUtils
.getGMTDateTimeStringForDate(new Date()));
LOGGER.debug(
"Finished processing claim notification for {}. Writing row to file.",
notification.getPhoneNumber());
return retVal;
}
JMS 配置:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">
<bean id="claimsQueueConnectionFactory" class="org.springframework.jndi.JndiObjectFactoryBean">
<property name="jndiName" value="jms/SMSClaimNotificationCF" />
<property name="lookupOnStartup" value="true" />
<property name="cache" value="true" />
<property name="proxyInterface" value="javax.jms.ConnectionFactory" />
</bean>
<bean id="jmsDestinationResolver"
class="org.springframework.jms.support.destination.DynamicDestinationResolver">
</bean>
<bean id="jmsJndiDestResolver"
class=" org.springframework.jms.support.destination.JndiDestinationResolver"/>
<bean id="claimsJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="claimsQueueConnectionFactory" />
<property name="defaultDestinationName" value="jms/SMSClaimNotificationQueue" />
<property name="destinationResolver" ref="jmsJndiDestResolver" />
<property name="pubSubDomain">
<value>false</value>
</property>
<property name="receiveTimeout">
<value>20000</value>
</property>
</bean>
通常,如果配置正确,MQ 不会丢失消息。那么问题是 "properly configured" 是什么样子的?
一般情况下,丢失消息是由非持久性或非事务性 GET 引起的。
如果非持久性消息正在遍历 QMgr 到 QMgr 通道并且设置了 NPMSPEED(FAST)
,那么 MQ 将不会记录丢失的错误。这就是这些选项的用途,因此不会出现错误。
修复:在 QMgr 到 QMgr 通道上设置 NPMSPEED(NORMAL)
或使消息持久化。
如果客户端在同步点之外获取消息,消息可能会丢失。这与 MQ 无关,这只是一般消息传递的工作方式。如果您告诉 MQ 破坏性地从队列中获取消息并且它无法将该消息传递给远程应用程序,那么 MQ 回滚它的唯一方法是在同步点下检索消息。
修复:使用事务处理会话。
有一些额外的笔记,源于经验。
- 每个人都发誓消息持久性设置为他们认为的样子。但是当我停止应用程序并手动检查消息时,它 very 通常是 not 所期望的。很容易验证,所以不要假设。
- 如果消息在队列中回滚,直到 MQ 或 TCP 超时孤立通道才会发生。这可能长达 2 小时,因此调整通道参数和 TCP Keepalive 以减少它。
- 检查 MQ 的错误日志(QMgr 而非客户端的错误日志)以查找有关事务回滚的消息。
- 如果您仍然无法确定邮件的去向,请尝试使用 SupportPac MA0W 进行跟踪。此跟踪作为出口运行,并且 极其 可配置。您可以跟踪单个队列上的所有
GET
操作,并且只能跟踪该队列。输出采用人类可读的形式。
参见http://activemq.apache.org/jmstemplate-gotchas.html。
使用 JMSTemplate 时出现问题。当我升级我的硬件并突然暴露出一个预先存在的竞争条件时,我才 运行 陷入这些问题。
简写形式是 JMS 模板根据设计和意图在每次调用时打开和关闭连接。它不会看到早于其创建的消息。在大容量and/or高吞吐量场景下,它会无法读取某些消息。