如何隔离 Apache Ignite 中的事务

How do I isolate transactions in Apache Ignite

我正在尝试使用事务来同步对存储在 Ignite 中的对象的访问,并发现一个事务的结果通常会覆盖另一个事务的结果。为了便于测试,我已经编写了一个更简单的版本作为 JUnit 测试:

import junit.framework.TestCase;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;

public class IgniteTransactionTest extends TestCase {
    private Ignite ignite;

    @Override
    protected void setUp() throws Exception {
        // Start up a non-client Ignite for tests
        IgniteConfiguration config = new IgniteConfiguration();
        ignite = Ignition.getOrStart(config);
    }

    public void testTransaction() throws InterruptedException {
        IgniteCache cache = ignite.getOrCreateCache("cache");
        cache.put("counter", 0);

        Runnable r = () -> {
            for (int i = 0; i < 1000; i++) {
                Transaction tx = ignite.transactions().txStart(
                        TransactionConcurrency.PESSIMISTIC, TransactionIsolation.SERIALIZABLE);

                int counter = (int) cache.get("counter");
                counter += 1;
                cache.put("counter", counter);

                try {
                    tx.commit();
                } catch (Exception ex) {
                    System.out.println("Commit failed");
                    i--;
                }
            }
        };

        Thread t1 = new Thread(r);
        Thread t2 = new Thread(r);

        t1.start();
        t2.start();

        t1.join();
        t2.join();

        assertEquals((int) cache.get("counter"), 2000);
    }

    @Override
    protected void tearDown() throws Exception {
        ignite.close();
    }
}

基本上,它运行两个独立的线程来尝试递增存储在 Ignite 缓存中的计数器,隔离级别为 SERIALIZABLE,然后检查计数器是否具有正确的值。根据 Ignite 交易的 documentation

TransactionIsolation.SERIALIZABLE isolation level means that all transactions occur in a completely isolated fashion, as if all transactions in the system had executed serially, one after the other. Read access with this level happens the same way as with TransactionIsolation.REPEATABLE_READ level. However, in TransactionConcurrency.OPTIMISTIC mode, if some transactions cannot be serially isolated from each other, then one winner will be picked and the other transactions in conflict will result in TransactionOptimisticException being thrown.

但是运行这个测试产生了

junit.framework.AssertionFailedError: expected:<1613> but was:<2000>

表示一个线程中的写入可以在另一个线程中的读取和写入之间交错。此外,文档建议失败的事务在尝试提交时应该抛出异常,但这从未发生过。

我是不是一开始就想念如何使用事务?我如何让他们隔离?

您在没有提供 CacheConfiguration 的情况下创建缓存。默认情况下,会创建 Atomic 缓存,并且此缓存不支持任何事务功能。

你需要这样的东西:

ignite.getOrCreateCache(new CacheConfiguration<>("cache")
    .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL));