基于Spark RDD元素查询Ignite Cache
Query Ignite Cache based on Spark RDD elements
我正在尝试检索 JavaPairRDD 中每个元素对应的缓存值。我使用 LOCAL
缓存模式,因为我想尽量减少缓存数据的混洗(shuffle)。Ignite 节点在 Spark 作业中以嵌入式模式启动。如果在单个节点上运行,以下代码可以正常工作;但是,当我在由 5 台机器组成的集群上运行时,
得到的结果为零条。
我的第一次尝试是使用 IgniteRDD sql 方法:
// First attempt: run SQL through the IgniteRDD, joining VPRow rows against an inline table built from the bound array of keys.
dataRDD.sql("SELECT v.id,v.sub,v.obj FROM VPRow v JOIN table(id bigint = ?) i ON v.id = i.id",new Object[] {objKeyEntries.toArray()});
其中 objKeyEntries 是从 RDD 中收集(collect)得到的一组条目。
第二次尝试使用 affinityRun:
// Second attempt: for each Spark partition of objKeyEntries, look up every key in the Ignite
// cache "dataRDD" and emit (key, cached VPRow) pairs.
JavaPairRDD<Long, VPRow> objEntries = objKeyEntries.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<Long, Boolean>>, Long, VPRow>() {
@Override
public Iterator<Tuple2<Long, VPRow>> call(Iterator<Tuple2<Long, Boolean>> tuple2Iterator) throws Exception {
// A new Spring context is built from ignite-rdd.xml on every call, i.e. once per partition.
ApplicationContext ctx = new ClassPathXmlApplicationContext("ignite-rdd.xml");
IgniteConfiguration igniteConfiguration = (IgniteConfiguration) ctx.getBean("ignite.cfg");
// Obtain the Ignite instance for this configuration, starting it if necessary.
Ignite ignite = Ignition.getOrStart(igniteConfiguration);
IgniteCache<Long, VPRow> cache = ignite.getOrCreateCache("dataRDD");
// Pairs gathered for this partition; returned to Spark at the end of the call.
ArrayList<Tuple2<Long,VPRow>> lst = new ArrayList<>();
while(tuple2Iterator.hasNext()) {
Tuple2<Long, Boolean> val = tuple2Iterator.next();
// Submit a job for key val._1() on cache "dataRDD"; the job reads the cached value and appends it to lst.
// NOTE(review): the closure mutates the captured `lst`. If Ignite executes the job on another node it
// would presumably operate on a serialized copy, leaving this list empty; confirm. affinityCall, which
// returns a value to the caller, may be the safer choice.
// NOTE(review): ignite-rdd.xml declares this cache as LOCAL; verify that affinity-based routing is
// meaningful for a LOCAL cache.
ignite.compute().affinityRun("dataRDD", val._1(),()->{
lst.add(new Tuple2<>(val._1(),cache.get(val._1())));
});
}
return lst.iterator();
}
});
以下为ignite-rdd.xml配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Spring configuration for the embedded Ignite node (bean "ignite.cfg").
    Declares the memory policy, the "dataRDD" cache and TCP discovery.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Off-heap memory settings. -->
        <property name="memoryConfiguration">
            <bean class="org.apache.ignite.configuration.MemoryConfiguration">
                <!-- 100 MiB; this product fits in an int, so no L suffix is required. -->
                <property name="systemCacheInitialSize" value="#{100 * 1024 * 1024}"/>
                <property name="defaultMemoryPolicyName" value="default_mem_plc"/>
                <property name="memoryPolicies">
                    <list>
                        <bean class="org.apache.ignite.configuration.MemoryPolicyConfiguration">
                            <property name="name" value="default_mem_plc"/>
                            <!--
                                5 GiB. The L suffix forces long arithmetic in SpEL; with plain int
                                literals the product overflows and evaluates to 1 GiB instead.
                            -->
                            <property name="initialSize" value="#{5L * 1024 * 1024 * 1024}"/>
                        </bean>
                    </list>
                </property>
            </bean>
        </property>
        <property name="cacheConfiguration">
            <list>
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <!-- Set a cache name. -->
                    <property name="name" value="dataRDD"/>
                    <!--
                        Set a cache mode.
                        NOTE(review): in LOCAL mode the cache is not distributed across nodes;
                        confirm this is intended when running on a multi-node cluster.
                    -->
                    <property name="cacheMode" value="LOCAL"/>
                    <!-- Register Long keys and VPRow values as an indexed key/value type pair for SQL queries. -->
                    <property name="indexedTypes">
                        <list>
                            <value>java.lang.Long</value>
                            <value>edu.code.VPRow</value>
                        </list>
                    </property>
                    <!-- NOTE(review): presumably has no effect while cacheMode is LOCAL; verify. -->
                    <property name="affinity">
                        <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
                            <property name="partitions" value="50"/>
                        </bean>
                    </property>
                </bean>
            </list>
        </property>
        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <value>[IP5]</value>
                                <value>[IP4]</value>
                                <value>[IP3]</value>
                                <value>[IP2]</value>
                                <value>[IP1]</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>
