Apache Ignite Availability Issue w/Custom CacheStoreAdapter
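I am building a PoC with Apache Ignite. This is the scenario I am testing:
1. Start a cluster of 3 nodes and one client.
2. Call get on a key. I log which node caches this key.
3. Call get on the key again. I verify that it returns the stored value.
4. Run loadCache(). All nodes report that they loaded the cache successfully.
5. Kill the node that originally loaded the key.
6. Restart the node I just killed.
7. Query the key again.
Steps 6 and 7 are where things go wrong. If I wait long enough between the two, everything works as expected. However, if steps 6 and 7 happen too close together, I get this error on the client and this error on the node.
The error I see is IgniteClientDisconnectedException: Failed to wait for topology update, client disconnected.
Is there a way to avoid this problem? Setting a longer wait for the topology update is not really an option, because a client may try to connect at any time. Is this related to my cluster configuration? I saw this documentation, which suggests retrying the connection indefinitely, but that seems like it would just keep failing.
In addition, we need to be able to grow and shrink the cluster dynamically. Is that possible? Would in-memory backups fix this behavior?
Note that if I omit step 6, I do not see the failure.
Cluster node configuration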
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<!--<import resource="./cache.xml"/>-->
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="peerClassLoadingEnabled" value="true"/>
<property name="cacheConfiguration">
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set a cache name. -->
<property name="name" value="recordData"/>
<!--<property name="rebalanceMode" value="SYNC"/>-->
<!-- Set cache mode. -->
<property name="cacheMode" value="PARTITIONED"/>
<property name="cacheStoreFactory">
<bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
<constructor-arg value="Application.RecordDataStore"/>
</bean>
</property>
<property name="readThrough" value="true"/>
<property name="writeThrough" value="true"/>
</bean>
</property>
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<!-- Override local port. -->
<property name="localPort" value="8000"/>
</bean>
</property>
<property name="communicationSpi">
<bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
<!-- Override local port. -->
<property name="localPort" value="8100"/>
</bean>
</property>
</bean>
</beans>
Client configuration
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<!-- Set to true to enable distributed class loading for examples, default is false. -->
<property name="peerClassLoadingEnabled" value="true"/>
<property name="clientMode" value="true"/>
<property name="cacheConfiguration">
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set a cache name. -->
<property name="name" value="recordData"/>
<!--<property name="rebalanceMode" value="SYNC"/>-->
<!-- Set cache mode. -->
<property name="cacheMode" value="PARTITIONED"/>
<property name="cacheStoreFactory">
<bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
<constructor-arg value="com.digitaslbi.idiom.util.RecordDataStore"/>
</bean>
</property>
<property name="readThrough" value="true"/>
<property name="writeThrough" value="true"/>
</bean>
</property>
<!-- Enable task execution events for examples. -->
<property name="includeEventTypes">
<list>
<!--Task execution events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
<!--Cache events-->
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
</list>
</property>
<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!--
Ignite provides several options for automatic discovery that can be used
instead of static IP based discovery. For information on all options refer
to our documentation: http://apacheignite.readme.io/docs/cluster-config
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>localhost:8000..8099</value>
<!--<value>127.0.0.1:47500..47509</value>-->
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
CacheStoreAdapter implementation
public class RecordDataStore extends CacheStoreAdapter<Long, List<Record>> {
// This method is called whenever "get(...)" methods are called on IgniteCache.
@Override public List<Record> load(Long key) {
System.out.println("Load data for pel: " + key);
try {
CouchDbConnector db = RecordDataStore.getDb();
ViewQuery viewQuery = new ViewQuery().designDocId("_design/docs").viewName("all");
List<Record> list = db.queryView(viewQuery,Record.class);
HashMultimap<Long,Record> multimap = HashMultimap.create();
list.forEach(r -> {
multimap.put(r.getId(),r);
});
return new LinkedList<>(multimap.get(key));
} catch (MalformedURLException e) {
throw new CacheLoaderException("Failed to load values from cache store.", e);
}
}
....
@Override public void loadCache(IgniteBiInClosure<Long, List<Record>> clo, Object... args) {
if (args == null || args.length == 0 || args[0] == null) {
throw new CacheLoaderException("Expected entry count parameter is not provided.");
}
System.out.println("Loading Cache...");
final long entryCnt = (Long)args[0];
try{
CouchDbConnector db = RecordDataStore.getDb();
ViewQuery viewQuery = new ViewQuery().designDocId("_design/docs").viewName("all");
List<Record> list = db.queryView(viewQuery,Record.class);
HashMultimap<Long,Record> multimap = HashMultimap.create();
long count = 0;
for(Record r : list) {
multimap.put(r.getPel(),r);
count++;
if(count == entryCnt)
break;
}
multimap.keySet().forEach(key -> {
clo.apply(key,new LinkedList<>(multimap.get(key)));
});
}
catch (MalformedURLException e) {
throw new CacheLoaderException("Failed to load values from cache store.", e);
}
System.out.println("Loaded Cache");
}
public static CouchDbConnector getDb() throws MalformedURLException {
HttpClient httpClient = new StdHttpClient.Builder()
.url("server:1111/")
.build();
CouchDbInstance dbInstance = new StdCouchDbInstance(httpClient);
CouchDbConnector db = new StdCouchDbConnector("ignite", dbInstance);
return db;
}
}
The thread at http://apache-ignite-users.70518.x6.nabble.com/Ignite-cluster-recovery-after-network-partition-td2775.html stresses that the IgniteClientDisconnectedException provides an IgniteFuture, which can be obtained by calling
IgniteFuture<?> f = myException.reconnectFuture();
That future has a get() method that waits until the client has reconnected. Its documentation reads:
Synchronously waits for completion of the computation and returns computation result.
So the following call should complete once the client has reconnected:
f.get();
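To make the wait-then-retry idea concrete, here is a minimal sketch that uses only the JDK, so it runs without a cluster. The ClientDisconnectedException class, the withReconnect helper and the maxAttempts parameter are hypothetical stand-ins introduced for illustration. In real code the exception would be org.apache.ignite.IgniteClientDisconnectedException and the wait would be reconnectFuture().get() as shown above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class ReconnectRetry {
    /** Hypothetical stand-in for IgniteClientDisconnectedException (not the Ignite class). */
    static class ClientDisconnectedException extends RuntimeException {
        private final CompletableFuture<Void> reconnectFuture;

        ClientDisconnectedException(String msg, CompletableFuture<Void> reconnectFuture) {
            super(msg);
            this.reconnectFuture = reconnectFuture;
        }

        /** Completes once the client is connected again (mirrors reconnectFuture()). */
        CompletableFuture<Void> reconnectFuture() {
            return reconnectFuture;
        }
    }

    /**
     * Runs op; if the client is disconnected, blocks until the reconnect
     * future completes and then tries again, at most maxAttempts times.
     */
    static <T> T withReconnect(Supplier<T> op, int maxAttempts) {
        if (maxAttempts < 1)
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        ClientDisconnectedException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.get();
            } catch (ClientDisconnectedException e) {
                last = e;
                // Only wait if another attempt will follow.
                if (attempt < maxAttempts)
                    e.reconnectFuture().join();
            }
        }
        throw last; // every attempt failed while disconnected
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated cache read: the first call fails as if the client were disconnected.
        String value = withReconnect(() -> {
            if (calls[0]++ == 0)
                throw new ClientDisconnectedException(
                    "client disconnected", CompletableFuture.completedFuture(null));
            return "record-42";
        }, 3);
        System.out.println(value + " after " + calls[0] + " calls");
    }
}
```

When adapting this to the client in the question, the Supplier would wrap the cache read from step 7. As far as I recall from the Ignite client reconnect documentation, cache operations may surface the disconnect as a javax.cache.CacheException whose cause is the IgniteClientDisconnectedException, so the catch block may need to inspect getCause(). Treat that detail as an assumption to verify against the Ignite version in use.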