如何使用多线程避免在 Oracle 中进行双重插入

How to avoid double insert in Oracle with multithreading

在我的应用程序中,当我将 INSERT 插入 Oracle 时,我遇到了很多关于双插入的异常。

我的测试代码是这样的

class SomeClass{
EntityManager em;
Dao dao;

@Override
void insert(String a, String b){
    MyObject object =new MyObject(a,b);
    dao.insertObject(object);
    }
}

class OtherClass{
    private final ExecutorService completableFutureExecutor =
            new ThreadPoolExecutor(10, 11, 30L, TimeUnit.SECONDS, new SynchronousQueue<>());

    public void method() {

        Runnable task1 = () -> dao.insert("a","b");
        for (int i = 0; i < 5; i++) {
            completableFutureExecutor.submit(task1);
        }
    }
}

在 openJpa 日志中我看到类似的内容

240981  JpaPersistenceUnit  TRACE  [pool-25-thread-3] openjpa.jdbc.SQL - <t 1427395137, conn 1023570122> executing prepstmnt 743213969 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]
240983  JpaPersistenceUnit  TRACE  [pool-25-thread-9] openjpa.jdbc.SQL - <t 1116539025, conn 246735198> executing prepstmnt 468904024 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]
240986  JpaPersistenceUnit  TRACE  [pool-25-thread-5] openjpa.jdbc.SQL - <t 2107513837, conn 1168031715> executing prepstmnt 1872262728 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]
240986  JpaPersistenceUnit  TRACE  [pool-25-thread-1] openjpa.jdbc.SQL - <t 1881630463, conn 2024928498> executing prepstmnt 1258578230 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]
240986  JpaPersistenceUnit  TRACE  [pool-25-thread-7] openjpa.jdbc.SQL - <t 1202968848, conn 1876787130> executing prepstmnt 1733696457 SELECT t0.COLUMN1, t0.COLUMN2  FROM TABLE t0 WHERE t0.COLUMN2 = ? AND t0.COLUMN1 = ? [params=(String) a, (String) b]

240998  JpaPersistenceUnit  TRACE  [pool-25-thread-9] openjpa.jdbc.SQL - <t 1116539025, conn 246735198> executing prepstmnt 752805342 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params= (String) a,   (String) b]
240999  JpaPersistenceUnit  TRACE  [pool-25-thread-3] openjpa.jdbc.SQL - <t 1427395137, conn 1023570122> executing prepstmnt 1035550395 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params= (String) a,   (String) b]
240999  JpaPersistenceUnit  TRACE  [pool-25-thread-5] openjpa.jdbc.SQL - <t 2107513837, conn 1168031715> executing prepstmnt 1439514282 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params=  (String) a,   (String) b]
241000  JpaPersistenceUnit  TRACE  [pool-25-thread-1] openjpa.jdbc.SQL - <t 1881630463, conn 2024928498> executing prepstmnt 1158780577 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params=  (String) a,   (String) b]
241000  JpaPersistenceUnit  TRACE  [pool-25-thread-7] openjpa.jdbc.SQL - <t 1202968848, conn 1876787130> executing prepstmnt 1082517334 INSERT INTO TABLE (  COLUMN1, COLUMN2  ) VALUES (?, ?)  [params=  (String) a,   (String) b]



