apache 点燃本地缓存事件未发送到 java 侦听器 class

apache ignite local cache events not sent to java listener class

我有一个 2 节点的 apache ignite (v2.9.0) 集群,带有一个名为 customer_cache 的分布式缓存。我正在尝试创建一个接收 EventType.EVT_CACHE_OBJECT_PUT 事件的本地事件监听器。我正在关注 ignite 文档 (https://ignite.apache.org/docs/2.9.0/events/listening-to-events),但是当我向缓存添加条目时,本地侦听器从不“触发”。监听器 class 在 2 个点燃节点中的每一个上都是 运行。

这是我的点燃配置XML

<bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">

    <property name="peerClassLoadingEnabled" value="true"/>
    <property name="rebalanceThreadPoolSize" value="2"/>
    <property name="metricsLogFrequency" value="#{60 * 10 * 1000}"/>

    <property name="cacheConfiguration">
        <list>
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <property name="name" value="customer_cache"/>
            </bean>
        </list>
    </property>

    <property name="dataStorageConfiguration">
        <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
            <property name="defaultDataRegionConfiguration">
                <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                    <property name="name" value="Default_Region" />
                    <property name="initialSize" value="#{1L * 1024 * 1024 * 1024}" />
                    <property name="maxSize" value="#{6L * 1024 * 1024 * 1024}" />
                    <property name="lazyMemoryAllocation" value="false" />
                    <property name="persistenceEnabled" value="false" />
                </bean>
            </property>
        </bean>
    </property>

    <!-- cache events -->
    <property name="includeEventTypes">
        <list>
            <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
        </list>
    </property>

    <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
    <property name="discoverySpi">
        <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
            <property name="ipFinder">
                <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                    <property name="addresses">
                        <list>
                            <value>99.18.35.13:47500..47509</value>
                            <value>99.18.35.37:47500..47509</value>
                        </list>
                    </property>
                </bean>
            </property>
        </bean>
    </property>
</bean>

这是我的 java 听众 class:

package com.glib.mystuff.ignite;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.lang.IgnitePredicate;

public class LocalCachePutListener extends AbstractIgniteTask {
    
    public static void main(String[] args) {
        new LocalCachePutListener().go();
    }

    private void go() {

        // Start an ignite client using parent class helper method
        Ignite ignite = getIgniteClient();
        
        IgniteEvents events = ignite.events(ignite.cluster().forCacheNodes("customer_cache"));

        // Local listener that listens to local events.
        IgnitePredicate<CacheEvent> localListener = evt -> {
            System.out.println("-----------> Received event name=" + 
                               evt.name() + 
                               ", key=" + 
                               evt.key() + 
                               ", oldVal=" + 
                               evt.oldValue() + 
                               ", newVal=" + 
                               evt.newValue());

             // Continue listening.
             return true; 
         };

         // (for some reason this is needed although it is in the ignite config)
         events.enableLocal(EventType.EVT_CACHE_OBJECT_PUT);
         
         // Subscribe to the cache events that are triggered on the local node.
         events.localListen(localListener, EventType.EVT_CACHE_OBJECT_PUT);
    }
}

然后我有一个简单的 java class,它向 customer_cache 添加了一个条目。我知道条目是通过使用 ignitevisor.sh 实用程序添加的,以通过 cache -a 查看我的缓存的扩展详细信息。据我从 ignite 文档中得知,我拥有侦听本地 EventType.EVT_CACHE_OBJECT_PUT 事件所需的所有组件和配置。我做错了什么???

非常感谢您提供的任何帮助。如果我有头发,我早就拔光了

EVT_CACHE_OBJECT_PUT 事件的本地侦听器将仅在存储密钥的服务器节点上触发。但是,您正试图在客户端上捕获它:

Ignite ignite = getIgniteClient();

也许您可以改用连续查询?

请注意,我使用的是 apache ignite v2.9.0

需要创建一个新的 class 来创建和注册本地 ignite 缓存事件侦听器(参见 https://ignite.apache.org/docs/2.9.0/events/listening-to-events#listening-to-local-events)。

这个新的 class 需要实现 org.apache.ignite.lifecycle.LifecycleBean 以便它可以在启动节点时执行。这还涉及修改 ignite XML 配置文件以指向新的 class(参见 https://ignite.apache.org/docs/2.9.0/starting-nodes)。

最后,新的class需要在每个点燃节点的class路径上。