Closures stuck in 2.0 when trying to add an element to the queue
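We have the following use case:
1-) Start 2 Ignite instances as data nodes and insert data into the cache.
2-) Create a queue and register a remote listener with remoteListen, as shown below: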
//Queue creation
CollectionConfiguration colCfg = new CollectionConfiguration();
colCfg.setCacheMode(PARTITIONED);
IgniteQueue<BinaryObject> queue = Ignition.ignite().queue(queueName, 0, colCfg);
//Remote Listener Closure
IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
    @Override public boolean apply(CacheEvent evt) {
        System.out.println("Cache event [name=" + evt.name() + ", key=" + evt.key() + ']');
        Ignite ignite = Ignition.ignite();
        IgniteQueue<String> queue = ignite.queue(queueName, 0, null);
        String key = evt.key();
        BinaryObject profile = (BinaryObject) evt.newValue();
        System.out.println("Received event [evt=" + evt.name() + ", key=" + evt.key() +
            ", oldVal=" + evt.oldValue().toString() + ", newVal=" + evt.newValue().toString());
        if (profile.<Double>field("usage") > start && profile.<Double>field("usage") < end
            && ignite.affinity("profileCache").isPrimary(ignite.cluster().localNode(), key)){
            queue.add(profile.field("number"));
        }
        return false;
    }
};
Ignition.ignite().events(ignite.cluster().forCacheNodes("profileCache")).remoteListen(1,1l,false,null, rmtLsnr,
    EVT_CACHE_OBJECT_PUT, EVT_CACHE_OBJECT_REMOVED);
3-) Make some updates in the cache, as shown below, so that they are delivered to the remote listener.
void updateAnyProfile(Double newUsage){
    SqlQuery qry = new SqlQuery(Profile.class,"select * from Profile where usage < 30 limit 10");
    List<CacheEntryImpl<String, Profile>> res = profileCache.query(qry).getAll();
    Profile profile = res.iterator().next().getValue();
    profile.setUsage(newUsage);
    profileCache.put(profile.getCtn(), profile);
    profile.setUsage(newUsage+1);
    profileCache.put(profile.getCtn(), profile);
}
4-) Take elements from the queue.
public void readFromQueue (String queueName) {
    // Initialize new FIFO queue.
    IgniteQueue<String> queue = Ignition.ignite().queue(queueName, 0, null);
    while (true) {
        String profile = queue.take();
        System.out.println("Profile from queue: " + profile.toString());
    }
}
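Steps 2, 3 and 4 are run from different JVM instances started with client mode set to TRUE. The problem is that after executing the scenario above the application hangs and cannot perform any operation. Could you help us? We would appreciate it if you could tell us what we are doing wrong.
Below is the thread dump of the data node that hangs. The same data node hangs at the following line: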
IgniteQueue<String> queue = ignite.queue(queueName, 0, null);
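Sometimes a record can be updated successfully, but after the next update the node starts to hang, or it cannot even perform a put operation on the cache.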
"sys-stripe-5-#6%null%" #25 prio=5 os_prio=31 tid=0x00007fd88d031800 nid=0x14c07 waiting on condition [0x00007000036e7000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
at org.apache.ignite.internal.util.future.GridFutureAdapter.get0(GridFutureAdapter.java:176)
at org.apache.ignite.internal.util.future.GridFutureAdapter.get(GridFutureAdapter.java:139)
at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get0(GridCacheAdapter.java:4482)
at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:4463)
at org.apache.ignite.internal.processors.cache.GridCacheAdapter.get(GridCacheAdapter.java:1405)
at org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager.queue0(CacheDataStructuresManager.java:270)
at org.apache.ignite.internal.processors.cache.datastructures.CacheDataStructuresManager.queue(CacheDataStructuresManager.java:231)
at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.applyx(DataStructuresProcessor.java:952)
at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.applyx(DataStructuresProcessor.java:950)
at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.getCollection(DataStructuresProcessor.java:1078)
at org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor.queue(DataStructuresProcessor.java:950)
at org.apache.ignite.internal.IgniteKernal.queue(IgniteKernal.java:3560)
at com.ignite.trial.roaming.ProfileService.apply(ProfileService.java:303)
at com.ignite.trial.roaming.ProfileService.apply(ProfileService.java:297)
at org.apache.ignite.internal.GridEventConsumeHandler.onEvent(GridEventConsumeHandler.java:170)
at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager$LocalListenerWrapper.onEvent(GridEventStorageManager.java:1311)
at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.notifyListeners(GridEventStorageManager.java:892)
at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.record0(GridEventStorageManager.java:340)
at org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager.record(GridEventStorageManager.java:297)
at org.apache.ignite.internal.processors.cache.GridCacheEventManager.addEvent(GridCacheEventManager.java:297)
at org.apache.ignite.internal.processors.cache.GridCacheMapEntry.innerUpdate(GridCacheMapEntry.java:1806)
- locked <0x00000007b6d01f10> (a org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCacheEntry)
at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateSingle(GridDhtAtomicCache.java:2386)
at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal0(GridDhtAtomicCache.java:1792)
at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal(GridDhtAtomicCache.java:1630)
at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.processNearAtomicUpdateRequest(GridDhtAtomicCache.java:3016)
at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.access0(GridDhtAtomicCache.java:127)
at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.apply(GridDhtAtomicCache.java:282)
at org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.apply(GridDhtAtomicCache.java:277)
at org.apache.ignite.internal.processors.cache.GridCacheIoManager.processMessage(GridCacheIoManager.java:863)
at org.apache.ignite.internal.processors.cache.GridCacheIoManager.onMessage0(GridCacheIoManager.java:386)
at org.apache.ignite.internal.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:308)
at org.apache.ignite.internal.processors.cache.GridCacheIoManager.access[=14=]0(GridCacheIoManager.java:100)
at org.apache.ignite.internal.processors.cache.GridCacheIoManager.onMessage(GridCacheIoManager.java:253)
at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1257)
at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:885)
at org.apache.ignite.internal.managers.communication.GridIoManager.access00(GridIoManager.java:114)
at org.apache.ignite.internal.managers.communication.GridIoManager.run(GridIoManager.java:802)
at org.apache.ignite.internal.util.StripedExecutor$Stripe.run(StripedExecutor.java:483)
at java.lang.Thread.run(Thread.java:748)
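Calling the ignite.queue and ignite.affinity methods inside an EventListener is not allowed, because it may lead to a deadlock.
All cache operations, including the EventListener, are executed in the system pool, so it is not recommended to call operations that also use the system pool from inside an EventListener.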
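The effect described above can be reproduced in miniature with a plain java.util.concurrent pool. The sketch below is only an analogy and does not use Ignite: the class PoolStarvationDemo and the method nestedCallStalls are hypothetical names, and Ignite's system and striped pools are more involved than a fixed thread pool. It shows a task that blocks on a nested task submitted to the same pool. With a single worker, the nested task can never start.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PoolStarvationDemo {
    /**
     * Runs an outer task that waits for a nested task on the same pool.
     * Returns true if the outer task did not finish within the timeout.
     */
    public static boolean nestedCallStalls(int poolSize) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            Future<String> outer = pool.submit(() -> {
                // The nested operation needs a thread from the same pool.
                Future<String> inner = pool.submit(() -> "done");
                // The current worker blocks here until the nested task completes.
                return inner.get();
            });
            try {
                outer.get(500, TimeUnit.MILLISECONDS);
                return false; // a free worker ran the nested task
            } catch (TimeoutException e) {
                return true;  // the only worker is waiting on work it would have to run itself
            }
        } finally {
            // Interrupts the blocked worker so the JVM can exit.
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("1 thread stalls: " + nestedCallStalls(1));
        System.out.println("2 threads stall: " + nestedCallStalls(2));
    }
}
```

In the thread dump from the question, the sys-stripe thread plays the role of the blocked worker: it is parked inside ignite.queue(...) while still handling the cache update that fired the event.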
You can read more in "Closures Execution and Thread Pools":
https://apacheignite.readme.io/docs/async-support#section-listeners-and-chaining-futures
and here: https://apacheignite.readme.io/docs/thread-pools#section-system-pool
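One common way to follow this kind of advice is to keep the listener itself trivial and hand the blocking work to a separate, application-owned thread. The sketch below illustrates only that hand-off pattern with the JDK. It is not Ignite code: ListenerHandoffSketch and HandoffListener are hypothetical names, a LinkedBlockingQueue stands in for the IgniteQueue, and whether this approach fits a particular Ignite version and listener type is an assumption you would need to verify against the linked documentation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ListenerHandoffSketch {
    /** Listener that only schedules the work and returns; it never blocks its caller. */
    static final class HandoffListener<E> {
        private final ExecutorService worker;
        private final Consumer<E> blockingWork;

        HandoffListener(ExecutorService worker, Consumer<E> blockingWork) {
            this.worker = worker;
            this.blockingWork = blockingWork;
        }

        /** Same shape as apply(evt) in the question; the heavy part runs on the worker thread. */
        boolean apply(E evt) {
            worker.execute(() -> blockingWork.accept(evt));
            return false; // same return value as the listener in the question
        }
    }

    /** Feeds events through the listener and returns what the worker added to the queue. */
    public static List<String> run(List<String> events) throws InterruptedException {
        // Stand-in for the distributed queue (assumption: not an IgniteQueue).
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Dedicated single thread, owned by the application, keeps events in order.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        HandoffListener<String> lsnr = new HandoffListener<>(worker, queue::add);

        for (String evt : events)
            lsnr.apply(evt);

        // Let already submitted work finish before reading the result.
        worker.shutdown();
        worker.awaitTermination(5, TimeUnit.SECONDS);
        return new ArrayList<>(queue);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(Arrays.asList("ctn-1", "ctn-2", "ctn-3")));
    }
}
```

A single-threaded worker is used here so that elements reach the queue in the order the events arrived. A larger pool would trade that ordering for throughput.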