您确定需要使用本地缓存模式吗?
很可能您只在一个节点上填充了缓存,而其他节点上的本地缓存仍然是空的。
affinityRun 不起作用,因为您使用的是本地(LOCAL)缓存,而不是分区(PARTITIONED)缓存,因此无法通过 AffinityFunction 确定某个键(key)的所有者节点。
我正在尝试检索 JavaPairRDD 中每个元素对应的缓存值。我使用 LOCAL
缓存模式,因为我想尽量减少缓存数据的混洗(shuffle)。Ignite 节点在 Spark 作业中以嵌入式模式启动。如果在单个节点上运行,以下代码可以正常工作;但是,当我在由 5 台机器组成的集群上运行时,
得到的结果为零条。
我的第一次尝试是使用 IgniteRDD sql 方法:
// First attempt: run SQL through the IgniteRDD, joining VPRow rows against an inline table built from the bound array of keys.
dataRDD.sql("SELECT v.id,v.sub,v.obj FROM VPRow v JOIN table(id bigint = ?) i ON v.id = i.id",new Object[] {objKeyEntries.toArray()});
其中 objKeyEntries 是从 RDD 中收集(collect)得到的一组条目。
第二次尝试使用 affinityRun:
// Second attempt: for each Spark partition of objKeyEntries, look up every key in the Ignite
// cache "dataRDD" and emit (key, cached VPRow) pairs.
JavaPairRDD<Long, VPRow> objEntries = objKeyEntries.mapPartitionsToPair(new PairFlatMapFunction<Iterator<Tuple2<Long, Boolean>>, Long, VPRow>() {
@Override
public Iterator<Tuple2<Long, VPRow>> call(Iterator<Tuple2<Long, Boolean>> tuple2Iterator) throws Exception {
// A new Spring context is built from ignite-rdd.xml on every call, i.e. once per partition.
ApplicationContext ctx = new ClassPathXmlApplicationContext("ignite-rdd.xml");
IgniteConfiguration igniteConfiguration = (IgniteConfiguration) ctx.getBean("ignite.cfg");
// Obtain the Ignite instance for this configuration, starting it if necessary.
Ignite ignite = Ignition.getOrStart(igniteConfiguration);
IgniteCache<Long, VPRow> cache = ignite.getOrCreateCache("dataRDD");
// Pairs gathered for this partition; returned to Spark at the end of the call.
ArrayList<Tuple2<Long,VPRow>> lst = new ArrayList<>();
while(tuple2Iterator.hasNext()) {
Tuple2<Long, Boolean> val = tuple2Iterator.next();
// Submit a job for key val._1() on cache "dataRDD"; the job reads the cached value and appends it to lst.
// NOTE(review): the closure mutates the captured `lst`. If Ignite executes the job on another node it
// would presumably operate on a serialized copy, leaving this list empty; confirm. affinityCall, which
// returns a value to the caller, may be the safer choice.
// NOTE(review): ignite-rdd.xml declares this cache as LOCAL; verify that affinity-based routing is
// meaningful for a LOCAL cache.
ignite.compute().affinityRun("dataRDD", val._1(),()->{
lst.add(new Tuple2<>(val._1(),cache.get(val._1())));
});
}
return lst.iterator();
}
});
以下为ignite-rdd.xml配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<!--
    Spring configuration for the embedded Ignite node (bean "ignite.cfg").
    Declares the memory policy, the "dataRDD" cache and TCP discovery.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Off-heap memory settings. -->
        <property name="memoryConfiguration">
            <bean class="org.apache.ignite.configuration.MemoryConfiguration">
                <!-- 100 MiB; this product fits in an int, so no L suffix is required. -->
                <property name="systemCacheInitialSize" value="#{100 * 1024 * 1024}"/>
                <property name="defaultMemoryPolicyName" value="default_mem_plc"/>
                <property name="memoryPolicies">
                    <list>
                        <bean class="org.apache.ignite.configuration.MemoryPolicyConfiguration">
                            <property name="name" value="default_mem_plc"/>
                            <!--
                                5 GiB. The L suffix forces long arithmetic in SpEL; with plain int
                                literals the product overflows and evaluates to 1 GiB instead.
                            -->
                            <property name="initialSize" value="#{5L * 1024 * 1024 * 1024}"/>
                        </bean>
                    </list>
                </property>
            </bean>
        </property>
        <property name="cacheConfiguration">
            <list>
                <bean class="org.apache.ignite.configuration.CacheConfiguration">
                    <!-- Set a cache name. -->
                    <property name="name" value="dataRDD"/>
                    <!--
                        Set a cache mode.
                        NOTE(review): in LOCAL mode the cache is not distributed across nodes;
                        confirm this is intended when running on a multi-node cluster.
                    -->
                    <property name="cacheMode" value="LOCAL"/>
                    <!-- Register Long keys and VPRow values as an indexed key/value type pair for SQL queries. -->
                    <property name="indexedTypes">
                        <list>
                            <value>java.lang.Long</value>
                            <value>edu.code.VPRow</value>
                        </list>
                    </property>
                    <!-- NOTE(review): presumably has no effect while cacheMode is LOCAL; verify. -->
                    <property name="affinity">
                        <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
                            <property name="partitions" value="50"/>
                        </bean>
                    </property>
                </bean>
            </list>
        </property>
        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                        <property name="addresses">
                            <list>
                                <value>[IP5]</value>
                                <value>[IP4]</value>
                                <value>[IP3]</value>
                                <value>[IP2]</value>
                                <value>[IP1]</value>
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>
您确定需要使用本地缓存模式吗?
很可能您只在一个节点上填充了缓存,而其他节点上的本地缓存仍然是空的。
affinityRun 不起作用,因为您使用的是本地(LOCAL)缓存,而不是分区(PARTITIONED)缓存,因此无法通过 AffinityFunction 确定某个键(key)的所有者节点。