41018  JpaPersistenceUnit  TRACE  [pool-25-thread-4] openjpa.Runtime - An exception occurred while ending the transaction.  This exception will be re-thrown.<openjpa-2.4.0-r422266:1674604 fatal store error> org.apache.openjpa.util.StoreException: The transaction has been rolled back.  See the nested exceptions for details on the errors that occurred.
FailedObject: com.test.SomeClass@19df04ab
 at org.apache.openjpa.kernel.BrokerImpl.newFlushException(BrokerImpl.java:2368)
 at org.apache.openjpa.kernel.BrokerImpl.flush(BrokerImpl.java:2205)
 at org.apache.openjpa.kernel.BrokerImpl.flushSafe(BrokerImpl.java:2103)
 at org.apache.openjpa.kernel.BrokerImpl.beforeCompletion(BrokerImpl.java:2021)
 at org.apache.openjpa.kernel.LocalManagedRuntime.commit(LocalManagedRuntime.java:81)
 at org.apache.openjpa.kernel.BrokerImpl.commit(BrokerImpl.java:1526)
 at org.apache.openjpa.kernel.DelegatingBroker.commit(DelegatingBroker.java:932)
 at org.apache.openjpa.persistence.EntityManagerImpl.commit(EntityManagerImpl.java:569)
 at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:514)
 at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:755)
 at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:724)
 at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:475)
 at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:270)
 at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:94)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
 at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:633)
 at com.test.Dao$$EnhancerBySpringCGLIB$$c4aa5f08.insertObject(<generated>)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
 at org.springframework.osgi.service.importer.support.internal.aop.ServiceInvoker.doInvoke(ServiceInvoker.java:58)
 at org.springframework.osgi.service.importer.support.internal.aop.ServiceInvoker.invoke(ServiceInvoker.java:62)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
 at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132)
 at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
 at org.springframework.osgi.service.util.internal.aop.ServiceTCCLInterceptor.invokeUnprivileged(ServiceTCCLInterceptor.java:56)
 at org.springframework.osgi.service.util.internal.aop.ServiceTCCLInterceptor.invoke(ServiceTCCLInterceptor.java:39)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
 at org.springframework.osgi.service.importer.support.LocalBundleContextAdvice.invoke(LocalBundleContextAdvice.java:59)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
 at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:132)
 at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:120)
 at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
 at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:204)
 at com.sun.proxy.$Proxy255.insertObject(Unknown Source)
 at com.test.OtherClass.lambda$method(OtherClass.java:146)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: <openjpa-2.4.0-r422266:1674604 fatal store error> org.apache.openjpa.util.ObjectExistsException: ORA-00001: unique constraint (TABLE_PK) violated
 {prepstmnt 1119780936 INSERT INTO TABLE (COLUMN1, COLUMN2) VALUES (?, ?) [params=(String) a, (String) b]} [code=1, state=23000]
FailedObject: com.test.entities.Table@19df04ab
 at org.apache.openjpa.jdbc.sql.DBDictionary.narrow(DBDictionary.java:4986)
 at org.apache.openjpa.jdbc.sql.DBDictionary.newStoreException(DBDictionary.java:4961)
 at org.apache.openjpa.jdbc.sql.SQLExceptions.getStore(SQLExceptions.java:133)
 at org.apache.openjpa.jdbc.sql.SQLExceptions.getStore(SQLExceptions.java:75)
 at org.apache.openjpa.jdbc.kernel.BatchingPreparedStatementManagerImpl.flushBatch(BatchingPreparedStatementManagerImpl.java:225)
 at org.apache.openjpa.jdbc.kernel.BatchingConstraintUpdateManager.flush(BatchingConstraintUpdateManager.java:63)
 at org.apache.openjpa.jdbc.kernel.AbstractUpdateManager.flush(AbstractUpdateManager.java:104)
 at org.apache.openjpa.jdbc.kernel.AbstractUpdateManager.flush(AbstractUpdateManager.java:77)
 at org.apache.openjpa.jdbc.kernel.JDBCStoreManager.flush(JDBCStoreManager.java:731)
 at org.apache.openjpa.kernel.DelegatingStoreManager.flush(DelegatingStoreManager.java:131)
 ... 43 more
