Apache Ignite event listener registration error

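On our production cluster, we noticed that some events are not being inserted or updated. The application reports no errors.

To investigate this, I want to listen to cache put events from the Ignite cluster. I followed the documentation at https://ignite.apache.org/docs/latest/events/listening-to-events and also took reference from this example: https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheEventsExample.java

I tried running exactly the same code as in the linked example, as shown below.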

try (Ignite ignite = Ignition.start("C:\\Users\\example\\ignite-client-config-local.xml")) {
        System.out.println();
        System.out.println(">>> Cache events example started.");

        EntityKey entityKey = new EntityKey("T123");

        Entity entity = new Entity(entityKey);
        entity.setProductType("prodct1");
        entity.setSubProductType("subproduct1");

        try (IgniteCache<EntityKey,Entity> cache = ignite.getOrCreateCache("ProductsCache")) {

            IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
                @Override public boolean apply(UUID uuid, CacheEvent evt) {
                    System.out.println("Received event [evt=" + evt.name() + ", key=" + evt.key() +
                            ", oldVal=" + evt.oldValue() + ", newVal=" + evt.newValue());

                    return true; // Continue listening.
                }
            };


            IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
                @Override public boolean apply(CacheEvent evt) {
                    System.out.println("Cache event [name=" + evt.name() + ", key=" + evt.key() + ']');
                    ignite.affinity("ProductsCache").isPrimary(ignite.cluster().localNode(), evt.key());
                    return true;
                }
            };

           
            ignite.events(ignite.cluster().forCacheNodes("ProductsCache")).remoteListen(locLsnr, rmtLsnr,
                    EVT_CACHE_OBJECT_PUT);

            // Generate cache events.
            cache.put(entityKey,entity);

            // Wait for a while while callback is notified about remaining puts.
            Thread.sleep(2000);
            System.out.println(cache.get(entityKey));
        }
}

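However, the listener does not work. The data is put into the cache and read back correctly. I tried this first on my local machine. I also set peerClassLoadingEnabled to true in both the server and client configuration XML, and added EVT_CACHE_OBJECT_PUT to both configurations. On the Ignite server I see the following error.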

[16:56:07,469][SEVERE][sys-#109%personal.local%][GridContinuousProcessor] Failed to unmarshal continuous routine handler [routineId=1c49245f-238c-49a0-ae48-c4da3085052a, srcNodeId=89b3117f-43aa-428d-a5cf-647a518d089e]
class org.apache.ignite.internal.IgniteDeploymentCheckedException: Failed to obtain deployment for class: com.igniteadmin.LocalIgniteEventFilter
    at org.apache.ignite.internal.GridEventConsumeHandler.p2pUnmarshal(GridEventConsumeHandler.java:418)
    at org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.lambda$null(GridContinuousProcessor.java:707)
    at org.apache.ignite.internal.util.IgniteUtils.wrapThreadLoader(IgniteUtils.java:7133)
    at org.apache.ignite.internal.processors.closure.GridClosureProcessor.body(GridClosureProcessor.java:827)
    at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:120)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
[16:56:07,481][SEVERE][sys-#108%personal.local%][] JVM will be halted immediately due to the failure: [failureCtx=FailureContext [type=CRITICAL_ERROR, err=class o.a.i.i.IgniteDeploymentCheckedException: Failed to obtain deployment for class: com.igniteadmin.LocalIgniteEventFilter]]
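
For reference, the two settings described above (peerClassLoadingEnabled and the put event type) can also be expressed in code. This is only a sketch: the XML files are not shown in the post, so the class name EventConfigSketch, the helper configuration(...), and the reliance on default discovery settings are assumptions, not the poster's actual setup.

```java
import org.apache.ignite.Ignite;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.EventType;

public class EventConfigSketch {
    /** Hypothetical helper: builds a configuration with the two settings mentioned in the post. */
    static IgniteConfiguration configuration(boolean client) {
        IgniteConfiguration cfg = new IgniteConfiguration();

        cfg.setClientMode(client);

        // Allow nodes to request classes (for example event filters) from the deploying node.
        cfg.setPeerClassLoadingEnabled(true);

        // Events are disabled by default; only the listed types are recorded.
        cfg.setIncludeEventTypes(EventType.EVT_CACHE_OBJECT_PUT);

        return cfg;
    }

    public static void main(String[] args) {
        // Starts a server node with default discovery (an assumption for this sketch).
        try (Ignite ignite = Ignition.start(configuration(false))) {
            System.out.println("Peer class loading enabled: " +
                ignite.configuration().isPeerClassLoadingEnabled());
        }
    }
}
```

The same helper could be called with true to build the client-side configuration, so that both sides stay in sync.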

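Try injecting the Ignite instance via @IgniteInstanceResource so that you can use it in your remote filter. The remote filter is serialized and executed on the server nodes, so it should use the instance local to that node rather than the client-side ignite variable captured from the enclosing scope.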

IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
    // Injected by Ignite on the node where the filter is executed.
    @IgniteInstanceResource
    private Ignite ignite;

    @Override public boolean apply(CacheEvent evt) {
        System.out.println("Cache event [name=" + evt.name() + ", key=" + evt.key() + ']');
        ignite.affinity("ProductsCache").isPrimary(ignite.cluster().localNode(), evt.key());
        return true;
    }
};
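
To show how the injected filter fits into the registration call, here is a minimal sketch. The class names PutEventListenerSketch and PrimaryPutFilter and the helper listen(...) are hypothetical. Returning the isPrimary result from the filter (instead of always returning true) is an assumption borrowed from the linked CacheEventsExample, and it means only the primary node for a key forwards the event. It has not been verified that this resolves the deployment error shown in the question.

```java
import java.util.UUID;

import org.apache.ignite.Ignite;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;

public class PutEventListenerSketch {
    /** Remote filter as a named static class, so it holds no reference to an enclosing scope. */
    static class PrimaryPutFilter implements IgnitePredicate<CacheEvent> {
        private final String cacheName;

        /** Set by Ignite on the node where the filter runs; not sent over the wire. */
        @IgniteInstanceResource
        private transient Ignite ignite;

        PrimaryPutFilter(String cacheName) {
            this.cacheName = cacheName;
        }

        @Override public boolean apply(CacheEvent evt) {
            System.out.println("Cache event [name=" + evt.name() + ", key=" + evt.key() + ']');

            // Forward the event only from the node that is primary for the key.
            return ignite.affinity(cacheName).isPrimary(ignite.cluster().localNode(), evt.key());
        }
    }

    /** Registers the listener and returns the operation ID needed to stop it later. */
    static UUID listen(Ignite ignite, String cacheName) {
        // Local listener: runs on the node that registered it.
        IgniteBiPredicate<UUID, CacheEvent> locLsnr = (nodeId, evt) -> {
            System.out.println("Received event [evt=" + evt.name() + ", key=" + evt.key() +
                ", oldVal=" + evt.oldValue() + ", newVal=" + evt.newValue() + ']');

            return true; // Continue listening.
        };

        return ignite.events(ignite.cluster().forCacheNodes(cacheName))
            .remoteListen(locLsnr, new PrimaryPutFilter(cacheName), EventType.EVT_CACHE_OBJECT_PUT);
    }
}
```

A caller would keep the returned UUID and pass it to ignite.events().stopRemoteListen(...) once the investigation is finished, so the filter does not stay deployed on the server nodes.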