Create a streaming example with Calcite using CSV
I'm trying to create a basic streaming program with Calcite, using CSV as the data source. I can run queries with sqlline, but I can't do it programmatically. My code is:
example.json
{
  version: '1.0',
  defaultSchema: 'STREAM',
  schemas: [
    {
      name: 'SS',
      tables: [
        {
          name: 'ORDERS',
          type: 'custom',
          factory: 'org.apache.calcite.adapter.csv.CsvStreamTableFactory',
          stream: {
            stream: true
          },
          operand: {
            file: 'sales/SORDERS.csv',
            flavor: "scannable"
          }
        }
      ]
    }
  ]
}
SORDERS.csv
PRODUCTID:int,ORDERID:int,UNITS:int
3,4,5
2,5,12
2,1,6
SimpleQuery.java
package stream_test;

import org.apache.calcite.adapter.java.ReflectiveSchema;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

/**
 * Example of using Calcite via JDBC.
 *
 * <p>Schema is specified programmatically.</p>
 */
public class SimpleQuery {
  public static void main(String[] args) throws Exception {
    new SimpleQuery().run();
  }

  public void run() throws ClassNotFoundException, SQLException {
    Class.forName("org.apache.calcite.jdbc.Driver");
    Properties info = new Properties();
    info.setProperty("lex", "JAVA");
    Connection connection =
        DriverManager.getConnection("jdbc:calcite:model="
            + "/home/hduser/Downloads/calcite-master/example/csv/target/test-classes/example.json", info);
    CalciteConnection calciteConnection =
        connection.unwrap(CalciteConnection.class);
    //SchemaPlus rootSchema = calciteConnection.getRootSchema();
    //rootSchema.add("os", new ReflectiveSchema(new Os()));
    Statement statement = connection.createStatement();
    ResultSet resultSet =
        statement.executeQuery("select stream * from SS.ORDERS where SS.ORDERS.UNITS > 5");
    final StringBuilder buf = new StringBuilder();
    while (resultSet.next()) {
      int n = resultSet.getMetaData().getColumnCount();
      for (int i = 1; i <= n; i++) {
        buf.append(i > 1 ? "; " : "")
            .append(resultSet.getMetaData().getColumnLabel(i))
            .append("=")
            .append(resultSet.getObject(i));
      }
      System.out.println(buf.toString());
      buf.setLength(0);
    }
    resultSet.close();
    statement.close();
    connection.close();
  }
}
Finally, I have calcite-core 1.8.0, net.sf.opencsv 2.3, calcite-avatica 1.6.0, calcite-linq4j 1.8.0, sqlline 1.1.9, hamcrest-core 1.3, com.github.stephenc.jcip 1.0-1, commons-lang3 3.4, and guava 19.0 on the classpath, and I imported calcite-example-csv-1.9.0-SNAPSHOT (which I packaged with Maven from the GitHub version).
When I try to run the code I get:
Exception in thread "main" java.lang.NoSuchFieldError: CANCEL_FLAG
at org.apache.calcite.adapter.csv.CsvStreamScannableTable.scan(CsvStreamScannableTable.java:66)
at org.apache.calcite.interpreter.TableScanNode.createScannable(TableScanNode.java:117)
at org.apache.calcite.interpreter.TableScanNode.create(TableScanNode.java:94)
at org.apache.calcite.interpreter.Nodes$CoreCompiler.visit(Nodes.java:68)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.calcite.util.ReflectUtil.invokeVisitorInternal(ReflectUtil.java:257)
at org.apache.calcite.util.ReflectUtil.invokeVisitor(ReflectUtil.java:214)
at org.apache.calcite.util.ReflectUtil.invokeVisitor(ReflectUtil.java:471)
at org.apache.calcite.interpreter.Interpreter$Compiler.visit(Interpreter.java:476)
at org.apache.calcite.interpreter.Interpreter$Compiler.visitRoot(Interpreter.java:433)
at org.apache.calcite.interpreter.Interpreter.<init>(Interpreter.java:75)
at Baz.bind(Unknown Source)
at org.apache.calcite.jdbc.CalcitePrepare$CalciteSignature.enumerable(CalcitePrepare.java:327)
at org.apache.calcite.jdbc.CalciteConnectionImpl.enumerable(CalciteConnectionImpl.java:282)
at package stream_test;
Is there a solution?
Your libraries are mismatched: calcite-example-csv is version 1.9.0-SNAPSHOT, but calcite-core is version 1.8.0. For details, see the discussion on the Calcite dev list.
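A NoSuchFieldError generally means that a class was compiled against one version of a library and is running against another version that lacks the field, which fits the mismatch described above. One way to check which JARs are actually in play is to print where each relevant class was loaded from. The following is only a minimal diagnostic sketch: the class name JarVersionCheck and the helper describe are hypothetical, the list of classes to probe is an assumption about what is worth checking, and the Implementation-Version shown is null whenever a JAR's manifest does not declare one.

```java
import java.security.CodeSource;

public class JarVersionCheck {

    /**
     * Reports the JAR (or directory) a class was loaded from, plus the
     * Implementation-Version from that JAR's manifest, if any.
     * Hypothetical helper, not part of Calcite.
     */
    public static String describe(String className) {
        try {
            // Load without running static initializers.
            Class<?> c = Class.forName(
                className, false, JarVersionCheck.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            String location = (src == null || src.getLocation() == null)
                ? "(bootstrap or unknown)"
                : src.getLocation().toString();
            Package p = c.getPackage();
            String version = (p == null) ? null : p.getImplementationVersion();
            return className + " -> " + location
                + " (Implementation-Version: " + version + ")";
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " -> not loadable: " + e;
        }
    }

    public static void main(String[] args) {
        // One class per artifact: calcite-core, the CSV example adapter, calcite-linq4j.
        String[] names = {
            "org.apache.calcite.jdbc.Driver",
            "org.apache.calcite.adapter.csv.CsvStreamTableFactory",
            "org.apache.calcite.linq4j.Enumerable"
        };
        for (String name : names) {
            System.out.println(describe(name));
        }
    }
}
```

Run it with the same classpath as SimpleQuery. If the printed locations point at JARs from different Calcite versions, aligning them on a single version should be the first thing to try.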
In Eclipse I used the following.
As Maven dependencies: commons-io 2.4, commons-logging 1.1.3, commons-lang3 3.2, janino 2.7.6, eigenbase-properties 1.1.5, avatica 1.8.0, opencsv 2.3, json-simple 1.1.
As external JARs: calcite-core 1.9.0, example-csv-1.9.0, and calcite-linq4j 1.9.0 (all SNAPSHOT versions), after building them with the mvn install command from the latest GitHub version of Calcite.