Caused by: org.apache.openjpa.lib.jdbc.ReportingSQLException: ORA-00001: unique constraint (TABLE_PK) violated
 {prepstmnt 1119780936 INSERT INTO TABLE (COLUMN1, COLUMN2) VALUES (?, ?) [params=(String) a, (String) b]} [code=1, state=23000]
 at org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator.wrap(LoggingConnectionDecorator.java:218)
 at org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator.wrap(LoggingConnectionDecorator.java:194)
 at org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator.access00(LoggingConnectionDecorator.java:58)
 at org.apache.openjpa.lib.jdbc.LoggingConnectionDecorator$LoggingConnection$LoggingPreparedStatement.executeUpdate(LoggingConnectionDecorator.java:1133)
 at org.apache.openjpa.lib.jdbc.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:275)
 at org.apache.openjpa.jdbc.kernel.JDBCStoreManager$CancelPreparedStatement.executeUpdate(JDBCStoreManager.java:1791)
 at org.apache.openjpa.jdbc.kernel.PreparedStatementManagerImpl.executeUpdate(PreparedStatementManagerImpl.java:268)
 at org.apache.openjpa.jdbc.kernel.BatchingPreparedStatementManagerImpl.flushSingleRow(BatchingPreparedStatementManagerImpl.java:254)
 at org.apache.openjpa.jdbc.kernel.BatchingPreparedStatementManagerImpl.flushBatch(BatchingPreparedStatementManagerImpl.java:157)
 ... 48 more

我怎样才能避免这种情况?因为我在生产中遇到了很多这样的错误。

UPD 在代码段中添加了新日志。

我的应用程序位于两个服务器(节点)上。每个服务器都连接到数据库。所以,我的测试我们可以乘以 2.

您可以同步您的 doa 对象,这意味着它一次只能 运行 一个线程。

  @Override
  void insert(String a, String b) {
    MyObject object = new MyObject(a, b);
    synchronized (dao) {
      dao.insertObject(object);
    }
  }

类似上面的内容

The main advantage of synchronized keyword is we can resolve data inconsistency problems. But the main disadvantage of synchronized keyword is it increases waiting time of thread and effects performance of the system. Hence, if there is no specific requirement it is never recommended to use synchronized keyword.

使用并发线程中的单个 EntityManager 很可能是个坏主意,正如您可能从日志中看到的那样:最好的情况是一个线程中的异常回滚 EM 的单个事务所有其他线程也尝试使用。最坏的情况是各种并发问题,竞争条件等等。

I want it INSERT just once and if It already in DB, select it. But I have about 5 methods in different classes which can call that INSERT in time, because of that, that exception should be handled

在那种情况下,您可能别无选择,只能通过其他方式同步您的 DAO 调用;可以通过锁定数据库中的某些现有记录或以任何其他方式来完成。您可能还想尝试 EntityManager.merge(),但我认为这不会解决您的两台独立机器同时写入的问题。

我现在用的是memcache,但不是最终的解决方案

 private int expire = 2;

 public <T> void insert(Supplier<T> supplier, String... keyLine) throws InterruptedException, MemcachedException, TimeoutException {
        String key = ParseUtils.collectToKeyWithDot(keyLine);
        T value = getCache(key);
        if (value == null) {
            value = supplier.get();
            setCache(key, value);
        } 
    }

 private <T> Boolean setCache(String key, T value) throws InterruptedException, MemcachedException, TimeoutException {
        return memcacheClient.set(key, expire, value);
    }

    private <T> T getCache(String key) throws InterruptedException, MemcachedException, TimeoutException {
        return memcacheClient.get(key);
    }

在这里,我将我的第一个 INSERT 存储到数据库中,也存储到缓存中 2 秒。在那 2 秒内,如果有任何线程试图将相同的值插入数据库并且不会发生。首先,它将在缓存中进行检查。对我来说,2秒足以避免异常。

P.S.我还在寻找更优雅的解决方案。

您可能需要考虑从数据访问对象延迟写入,像这样 -

class SomeClass {
    private EntityManager em;
    private Dao dao;
    private Set<MyObject> writableObjects = new HashSet<>();

    @Override
    public void insert(String a, String b) {
        MyObject object = new MyObject(a, b);
        writableObjects.add(object);
    }

    @override
    public void commit() {
        writableObjects.forEach(object -> dao.insertObject(object));
    }
}

class OtherClass {
    private final ExecutorService completableFutureExecutor = new ThreadPoolExecutor(10, 11, 30L, TimeUnit.SECONDS,
            new SynchronousQueue<>());

    public void method() {

        Runnable task1 = () -> dao.insert("a", "b");
        for (int i = 0; i < 5; i++) {
            completableFutureExecutor.submit(task1);
        }

        dao.commit();
    }
}