Apache Ignite unexpectedly deletes IgniteSet

My problem is that my Ignite instance unexpectedly closes an opened IgniteSet after I try to store it in a map or return it from a function.

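I have a Java Spring application that uses Ignite through Spring Data (the master node) and a Spark application that uses the same Ignite cluster as a database (a client node). The set is created and populated in the Spark application; in the Java application I only want to access it and check set.contains(element).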

The first part looks fine: the set is created, and I can see in the logs that its size is correct:

def save(host: String, cacheName: String): Unit = {
    val ignite: Ignite = igniteClientNode(host)
    val igniteSetCache: IgniteSet[String] = createIgniteSetCache(ignite, cacheName)
    igniteSetCache.clear()

    instance.fittedUsers.collect().foreach { row =>
      igniteSetCache.add(row.mkString(","))
    }

    logger.debug("Size of IgniteSet: " + igniteSetCache.size()) // DEBUG: Size of IgniteSet: 7910
}

In the Java application I have the corresponding Ignite bean, where I try to access the created set and store it in a map:

private IgniteSet<String> getSetByModelTag(String modelTag) {
    LOGGER.warning("HERE in getSetByModelTag " + openedIgniteSets); // instance wide map
    IgniteSet<String> alreadyOpenedSet = openedIgniteSets.getOrDefault(modelTag, null);

    if (alreadyOpenedSet == null) {
        try (IgniteSet<String> newSet = igniteInstance.set(modelTag, new CollectionConfiguration())) {
            if (newSet != null) {
                alreadyOpenedSet = newSet;
                openedIgniteSets.put(modelTag, alreadyOpenedSet);
                LOGGER.warning("Number of users in opened set for modelTag=`" +
                        modelTag + "` is " + alreadyOpenedSet.size());
                LOGGER.warning("HERE in if " + openedIgniteSets);
            } else {
                throw new IgniteException("`set()` method in Ignite component returned null.");
            }
        } catch (IgniteException e) {
            LOGGER.log(Level.SEVERE, "Ignite exception", e);
            throw e;
        }
    }

    return alreadyOpenedSet;
}

Later in the code I use this set to check whether it contains a given element:

// in the bean component
private final Ignite igniteInstance;
private final HashMap<String, IgniteSet<String>> openedIgniteSets = new HashMap<>();

...
var setWithFittedUsers = getSetByModelTag(modelTag);

LOGGER.warning("HERE in processModelTag " + openedIgniteSets);
LOGGER.warning("Number of users in setWithFittedUsers is " + setWithFittedUsers.size());
if (setWithFittedUsers.contains(user)) {
    // do something;
}

On the .contains() line I get this error:

Request processing failed; nested exception is java.lang.IllegalStateException: Set has been removed from cache: GridCacheSetImpl [cache=GridDhtAtomicCache [defRes=org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache@16b71d74, lockedEntriesInfo=org.apache.ignite.internal.processors.cache.LockedEntriesInfo@73230dca, near=null, super=GridDhtCacheAdapter [stopping=true, super=GridDistributedCacheAdapter [super=GridCacheAdapter [locMxBean=org.apache.ignite.internal.processors.cache.CacheLocalMetricsMXBeanImpl@59581355, clusterMxBean=org.apache.ignite.internal.processors.cache.CacheClusterMetricsMXBeanImpl@712fa2e8, aff=org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl@f53c3e7, asyncOpsSem=java.util.concurrent.Semaphore@35f2445b[Permits = 500], partPreloadBadVerWarned=false, active=true, name=datastructures_ATOMIC_PARTITIONED_0@default-ds-group#SET_als_on_all_data, size=0]]]], name=als_on_all_data, id=58c36079e71-40ec3dd2-ada4-4e1e-8b0c-15217439ad5d, collocated=false, separated=true, hdrPart=272, setKey=GridCacheSetHeaderKey [name=als_on_all_data], rmvd=true, compute=org.apache.ignite.internal.IgniteComputeImpl@597bcf59]

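This is very confusing. In the logs I can see that the set was retrieved inside getSetByModelTag() and that its size was as expected. But right after the function returns, Ignite reports that it stopped the cache. After that I cannot check anything, and the set's size becomes 0 :(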

Logs:

// everything looks as expected
2022-01-26 15:35:26,701 [http-nio-8000-exec-7] [ClientComponentImpl] WARN : HERE in getSetByModelTag {}
2022-01-26 15:35:26,745 [http-nio-8000-exec-7] [ClientComponentImpl] WARN : Number of users in opened set for modelTag=`als_on_all_data` is 7910
2022-01-26 15:35:26,747 [http-nio-8000-exec-7] [ClientComponentImpl] WARN : HERE in if {als_on_all_data=GridCacheSetImpl [cache=GridDhtAtomicCache [defRes=org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache@16b71d74, lockedEntriesInfo=org.apache.ignite.internal.processors.cache.LockedEntriesInfo@73230dca, near=null, super=GridDhtCacheAdapter [stopping=false, super=GridDistributedCacheAdapter [super=GridCacheAdapter [locMxBean=org.apache.ignite.internal.processors.cache.CacheLocalMetricsMXBeanImpl@59581355, clusterMxBean=org.apache.ignite.internal.processors.cache.CacheClusterMetricsMXBeanImpl@712fa2e8, aff=org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl@f53c3e7, asyncOpsSem=java.util.concurrent.Semaphore@35f2445b[Permits = 500], partPreloadBadVerWarned=false, active=true, name=datastructures_ATOMIC_PARTITIONED_0@default-ds-group#SET_als_on_all_data, size=7910]]]], name=als_on_all_data, id=58c36079e71-40ec3dd2-ada4-4e1e-8b0c-15217439ad5d, collocated=false, separated=true, hdrPart=272, setKey=GridCacheSetHeaderKey [name=als_on_all_data], rmvd=false, compute=org.apache.ignite.internal.IgniteComputeImpl@597bcf59]}

// now after exiting the function Ignite stops it
2022-01-26 15:35:28,394 [exchange-worker-#66] [org.apache.ignite.logger.java.JavaLogger] INFO : Stopped cache [cacheName=datastructures_ATOMIC_PARTITIONED_0@default-ds-group#SET_als_on_all_data, group=default-ds-group]

// and now its size is 0
2022-01-26 15:35:28,404 [http-nio-8000-exec-7] [ClientComponentImpl] WARN : HERE in processModelTag {als_on_all_data=GridCacheSetImpl [cache=GridDhtAtomicCache [defRes=org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache@16b71d74, lockedEntriesInfo=org.apache.ignite.internal.processors.cache.LockedEntriesInfo@73230dca, near=null, super=GridDhtCacheAdapter [stopping=true, super=GridDistributedCacheAdapter [super=GridCacheAdapter [locMxBean=org.apache.ignite.internal.processors.cache.CacheLocalMetricsMXBeanImpl@59581355, clusterMxBean=org.apache.ignite.internal.processors.cache.CacheClusterMetricsMXBeanImpl@712fa2e8, aff=org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl@f53c3e7, asyncOpsSem=java.util.concurrent.Semaphore@35f2445b[Permits = 500], partPreloadBadVerWarned=false, active=true, name=datastructures_ATOMIC_PARTITIONED_0@default-ds-group#SET_als_on_all_data, size=0]]]], name=als_on_all_data, id=58c36079e71-40ec3dd2-ada4-4e1e-8b0c-15217439ad5d, collocated=false, separated=true, hdrPart=272, setKey=GridCacheSetHeaderKey [name=als_on_all_data], rmvd=true, compute=org.apache.ignite.internal.IgniteComputeImpl@597bcf59]}

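I have no ignite.remove() or ignite.destroy() calls :( Also, the client node that created and populated the set (in the Spark application) has not been destroyed, and the master node (in the Java application) is still running as well.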

Why I need this in a separate function:

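The set is opened every time the application processes a request. When the load exceeds 1000 RPS, the line igniteInstance.set(modelTag, new CollectionConfiguration()) occasionally throws an NPE (in about 30% of requests). So I decided to open the set once, store it in a map keyed by the set name, and reuse it whenever I need it.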

So my guess:

Please help me solve this problem!

After several hours of debugging, I finally found the cause and a solution.

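First I logged the size of the set each time it was opened. Strangely, its size became 0 after the first call, so the set was being removed right after the first call to ignite.set(). I then switched to a regular cache (instead of a set) and checked cache.containsKey(user). Its size persisted across getOrCreateCache() calls, but the NPE problem still occurred.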

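Then I found this small answer on Ignite mailing list, which says that Ignite caches implement the AutoCloseable interface. This means that cache.close() is called automatically when a try-with-resources block exits. That call does not just close the "connection" to the cache; it stops the cache itself. For an IgniteSet, the Javadoc describes close() as removing the set, which matches the "Set has been removed from cache" error above.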
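To make the mechanism concrete without a running cluster, here is a minimal sketch in plain Java. FakeSet is a hypothetical stand-in for an IgniteSet handle (it is not an Ignite class), and its close() only flips a flag, on the assumption that the real close() removes the set. The point is that a reference stored in a map or returned from inside a try-with-resources block is already closed by the time the caller sees it.

```java
import java.util.HashMap;
import java.util.Map;

class TryWithResourcesDemo {
    /** Hypothetical stand-in for an IgniteSet handle; close() marks it as removed. */
    static final class FakeSet implements AutoCloseable {
        private boolean removed = false;

        boolean contains(String value) {
            if (removed) {
                throw new IllegalStateException("Set has been removed from cache");
            }
            return false; // the stand-in holds no data
        }

        boolean isRemoved() {
            return removed;
        }

        @Override
        public void close() {
            removed = true;
        }
    }

    static final Map<String, FakeSet> opened = new HashMap<>();

    /** Mirrors the shape of getSetByModelTag(): open in try-with-resources, store, return. */
    static FakeSet openAndStore(String name) {
        try (FakeSet set = new FakeSet()) {
            opened.put(name, set);
            return set; // close() still runs before the caller receives the reference
        }
    }

    public static void main(String[] args) {
        FakeSet set = openAndStore("als_on_all_data");
        System.out.println("removed after return: " + set.isRemoved()); // prints true

        try {
            set.contains("user-1");
        } catch (IllegalStateException e) {
            System.out.println("contains() failed: " + e.getMessage());
        }
    }
}
```

The same object sits in the map and in the caller's variable, so both see the closed state, just like the rmvd=true entry in the logged map.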

After this I changed the code to:

IgniteCache<String, String> cache = igniteInstance.getOrCreateCache(configuration);
if (cache != null) {
    if (cache.containsKey(user)) {
        finalModelTag = modelTag;
    }
} else {
    throw new CacheException("`getOrCreateCache()` method in Ignite component returned null.");
}

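I also noticed in the Ignite logs that the partition map exchange (PME) process kept starting for the default cache group. As far as I could tell, caches (and sets) are stopped during PME, which may have been the reason for my NPEs. I started placing the cache in a separate group, and PME was no longer triggered while the application was running: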

val cacheConfiguration = new CacheConfiguration[String, String]()
cacheConfiguration.setBackups(2)
cacheConfiguration.setGroupName("some-group-name")
cacheConfiguration.setName(cacheName)

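I don't know exactly which change fixed the original problem, but now everything works as expected. Sadly, this version does not catch exceptions during cache creation, and I did not figure out how to keep that error handling without triggering the automatic cache.close().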
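One possible way to keep the exception handling without the automatic close() is to use an ordinary try/catch with no resource declaration, and to store handles in a ConcurrentHashMap so that concurrent requests open each handle only once. The sketch below is an assumption-laden illustration, not a tested Ignite setup: HandleCache and its opener function are hypothetical names, and in real code the opener might be something like name -> igniteInstance.set(name, new CollectionConfiguration()).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical helper: opens each named handle once and never closes it implicitly. */
class HandleCache<T> {
    private final Map<String, T> handles = new ConcurrentHashMap<>();
    private final Function<String, T> opener;

    HandleCache(Function<String, T> opener) {
        this.opener = opener;
    }

    T get(String name) {
        // computeIfAbsent runs the opener at most once per key, even under concurrent calls.
        return handles.computeIfAbsent(name, key -> {
            try { // plain try/catch: no resource declaration, so nothing is auto-closed
                T handle = opener.apply(key);
                if (handle == null) {
                    throw new IllegalStateException("opener returned null for " + key);
                }
                return handle;
            } catch (RuntimeException e) {
                // Log here if needed; rethrowing leaves the key unmapped so a later call can retry.
                throw e;
            }
        });
    }

    public static void main(String[] args) {
        AtomicInteger opens = new AtomicInteger();
        // Stand-in opener; a real one would return an Ignite handle (assumption).
        HandleCache<String> cache = new HandleCache<>(name -> {
            opens.incrementAndGet();
            return "handle-for-" + name;
        });

        cache.get("als_on_all_data");
        cache.get("als_on_all_data");
        System.out.println("opener calls: " + opens.get()); // prints 1
    }
}
```

With this shape, closing a handle becomes an explicit decision (for example at application shutdown) rather than a side effect of leaving a block. It also avoids sharing a plain HashMap across request threads, which is not thread-safe.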