Java 中的同步数据库访问
Synchronized DB Access in Java
我有一个数据库,其中包含要发送的电子邮件。我正在使用多个线程发送这些电子邮件。我使用的方法是每个线程将查询数据库,在内存中获取 N 封电子邮件并将其标记为已发送。另一个线程将看到这 N 封电子邮件被标记并继续并获取下 N 个条目。
现在这不能正常工作,因为线程 1 可以更新发送的条目,线程 2 查询电子邮件,因此两个线程最终都收到同一组电子邮件。
每个线程都有自己的数据库连接。这是这种行为的根本原因吗?我应该只在所有线程之间共享一个连接对象吗?
或者我可以使用更好的方法吗?
你需要有一种方法来共享一个方法//代码来控制并发。 Sincronize 语句以获取电子邮件并标记它们。然后发了邮件。像这样:
public void processMails(){
List<String> mails;
synchronized(this){
mails = getMails();
markMails(mails);
}
sendMails(mails);
}
这个方法可能在您的 DAO Facade 中,所有线程都可以访问。
编辑:
如果你有多个 DAO 实例 class:
public void processMails(){
List<String> mails;
synchronize(DAO.class){
mails = getMails();
markMails(mails);
}
sendMails(mails);
}
其他选择
private static final Object LOCK = new Object();
public void processMails(){
List<String> mails;
synchronize(LOCK){
mails = getMails();
markMails(mails);
}
sendMails(mails);
}
我的建议是让一个线程负责查询数据库,将检索到的电子邮件放在线程安全的队列中(例如 ArrayBlockingQueue,它的优点是有界);然后,您可以让任意数量的线程从该队列中删除和处理电子邮件。 ArrayBlockingQueue
上的同步开销相当轻,这样您就不需要使用数据库事务或类似的东西。
class EmailChunk {
Email[] emails;
}
// only instantiate one of these
class DatabaseThread implements Runnable {
final BlockingQueue<EmailChunk> emailQueue;
public DatabaseThread(BlockingQueue<EmailChunk> emailQueue) {
this.emailQueue = emailQueue;
}
public void run() {
EmailChunk newChunk = // query database, create email chunk
// add newChunk to queue, wait 30 seconds if it's full
emailQueue.offer(newChunk, 30, TimeUnit.SECONDS);
}
}
// instantiate as many of these as makes sense
class EmailThread implements Runnable {
final BlockingQueue<EmailChunk> emailQueue;
public EmailThread(BlockingQueue<EmailChunk> emailQueue) {
this.emailQueue = emailQueue;
}
public void run() {
// take next chunk from queue, wait 30 seconds if queue is empty
emailChunk nextChunk = emailQueue.poll(30, TimeUnit.SECONDS);
}
}
class Main {
final int queueSize = 5;
public static void main(String[] args) {
BlockingQueue<EmailChunk> emailQueue = new ArrayBlockingQueue<>(queueSize);
// instantiate DatabaseThread and EmailThread objects with this queue
}
}
我有一个数据库,其中包含要发送的电子邮件。我正在使用多个线程发送这些电子邮件。我使用的方法是每个线程将查询数据库,在内存中获取 N 封电子邮件并将其标记为已发送。另一个线程将看到这 N 封电子邮件被标记并继续并获取下 N 个条目。
现在这不能正常工作,因为线程 1 可以更新发送的条目,线程 2 查询电子邮件,因此两个线程最终都收到同一组电子邮件。
每个线程都有自己的数据库连接。这是这种行为的根本原因吗?我应该只在所有线程之间共享一个连接对象吗? 或者我可以使用更好的方法吗?
你需要有一种方法来共享一个方法//代码来控制并发。 Sincronize 语句以获取电子邮件并标记它们。然后发了邮件。像这样:
public void processMails(){
List<String> mails;
synchronized(this){
mails = getMails();
markMails(mails);
}
sendMails(mails);
}
这个方法可能在您的 DAO Facade 中,所有线程都可以访问。
编辑:
如果你有多个 DAO 实例 class:
public void processMails(){
List<String> mails;
synchronize(DAO.class){
mails = getMails();
markMails(mails);
}
sendMails(mails);
}
其他选择
private static final Object LOCK = new Object();
public void processMails(){
List<String> mails;
synchronize(LOCK){
mails = getMails();
markMails(mails);
}
sendMails(mails);
}
我的建议是让一个线程负责查询数据库,将检索到的电子邮件放在线程安全的队列中(例如 ArrayBlockingQueue,它的优点是有界);然后,您可以让任意数量的线程从该队列中删除和处理电子邮件。 ArrayBlockingQueue
上的同步开销相当轻,这样您就不需要使用数据库事务或类似的东西。
class EmailChunk {
Email[] emails;
}
// only instantiate one of these
class DatabaseThread implements Runnable {
final BlockingQueue<EmailChunk> emailQueue;
public DatabaseThread(BlockingQueue<EmailChunk> emailQueue) {
this.emailQueue = emailQueue;
}
public void run() {
EmailChunk newChunk = // query database, create email chunk
// add newChunk to queue, wait 30 seconds if it's full
emailQueue.offer(newChunk, 30, TimeUnit.SECONDS);
}
}
// instantiate as many of these as makes sense
class EmailThread implements Runnable {
final BlockingQueue<EmailChunk> emailQueue;
public EmailThread(BlockingQueue<EmailChunk> emailQueue) {
this.emailQueue = emailQueue;
}
public void run() {
// take next chunk from queue, wait 30 seconds if queue is empty
emailChunk nextChunk = emailQueue.poll(30, TimeUnit.SECONDS);
}
}
class Main {
final int queueSize = 5;
public static void main(String[] args) {
BlockingQueue<EmailChunk> emailQueue = new ArrayBlockingQueue<>(queueSize);
// instantiate DatabaseThread and EmailThread objects with this queue
}
}