Regular & Spring Data based cache operations fail with CacheStoppedException after a manual reconnect with clientReconnectDisabled set to true

We are running Ignite 2.4 and use the Spring IgniteSpringBean & Spring Data repositories for cluster and cache access.

Since we were running into a lot of problems related to IgniteClientDisconnectedException, I am writing a manual segmentation resolver. It disables automatic client reconnect (clientReconnectDisabled set to true), detects the disconnected state with a simple cache query that runs periodically, and then initiates a disconnect via IgniteSpringBean#close followed by a reconnect with the code fragment below (very similar to this discussion: http://apache-ignite-users.70518.x6.nabble.com/SPI-has-already-been-started-always-create-new-configuration-instance-for-each-starting-Ignite-instar-td7360.html).

Code fragment from DCMIgniteSpringBean#reconnect(), the bean referenced in the XML configuration further below:

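public final void reconnect(final IgniteConfiguration specifiedIgniteConfiguration) {
  LOGGER.info("Initiating reconnect..");
  try {
    // Stop the current (disconnected) client node.
    close();
    //destroy();
  } catch (Exception e) {
    LOGGER.warn("Error while disconnecting", e);
  }
  LOGGER.info("Disconnected..");
  try {
    // Short pause before starting the new node.
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    // Restore the interrupt flag so callers can still observe the interruption.
    Thread.currentThread().interrupt();
    LOGGER.warn("Interrupted while pausing to reconnect", e);
  }
  // A fresh IgniteConfiguration instance is required for every start
  // (see the "SPI has already been started" discussion linked above).
  setConfiguration(specifiedIgniteConfiguration);
  // Re-run the bean's startup callback to start the new client node.
  afterSingletonsInstantiated();
  final CacheConfiguration[] cfgArray = specifiedIgniteConfiguration.getCacheConfiguration();
  // Arrays.toString keeps the logger from expanding the array as varargs.
  LOGGER.info("Cache configuration is : {}", Arrays.toString(cfgArray));
  // getCacheConfiguration() returns null when no caches are configured.
  if (cfgArray != null) {
    getOrCreateCaches(Arrays.asList(cfgArray));
  }
  LOGGER.info("Reconnected..");
}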
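The detection side is not shown above, so here is a minimal sketch of how the periodic check could drive the reconnect. It is not the actual ClientHealthBasedReconnectWrapper; the class name HealthBasedReconnector, the probe and the reconnect callback are all hypothetical stand-ins (the probe would run the simple cache query, the callback would call reconnect() with a fresh configuration).

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

/** Hypothetical helper: probes client health and triggers a manual reconnect on failure. */
final class HealthBasedReconnector {
    private final BooleanSupplier probe; // assumed: runs a cheap cache query, false on failure
    private final Runnable reconnect;    // assumed: calls the bean's reconnect(freshConfiguration)
    private int reconnectCount;

    HealthBasedReconnector(BooleanSupplier probe, Runnable reconnect) {
        this.probe = probe;
        this.reconnect = reconnect;
    }

    /** Runs one monitoring pass; returns true if a reconnect was triggered. */
    synchronized boolean checkOnce() {
        boolean healthy;
        try {
            healthy = probe.getAsBoolean();
        } catch (RuntimeException e) {
            healthy = false; // a throwing probe counts as unhealthy
        }
        if (healthy) {
            return false;
        }
        reconnect.run();
        reconnectCount++;
        return true;
    }

    synchronized int reconnectCount() {
        return reconnectCount;
    }

    /** Schedules checkOnce() with a fixed delay; the caller shuts the executor down. */
    ScheduledExecutorService start(long periodMillis) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleWithFixedDelay(() -> {
            try {
                checkOnce();
            } catch (RuntimeException e) {
                // Swallow so that one failed reconnect does not cancel future runs.
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return ses;
    }
}
```

Keeping the single pass in checkOnce() separate from the scheduling makes the decision logic easy to test without timers; in a Spring application the same method could simply be invoked from a scheduled method instead of start().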

XML bean configuration fragment:

<bean id="igniteInstance" class="com.brocade.dcm.configuration.DCMIgniteSpringBean">
        <property name="configuration" ref="grid.cfg"/>
</bean>
<bean id="grid.cfg.provider" class="com.brocade.dcm.configuration.ClientHealthBasedReconnectWrapper">
        <lookup-method name="createIgniteConfiguration" bean="grid.cfg"/>
</bean>
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration" scope="prototype">
...
...
</bean>

With the above in place, I got it working and can see my extended IgniteSpringBean client reconnect correctly and start all the caches.

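The problem, however, is that even though the client is connected and the caches are started, all subsequent calls/queries to any IgniteCache & IgniteRepository instance fail with CacheStoppedException (below), which renders them unusable.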

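Can someone suggest what I can do to refresh these references? I know that when the client reconnects automatically after a disconnect, the references keep working, which tells me there is a way to refresh them that I am not using.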
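One application-side way to avoid holding stale references, sketched here under assumptions and not taken from Ignite itself, is to inject an indirection instead of the IgniteCache, and re-resolve the cache after every manual reconnect. RefreshableRef is a hypothetical name. Note that this only covers references the application resolves itself; Spring Data repositories keep their own cache reference internally, so this sketch does not help them.

```java
import java.util.function.Supplier;

/**
 * Hypothetical holder: resolves a reference lazily and re-resolves it after invalidate().
 * Assumed usage: new RefreshableRef<>(() -> ignite.cache("FabricInfoCache")),
 * with invalidate() called at the end of reconnect().
 */
final class RefreshableRef<T> implements Supplier<T> {
    private final Supplier<? extends T> resolver;
    private volatile T current;

    RefreshableRef(Supplier<? extends T> resolver) {
        this.resolver = resolver;
    }

    /** Returns the cached reference, resolving it first if none is held. */
    @Override
    public T get() {
        T ref = current;
        if (ref == null) {
            synchronized (this) {
                ref = current;
                if (ref == null) {
                    ref = resolver.get();
                    current = ref;
                }
            }
        }
        return ref;
    }

    /** Drops the held reference so that the next get() resolves a fresh one. */
    void invalidate() {
        current = null;
    }
}
```

Callers then use ref.get().query(...) rather than keeping the IgniteCache in a field, so the next call after a reconnect picks up a proxy that belongs to the new node.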

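Any expert thoughts on how to achieve this? It feels like I am close, but still far off given that I am hacking :-(

Below are the exceptions I get when calling IgniteCache#query() and IgniteRepository#findByXXX(), respectively: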

class org.apache.ignite.internal.processors.cache.CacheStoppedException: Failed to perform cache operation (cache is stopped): FabricInfoCache
    at org.apache.ignite.internal.processors.cache.GridCacheGateway.enter(GridCacheGateway.java:164)
    at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.onEnter(GatewayProtectedCacheProxy.java:1684)
    at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.query(GatewayProtectedCacheProxy.java:365)
    at com.brocade.dcm.configuration.ClientHealthBasedReconnectWrapper.monitorHealth(ClientHealthBasedReconnectWrapper.java:110)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

.

[Request processing failed; nested exception is java.lang.IllegalStateException: class org.apache.ignite.internal.processors.cache.CacheStoppedException: Failed to perform cache operation (cache is stopped): WebsocketCacheInfo] with root cause
class org.apache.ignite.internal.processors.cache.CacheStoppedException: Failed to perform cache operation (cache is stopped): WebsocketCacheInfo
    at org.apache.ignite.internal.processors.cache.GridCacheGateway.enter(GridCacheGateway.java:164)
    at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.onEnter(GatewayProtectedCacheProxy.java:1684)
    at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.query(GatewayProtectedCacheProxy.java:365)
    at org.apache.ignite.springdata.repository.query.IgniteRepositoryQuery.execute(IgniteRepositoryQuery.java:117)
    at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.doInvoke(RepositoryFactorySupport.java:483)
    at org.springframework.data.repository.core.support.RepositoryFactorySupport$QueryExecutorMethodInterceptor.invoke(RepositoryFactorySupport.java:461)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:61)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.data.repository.core.support.SurroundingTransactionDetectorMethodInterceptor.invoke(SurroundingTransactionDetectorMethodInterceptor.java:57)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213)
    at com.sun.proxy.$Proxy182.findByWebsocketSessionId(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
    at com.sun.proxy.$Proxy124.findByWebsocketSessionId(Unknown Source)

Thanks, Muthu

I believe this should be fixed in 2.5:

https://issues.apache.org/jira/browse/IGNITE-2766

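Please try that version.

For others facing this issue: I solved it by building from source and patching the code in GatewayProtectedCacheProxy#checkProxyIsValid & GridCacheContext.

Special thanks to @Michael for sharing the related issue, which helped in arriving at this solution.

Basically, I saw that when Ignite is stopped and restarted, the kernal context referenced by the previously handed-out wrapping cache proxies (for IgniteCache/IgniteRepository) becomes stale, because the kernal is stopped and restarted as a new instance. The (Spring) application still holds these references (from various injections), and their subsequent calls fail. The fix is to check whether there is a running kernal instance/reference for the same Ignite instance name and, if so, to update the proxy's reference, provided a cache with the same name has been started and is available.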

private GridCacheGateway<K, V> checkProxyIsValid(@Nullable GridCacheGateway<K, V> gate, boolean tryRestart) {
    ..
    ..
    // Added: the kernal this proxy was created against has stopped, so look up the
    // instance now running under the same name and adopt its kernal context.
    if (isCacheProxy && tryRestart && gate.isStopped() &&
        context().kernalContext().gateway().getState() == GridKernalState.STOPPED) {
        IgniteKernal igniteKernal = (IgniteKernal) Ignition.ignite(context().gridConfig().getIgniteInstanceName());

        if (igniteKernal != null) {
            context().setGridKernalContext(igniteKernal.context());
        }
    }

    // With a started kernal context, re-bind the proxy to the cache of the same name.
    if (isCacheProxy && tryRestart && gate.isStopped() &&
        context().kernalContext().gateway().getState() == GridKernalState.STARTED) {
        IgniteCacheProxyImpl proxyImpl = (IgniteCacheProxyImpl) delegate;

        try {
            IgniteInternalCache<K, V> cache = context().kernalContext().cache().<K, V>publicJCache(context().name()).internalProxy();

            GridFutureAdapter<Void> fut = proxyImpl.opportunisticRestart();

            if (fut == null)
                proxyImpl.onRestarted(cache.context(), cache.context().cache());
            else
                new IgniteFutureImpl<>(fut).get();

            return gate();
        } catch (IgniteCheckedException ice) {
            // Opportunity didn't work out.
        }
    }

    return gate;
}

/**
 * NOTE: this method goes into GridCacheContext.java.
 *
 * @param ctx Kernal context of the currently running Ignite instance.
 */
public void setGridKernalContext(GridKernalContext ctx) {
    this.ctx = ctx;
}
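To make the idea behind the patch easier to follow, here is a toy model of it that uses no Ignite classes. Kernel, RUNNING and Proxy are hypothetical stand-ins for the kernal context, the registry of running instances and the injected cache proxy; the sketch only illustrates the "look up the live instance by name and re-point the stale proxy" step, not Ignite's actual internals.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class StaleContextDemo {
    /** Stand-in for a kernal context; not an Ignite class. */
    static final class Kernel {
        final String name;
        volatile boolean stopped;

        Kernel(String name) {
            this.name = name;
        }
    }

    /** Stand-in for the registry of running instances, keyed by instance name. */
    static final Map<String, Kernel> RUNNING = new ConcurrentHashMap<>();

    static Kernel start(String name) {
        Kernel k = new Kernel(name);
        RUNNING.put(name, k);
        return k;
    }

    static void stop(Kernel k) {
        k.stopped = true;
        RUNNING.remove(k.name, k); // only removes the entry if it still maps to k
    }

    /** Stand-in for a cache proxy that was injected before the restart. */
    static final class Proxy {
        private Kernel ctx;

        Proxy(Kernel ctx) {
            this.ctx = ctx;
        }

        /** Returns the kernel the operation ran against, re-pointing a stale context first. */
        synchronized Kernel query() {
            if (ctx.stopped) {
                Kernel live = RUNNING.get(ctx.name);
                if (live == null) {
                    throw new IllegalStateException("cache is stopped");
                }
                ctx = live; // adopt the context of the restarted instance
            }
            return ctx;
        }
    }
}
```

Before the restart the proxy fails in the same way the injected IgniteCache references do; once an instance with the same name is running again, the next call switches over to it without the application having to re-inject anything.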