Java 中的同步数据库访问

Synchronized DB Access in Java

我有一个数据库,其中包含要发送的电子邮件。我正在使用多个线程发送这些电子邮件。我使用的方法是每个线程将查询数据库,在内存中获取 N 封电子邮件并将其标记为已发送。另一个线程将看到这 N 封电子邮件被标记并继续并获取下 N 个条目。

现在这不能正常工作,因为线程 1 可以更新发送的条目,线程 2 查询电子邮件,因此两个线程最终都收到同一组电子邮件。

每个线程都有自己的数据库连接。这是这种行为的根本原因吗?我应该只在所有线程之间共享一个连接对象吗? 或者我可以使用更好的方法吗?

你需要有一种方法来共享一个方法//代码来控制并发。 Sincronize 语句以获取电子邮件并标记它们。然后发了邮件。像这样:

public void processMails(){
     List<String> mails;
     synchronized(this){
           mails  = getMails();
           markMails(mails);
      }
      sendMails(mails);

  }

这个方法可能在您的 DAO Facade 中,所有线程都可以访问。

编辑:

如果你有多个 DAO 实例 class:

public void processMails(){
         List<String> mails;
         synchronize(DAO.class){
               mails  = getMails();
               markMails(mails);
          }
          sendMails(mails);

      }

其他选择

private static final Object LOCK = new Object();

   public void processMails(){
             List<String> mails;
             synchronize(LOCK){
                   mails  = getMails();
                   markMails(mails);
              }
              sendMails(mails);

          }

我的建议是让一个线程负责查询数据库,将检索到的电子邮件放在线程安全的队列中(例如 ArrayBlockingQueue,它的优点是有界);然后,您可以让任意数量的线程从该队列中删除和处理电子邮件。 ArrayBlockingQueue 上的同步开销相当轻,这样您就不需要使用数据库事务或类似的东西。

class EmailChunk {
  Email[] emails;
}

// only instantiate one of these
class DatabaseThread implements Runnable {
  final BlockingQueue<EmailChunk> emailQueue;

  public DatabaseThread(BlockingQueue<EmailChunk> emailQueue) {
    this.emailQueue = emailQueue;
  }

  public void run() {
    EmailChunk newChunk = // query database, create email chunk

    // add newChunk to queue, wait 30 seconds if it's full
    emailQueue.offer(newChunk, 30, TimeUnit.SECONDS);
  }
}

// instantiate as many of these as makes sense
class EmailThread implements Runnable {
  final BlockingQueue<EmailChunk> emailQueue;

  public EmailThread(BlockingQueue<EmailChunk> emailQueue) {
    this.emailQueue = emailQueue;
  }

  public void run() {
    // take next chunk from queue, wait 30 seconds if queue is empty
    emailChunk nextChunk = emailQueue.poll(30, TimeUnit.SECONDS);
  }
}

class Main {
  final int queueSize = 5;

  public static void main(String[] args) {
    BlockingQueue<EmailChunk> emailQueue = new ArrayBlockingQueue<>(queueSize);
    // instantiate DatabaseThread and EmailThread objects with this queue
  }
}