如何从 Java EE 批处理作业发送电子邮件
How to send emails from a Java EE Batch Job
我需要每天处理大量用户的列表,以便根据某些情况向他们发送电子邮件和短信通知。为此,我正在使用 Java EE 批处理模型。我的工作xml如下:
<step id="sendNotification">
<chunk item-count="10" retry-limit="3">
<reader ref="myItemReader"></reader>
<processor ref="myItemProcessor"></processor>
<writer ref="myItemWriter"></writer>
<retryable-exception-classes>
<include class="java.lang.IllegalArgumentException"/>
</retryable-exception-classes>
</chunk>
</step>
MyItemReader 的 onOpen 方法从数据库中读取所有用户,而 readItem() 使用列表迭代器一次读取一个用户。在 myItemProcessor 中,实际的电子邮件通知被发送给用户,然后用户在 myItemWriter class 的数据库中为该块持久化。
@Named
public class MyItemReader extends AbstractItemReader {
private Iterator<User> iterator = null;
private User lastUser;
@Inject
private MyService service;
@Override
public void open(Serializable checkpoint) throws Exception {
super.open(checkpoint);
List<User> users = service.getUsers();
iterator = users.iterator();
if(checkpoint != null) {
User checkpointUser = (User) checkpoint;
System.out.println("Checkpoint Found: " + checkpointUser.getUserId());
while(iterator.hasNext() && !iterator.next().getUserId().equals(checkpointUser.getUserId())) {
System.out.println("skipping already read users ... ");
}
}
}
@Override
public Object readItem() throws Exception {
User user=null;
if(iterator.hasNext()) {
user = iterator.next();
lastUser = user;
}
return user;
}
@Override
public Serializable checkpointInfo() throws Exception {
return lastUser;
}
}
我的问题是检查点存储了在前一个块中执行的最后一条记录。如果我与接下来的 10 个用户有一个块,并且在第 5 个用户的 myItemProcessor 中抛出异常,那么在重试时将执行整个块并再次处理所有 10 个用户。我不希望再次向已处理的用户发送通知。
有办法处理吗?这应该如何有效地完成?
如有任何帮助,我们将不胜感激。
谢谢
您当前的物品处理器正在执行区块事务范围之外的操作,这导致应用程序状态不同步。如果您的要求是仅在块中的所有项目都成功完成后才发送电子邮件,那么您可以将电子邮件部分移至 ItemWriterListener.afterWrite(items)。
我将以@cheng 的评论为基础。在此感谢他,希望我的回答能为有效地组织和展示选项提供额外的价值。
答案:为另一个 MDB 排队发送邮件
背景:
正如@cheng 指出的那样,失败意味着整个事务被回滚,并且检查点没有前进。
那么如何处理你的区块已经向部分用户而不是所有用户发送了电子邮件这一事实? (您可能会说它回滚了 "side effects"。)
因此我们可以将您的问题重述为:如何从批处理块步骤发送电子邮件?
好吧,假设您有办法通过交易 API(实施 XAResource 等)发送电子邮件,您可以使用它 API。
假设您不这样做,我会向 JMS 队列执行事务性写入,然后使用单独的 MDB 发送电子邮件(正如@cheng 在他的评论之一中所建议的那样)。
建议的替代方法:使用 ItemWriter 将消息发送到 JMS 队列,然后使用单独的 MDB 实际发送电子邮件
使用这种方法,您仍然可以通过批处理和更新数据库来提高效率(反正您一次只发送一封电子邮件),并且您可以从简单的检查点和重新启动中受益,而无需编写复杂的代码错误处理。
这也可能作为跨批处理作业甚至批处理之外的模式重复使用。
其他选择
其他一些我认为不太好的想法,列出以供讨论:
添加批处理应用程序逻辑跟踪通过电子邮件发送的用户(使用 ItemProcessListener)
您可以使用 ItemProcessListener 方法构建自己的 either/both successful/failed 电子邮件列表:afterProcess and onProcessError.
然后,在重新启动时,您可以知道在当前块中哪些用户已通过电子邮件发送,我们在整个块回滚后重新定位到该块,即使已经发送了一些电子邮件。
这肯定会使您的批处理逻辑复杂化,并且您还必须以某种方式保留此成功或失败列表。此外,这种方法可能高度特定于这项工作(而不是排队等待 MDB 处理)。
但它更简单,因为您只有一个批处理作业,无需消息传递提供程序和单独的应用程序组件。
如果你走这条路,你可能想结合使用可跳过和 "no-rollback" 可重试异常。
单项区块
如果您使用 item-count="1" 定义块,则可以避免复杂的检查点和错误处理代码。虽然你牺牲了效率,所以这只有在批处理的其他方面非常引人注目时才有意义:例如通过通用界面调度和管理作业,能够在作业中的失败步骤重新启动
如果您要走这条路,您可能需要考虑将套接字和超时异常定义为 "no-rollback" 异常(使用 ),因为从中没有任何好处回滚,您可能想重试网络超时问题。
既然你特别提到效率,我猜这不适合你。
使用事务同步
这也许可行,但批处理 API 并没有特别简化这件事,您仍然可能遇到块完成但一封或多封电子邮件发送失败的情况。
我需要每天处理大量用户的列表,以便根据某些情况向他们发送电子邮件和短信通知。为此,我正在使用 Java EE 批处理模型。我的工作xml如下:
<step id="sendNotification">
<chunk item-count="10" retry-limit="3">
<reader ref="myItemReader"></reader>
<processor ref="myItemProcessor"></processor>
<writer ref="myItemWriter"></writer>
<retryable-exception-classes>
<include class="java.lang.IllegalArgumentException"/>
</retryable-exception-classes>
</chunk>
</step>
MyItemReader 的 onOpen 方法从数据库中读取所有用户,而 readItem() 使用列表迭代器一次读取一个用户。在 myItemProcessor 中,实际的电子邮件通知被发送给用户,然后用户在 myItemWriter class 的数据库中为该块持久化。
@Named
public class MyItemReader extends AbstractItemReader {
private Iterator<User> iterator = null;
private User lastUser;
@Inject
private MyService service;
@Override
public void open(Serializable checkpoint) throws Exception {
super.open(checkpoint);
List<User> users = service.getUsers();
iterator = users.iterator();
if(checkpoint != null) {
User checkpointUser = (User) checkpoint;
System.out.println("Checkpoint Found: " + checkpointUser.getUserId());
while(iterator.hasNext() && !iterator.next().getUserId().equals(checkpointUser.getUserId())) {
System.out.println("skipping already read users ... ");
}
}
}
@Override
public Object readItem() throws Exception {
User user=null;
if(iterator.hasNext()) {
user = iterator.next();
lastUser = user;
}
return user;
}
@Override
public Serializable checkpointInfo() throws Exception {
return lastUser;
}
}
我的问题是检查点存储了在前一个块中执行的最后一条记录。如果我与接下来的 10 个用户有一个块,并且在第 5 个用户的 myItemProcessor 中抛出异常,那么在重试时将执行整个块并再次处理所有 10 个用户。我不希望再次向已处理的用户发送通知。
有办法处理吗?这应该如何有效地完成?
如有任何帮助,我们将不胜感激。 谢谢
您当前的物品处理器正在执行区块事务范围之外的操作,这导致应用程序状态不同步。如果您的要求是仅在块中的所有项目都成功完成后才发送电子邮件,那么您可以将电子邮件部分移至 ItemWriterListener.afterWrite(items)。
我将以@cheng 的评论为基础。在此感谢他,希望我的回答能为有效地组织和展示选项提供额外的价值。
答案:为另一个 MDB 排队发送邮件
背景:
正如@cheng 指出的那样,失败意味着整个事务被回滚,并且检查点没有前进。
那么如何处理你的区块已经向部分用户而不是所有用户发送了电子邮件这一事实? (您可能会说它回滚了 "side effects"。)
因此我们可以将您的问题重述为:如何从批处理块步骤发送电子邮件?
好吧,假设您有办法通过交易 API(实施 XAResource 等)发送电子邮件,您可以使用它 API。
假设您不这样做,我会向 JMS 队列执行事务性写入,然后使用单独的 MDB 发送电子邮件(正如@cheng 在他的评论之一中所建议的那样)。
建议的替代方法:使用 ItemWriter 将消息发送到 JMS 队列,然后使用单独的 MDB 实际发送电子邮件
使用这种方法,您仍然可以通过批处理和更新数据库来提高效率(反正您一次只发送一封电子邮件),并且您可以从简单的检查点和重新启动中受益,而无需编写复杂的代码错误处理。
这也可能作为跨批处理作业甚至批处理之外的模式重复使用。
其他选择
其他一些我认为不太好的想法,列出以供讨论:
添加批处理应用程序逻辑跟踪通过电子邮件发送的用户(使用 ItemProcessListener)
您可以使用 ItemProcessListener 方法构建自己的 either/both successful/failed 电子邮件列表:afterProcess and onProcessError.
然后,在重新启动时,您可以知道在当前块中哪些用户已通过电子邮件发送,我们在整个块回滚后重新定位到该块,即使已经发送了一些电子邮件。
这肯定会使您的批处理逻辑复杂化,并且您还必须以某种方式保留此成功或失败列表。此外,这种方法可能高度特定于这项工作(而不是排队等待 MDB 处理)。
但它更简单,因为您只有一个批处理作业,无需消息传递提供程序和单独的应用程序组件。
如果你走这条路,你可能想结合使用可跳过和 "no-rollback" 可重试异常。
单项区块
如果您使用 item-count="1" 定义块,则可以避免复杂的检查点和错误处理代码。虽然你牺牲了效率,所以这只有在批处理的其他方面非常引人注目时才有意义:例如通过通用界面调度和管理作业,能够在作业中的失败步骤重新启动
如果您要走这条路,您可能需要考虑将套接字和超时异常定义为 "no-rollback" 异常(使用 ),因为从中没有任何好处回滚,您可能想重试网络超时问题。
既然你特别提到效率,我猜这不适合你。
使用事务同步
这也许可行,但批处理 API 并没有特别简化这件事,您仍然可能遇到块完成但一封或多封电子邮件发送失败的情况。