尝试将元素添加到队列时闭包卡在 2.0

Closures stuck in 2.0 when try to add an element into the queue

我们有一个用例如下

1-)启动2个ignite实例作为数据节点,向缓存中插入数据。

2-) 创建队列并使用 remoteListen 注册远程侦听器,如下所示

//Queue creation         
CollectionConfiguration colCfg = new CollectionConfiguration(); 
colCfg.setCacheMode(PARTITIONED); 
IgniteQueue<BinaryObject> queue = Ignition.ignite().queue(queueName, 0, colCfg);



//Remote Listener Closure 
IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() { 
                        @Override public boolean apply(CacheEvent evt) { 
                                System.out.println("Cache event [name=" + evt.name() + ", key=" + evt.key() + ']'); 
                Ignite ignite = Ignition.ignite(); 
                IgniteQueue<String> queue = ignite.queue(queueName, 0, null); 
                String key = evt.key(); 
                BinaryObject profile = (BinaryObject) evt.newValue(); 
                System.out.println("Received event [evt=" + evt.name() + ", key=" + evt.key() + 
                        ", oldVal=" + evt.oldValue().toString() + ", newVal=" + evt.newValue().toString()); 

                if (profile.<Double>field("usage") > start && profile.<Double>field("usage") < end 
                        && ignite.affinity("profileCache").isPrimary(ignite.cluster().localNode(), key)){ 
                    queue.add(profile.field("number")); 
                } 
                                return false; 
                        } 
                };       

Ignition.ignite().events(ignite.cluster().forCacheNodes("profileCache")).remoteListen(1,1l,false,null, rmtLsnr, 
                                EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED); 

3-)如下所示在缓存实例中进行一些更新,以将更新更新到 remotelistener 中。

void updateAnyProfile(Double newUsage){         
SqlQuery qry = new SqlQuery(Profile.class,"select * from Profile where usage < 30 limit 10"); 
    List<CacheEntryImpl<String, profile>> res = profileCache.query(qry).getAll(); 
    Profile profile = res.iterator().next().getValue(); 
    profile.setUsage(newUsage); 
    profileCache.put(profile.getCtn(), profile); 
    profile.setUsage(newUsage+1); 
    profileCache.put(profile.getCtn(), profile); 

} 

4-) 从队列中取出元素。

 public void readFromQueue (String queueName) { 
    // Initialize new FIFO queue. 
    IgniteQueue<String> queue = Ignition.ignite().queue(queueName, 0, null); 
    while (true) { 
        String profile = queue.take(); 
        System.out.println("Profile from queue: " + profile.toString()); 
    } 
} 

第 2、3、4 步 运行 来自客户端节点为 TRUE 的不同 JVM 实例。问题是应用程序在执行上述场景后挂起以执行任何操作。你能帮助我们吗?如果您能告诉我们我们做错了什么,我们将不胜感激?

下面是挂起的数据节点的线程转储,同一个数据节点在下面的代码处挂起

 IgniteQueue<String> queue = ignite.queue(queueName, 0, null);

有时您可以成功更新记录,但在下一次更新后它开始挂起或者甚至无法在缓存中进行放置操作。

"sys-stripe-5-#6%null%" #25 prio=5 os_prio=31 tid=0x00007fd88d031800 nid=0x14c07 waiting on condition [0x00007000036e7000] 
   java.lang.Thread.State: WAITING (parking) 
        at sun.misc.Unsafe.park(Native Method) 
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304) 
        at org.apache.ignite.internal.util.future.GridFutureAdapter.get0(GridFutureAdapter.java:176) 
        at org.apache.ignite.internal.util.future.GridFutureAdapter.get(GridFutureAdapter.java:139) 
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get0(GridCacheAdapter.java:4482) 
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:4463) 
        at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:1405) 
        at org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager.queue0(CacheDataStructuresManager.java:270) 
        at org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager.queue(CacheDataStructuresManager.java:231) 
        at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.applyx(DataStructuresProcessor.java:952) 
        at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.applyx(DataStructuresProcessor.java:950) 
        at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.getCollection(DataStructuresProcessor.java:1078) 
        at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.queue(DataStructuresProcessor.java:950) 
        at org.apache.ignite.internal.IgniteKernal.queue(IgniteKernal.java:3560) 
        at com.ignite.trial.roaming.ProfileService.apply(ProfileService.java:303) 
        at com.ignite.trial.roaming.ProfileService.apply(ProfileService.java:297) 
        at org.apache.ignite.internal.GridEventConsumeHandler.onEvent(GridEventConsumeHandler.java:170) 
        at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager$LocalListenerWrapper.onEvent(GridEventStorageManager.java:1311) 
        at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.notifyListeners(GridEventStorageManager.java:892) 
        at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.record0(GridEventStorageManager.java:340) 
        at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.record(GridEventStorageManager.java:297) 
        at org.apache.ignite.internal.processors.cache.GridCacheEventManager.addEvent(GridCacheEventManager.java:297) 
        at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.innerUpdate(GridCacheMapEntry.java:1806) 
        - locked <0x00000007b6d01f10> (a org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCacheEntry) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateSingle(GridDhtAtomicCache.java:2386) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal0(GridDhtAtomicCache.java:1792) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal(GridDhtAtomicCache.java:1630) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.processNearAtomicUpdateRequest(GridDhtAtomicCache.java:3016) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.access0(GridDhtAtomicCache.java:127) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.apply(GridDhtAtomicCache.java:282) 
        at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.apply(GridDhtAtomicCache.java:277) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.processMessage(GridCacheIoManager.java:863) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.onMessage0(GridCacheIoManager.java:386) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:308) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.access[=14=]0(GridCacheIoManager.java:100) 
        at org.apache.ignite.internal.processors.cache.GridCacheIoManager.onMessage(GridCacheIoManager.java:253) 
        at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1257) 
        at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:885) 
        at org.apache.ignite.internal.managers.communication.GridIoManager.access00(GridIoManager.java:114) 
        at org.apache.ignite.internal.managers.communication.GridIoManager.run(GridIoManager.java:802) 
        at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:483) 
        at java.lang.Thread.run(Thread.java:748) 

不允许在EventListener中调用ignite.queue和ignite.affinity方法,因为这可能会导致死锁。

包括EventListener在内的所有缓存操作都在系统池中执行,因此不建议在EventListener内部调用也使用系统池的操作。

您可以在 "Closures Execution and Thread Pools" 上阅读更多内容: https://apacheignite.readme.io/docs/async-support#section-listeners-and-chaining-futures

这里https://apacheignite.readme.io/docs/thread-pools#section-system-pool