IgniteRDD doesn't return rows in the dataframe
I am running a SQL query on an IgniteRDD. RDD transformations and actions run fine, but when I call sql on the IgniteRDD, the query returns no results. Here is the exact code I wrote:
private val conf = new SparkConf()
.setAppName("IgniteRDDExample")
.setMaster("local")
.set("spark.executor.instances", "2")
/** Spark context */
val sparkContext = new SparkContext(conf)
/** Defines spring cache Configuration path */
private val CONFIG = "examples/config/example-shared-rdd.xml"
/** Creates Ignite context with the above configuration */
val igniteContext = new IgniteContext(sparkContext, CONFIG, false)
/** Creates an Ignite RDD of Type (Int,Int) Integer Pair */
val sharedRDD: IgniteRDD[Int, Int] = igniteContext.fromCache[Int, Int]("sharedRDD")
/** Fill IgniteRDD with Int pairs */
sharedRDD.savePairs(sparkContext.parallelize(1 to 1000, 10).map(i => (i, i)))
/** Transforming Pairs to contain their Squared value (note: mapValues returns a new RDD; the cache itself is not modified) */
sharedRDD.mapValues(x => (x * x))
/** Retrieve sharedRDD back from the Cache */
val transformedValues: IgniteRDD[Int, Int] = igniteContext.fromCache[Int, Int]("sharedRDD")
transformedValues.take(5).foreach(println)
/** Performing SQL query on existing cache and
* collect result into a Spark Dataframe
* */
val rs = transformedValues.sql("select * from Integer where number > ? and number < ? ", 10, 100)
/** Show DataFrame results */
println("The count is::::::::::: "+rs.count)
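Here, the .take(5) action on transformedValues returns results and prints them. But when I run the sql method on it, it returns 0 as the row count. I have no idea why. Please help.
Below is my cache configuration: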
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="cacheConfiguration">
<!-- SharedRDD cache example configuration (Atomic mode). -->
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set a cache name. -->
<property name="name" value="sharedRDD"/>
<!-- Set cache mode. -->
<property name="cacheMode" value="PARTITIONED"/>
<!-- set atomicity -->
<property name="atomicityMode" value="ATOMIC"/>
<!-- Number of backup nodes. -->
<property name="backups" value="1"/>
<property name="queryEntities">
<list>
<bean class="org.apache.ignite.cache.QueryEntity">
<!-- Setting indexed type's key class -->
<property name="keyType" value="Integer"></property>
<!-- Setting indexed type's value class -->
<property name="valueType" value="Integer"></property>
<property name="fields">
<map>
<entry key="number" value="Integer"/>
</map>
</property>
<!--Enable Index on the field -->
<property name="indexes">
<list>
<bean class="org.apache.ignite.cache.QueryIndex">
<constructor-arg value="number"/>
</bean>
</list>
</property>
</bean>
</list>
</property>
</bean>
</property>
<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<!--
Ignite provides several options for automatic discovery that can be used
instead of static IP based discovery. For information on all options refer
to our documentation: http://apacheignite.readme.io/docs/cluster-config
-->
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
<!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
<property name="addresses">
<list>
<!-- In distributed environment, replace with actual host IP address. -->
<value>127.0.0.1:47500..47509</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
</bean>
</beans>
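The Integer class has no number field, so from SQL's point of view that field is always null. When you use primitives as keys and/or values, you can use the predefined _key and _val fields instead. So the query can look like this: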
select * from Integer where _val > ? and _val < ?
There is also not much reason to use queryEntities in this case. setIndexedTypes is enough, and it is much simpler.
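For illustration, a cache definition that uses indexedTypes in place of the queryEntities block might look like the sketch below. It assumes the same cache settings as in the question; the list holds class names in key/value pairs (key type first, then value type), which Spring converts to the Class arguments of CacheConfiguration.setIndexedTypes. With this setup the table should still be named Integer, so the _key/_val query above applies unchanged.

```xml
<bean class="org.apache.ignite.configuration.CacheConfiguration">
    <!-- Same cache settings as in the question -->
    <property name="name" value="sharedRDD"/>
    <property name="cacheMode" value="PARTITIONED"/>
    <property name="atomicityMode" value="ATOMIC"/>
    <property name="backups" value="1"/>
    <!-- Replaces queryEntities: key type, then value type -->
    <property name="indexedTypes">
        <list>
            <value>java.lang.Integer</value>
            <value>java.lang.Integer</value>
        </list>
    </property>
</bean>
```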