Ignite - 完全同步配置

Ignite - full sync configuration

我在集群中有两个服务器 ignite 节点(每个节点都在 Spring 引导应用程序中启动)。

我有两个缓存:

//持久化缓存

configuration.setReadThrough(true);
    configuration.setWriteThrough(true);
    configuration.setCacheStoreFactory(storeFactory);
    configuration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
    configuration.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
    configuration.setCacheMode(CacheMode.REPLICATED);

//内存中

configuration.setIndexedTypes(String.class, SequenceReserve.class);
    configuration.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
    configuration.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
    configuration.setCacheMode(CacheMode.REPLICATED);

对任何缓存的更新请求可以并行到每个节点。

每次更新 - 原子操作。

cache.invoke(...);

我的主要目标是不惜一切代价避免数据不一致。内存缓存可以丢失,但不应该不一致。

如果 t运行saction 没有在所有节点上提交,任何节点都应该 return 异常。

我能否编写这样的配置,以确保此行为有 100% 的概率运行。

已更新

我 运行 测试并得到以下行为:

每个请求总是在同一个节点上执行(调用方法)。我相信这是正确的行为。什么时候在第二个节点上执行查询?

IgniteCache#invoke(...) 是事务性操作。学习它的最好方法是检查它是否抛出 TransactionException。

你的配置似乎足以保证节点间的数据一致性。

如果你的意思是这两个缓存之间的一致性,那么你可以在其中启动显式事务和 运行 invoke-s。

UPD

请注意,正如 JavaDoc 中提到的 invoke(..) method, your EntryProcessor 应该是无状态的。它可能在不同的节点上被多次调用,所以它应该 return 每次都具有相同的值。

UPD 2

如果您在事务缓存上调用 IgniteCache#invoke() 方法,它会在包含此缓存所需分区的每个节点上调用所提供的 EntryProcessor。但是如果缓存是原子的,那么 EntryProcessor 将只在主节点上被调用。

但是你不应该依赖这种行为。它没有在任何地方指定,因此它可能会在未来的版本中发生变化。 Ignite 可以根据需要多次调用 EntryProcessor#process() 以保证数据一致性。

你可以用下面的代码来验证我的话:

public static void main(String[] args) throws IgniteException {
    Ignite ignite = Ignition.start("examples/config/example-ignite.xml");

    IgniteCache<Integer, String> atomicCache = ignite.getOrCreateCache(
        cacheConfiguration("atomic", CacheAtomicityMode.ATOMIC));

    IgniteCache<Integer, String> txCache = ignite.getOrCreateCache(
        cacheConfiguration("transactional", CacheAtomicityMode.TRANSACTIONAL));

    atomicCache.invoke(1, (entry, arguments) -> {
        System.out.println("Atomic invoke");
        return null;
    });

    txCache.invoke(1, (entry, arguments) -> {
        System.out.println("Transactional invoke");
        return null;
    });
}

private static <K, V> CacheConfiguration<K, V> cacheConfiguration(String name, CacheAtomicityMode atomicity) {
    CacheConfiguration<K, V> cacheCfg = new CacheConfiguration<>(name);
    cacheCfg.setAtomicityMode(atomicity);
    cacheCfg.setCacheMode(CacheMode.REPLICATED);
    cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);

    return cacheCfg;
}

"Transactional invoke" 将打印在每个节点上,但 "Atomic invoke" –– 仅在单个节点上打印。