Execute SQL on Ignite cache of BinaryObjects
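I am creating a cache of BinaryObjects from a Spark DataFrame, and then I want to run SQL on that Ignite cache.
Here is my code, where bank is the DataFrame containing three fields (id, name and age):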
val ic = new IgniteContext(sc, () => new IgniteConfiguration())
val cacheConfig = new CacheConfiguration[BinaryObject, BinaryObject]()
cacheConfig.setName("test123")
cacheConfig.setStoreKeepBinary(true)
cacheConfig.setIndexedTypes(classOf[BinaryObject], classOf[BinaryObject])
val qe = new QueryEntity()
qe.setKeyType("TestKey")
qe.setValueType("TestValue")
val fields = new java.util.LinkedHashMap[String, String]()
fields.put("id", "java.lang.Long")
fields.put("name", "java.lang.String")
fields.put("age", "java.lang.Int")
qe.setFields(fields)
val qes = new java.util.ArrayList[QueryEntity]()
qes.add(qe)
cacheConfig.setQueryEntities(qes)
val cache = ic.fromCache[BinaryObject, BinaryObject](cacheConfig)
cache.savePairs(bank.rdd, (row: Bank, iContext: IgniteContext) => {
  val keyBuilder = iContext.ignite().binary().builder("TestKey")
  keyBuilder.setField("id", row.id)
  val key = keyBuilder.build()
  val valueBuilder = iContext.ignite().binary().builder("TestValue")
  valueBuilder.setField("name", row.name)
  valueBuilder.setField("age", row.age)
  val value = valueBuilder.build()
  (key, value)
}, true)
Now I am trying to run a SQL query like this:
cache.sql("select age from TestValue")
It fails with the following exception:
Caused by: org.h2.jdbc.JdbcSQLException: Column "AGE" not found; SQL statement:
select age from TestValue [42122-191]
at org.h2.message.DbException.getJdbcSQLException(DbException.java:345)
at org.h2.message.DbException.get(DbException.java:179)
at org.h2.message.DbException.get(DbException.java:155)
at org.h2.expression.ExpressionColumn.optimize(ExpressionColumn.java:147)
at org.h2.command.dml.Select.prepare(Select.java:852)
What am I doing wrong?
The type name for the age field is incorrect: java.lang.Int is not a Java class. It should be:
fields.put("age", "java.lang.Integer")
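A typo like this is easy to miss because the field types are passed as plain strings. As a rough sketch (the FieldTypeCheck object and its unresolvedTypes helper are hypothetical names, not part of Ignite), you could check that every type name resolves to a real class before putting it into the QueryEntity field map:

```scala
import scala.util.Try

// Hypothetical helper, not part of the Ignite API: it only checks that each
// declared type name can be loaded as a class on the current classpath.
object FieldTypeCheck {
  // Returns the (field, typeName) pairs whose type name does not resolve.
  def unresolvedTypes(fields: Seq[(String, String)]): Seq[(String, String)] =
    fields.filter { case (_, typeName) => Try(Class.forName(typeName)).isFailure }

  def main(args: Array[String]): Unit = {
    // Same field names and type names as in the question, including the typo.
    val fieldTypes = Seq(
      "id"   -> "java.lang.Long",
      "name" -> "java.lang.String",
      "age"  -> "java.lang.Int"
    )
    unresolvedTypes(fieldTypes).foreach { case (field, typeName) =>
      println(s"Field '$field' has an unknown type name: $typeName")
    }
  }
}
```

With the field list from the question, only the age entry is reported; after changing it to java.lang.Integer the check reports nothing.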