在 Calcite SQL 中连接(join)两个无限流
Join two infinite streams in Calcite SQL
我在 Java 中创建了两个无限的测试流,它们都生成随机数据。
现在我想把这两个数据集连接(join)起来,并在每条新数据到达时都得到相应的结果。
实际结果却是:流 A 先被一直处理到结束,然后流 B 才被处理,直到它结束。
由此我得出以下结论:
- 如果流 A 是无限的,我永远得不到任何结果,因为流 B 被忽略,程序会一直读取 A,直到内存耗尽;
- 如果 A 是有限的,那么在 A 读取完成之后,A 中再到达的数据将不会被处理。
所以我有两个问题:是我哪里做错了,还是 Calcite 本来就是这样设计的?如果是我做错了,该如何实现对两个无限流的并行读取和处理?
下面附上我的主类、两个生成流的类以及 model.json 的源代码:
public static void main(String[] args) throws Exception {
Class.forName("org.apache.calcite.jdbc.Driver");
Properties info = new Properties();
info.setProperty("lex", "JAVA");
Connection connection = DriverManager.getConnection("jdbc:calcite:model=" + "c:\calcite\model.json", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
Statement statement = connection.createStatement();
ResultSet resultSet =
statement.executeQuery("select stream * from S.B b full outer join S.A a on a.ID = b.PRODUCT_ID");
System.out.println("statement executed");
final StringBuilder buf = new StringBuilder();
while (resultSet.next()) {
int n = resultSet.getMetaData().getColumnCount();
for (int i = 1; i <= n; i++) {
buf.append(i > 1 ? "; " : "").append(resultSet.getMetaData().getColumnLabel(i)).append("=")
.append(resultSet.getObject(i));
}
System.out.println(buf.toString());
buf.setLength(0);
}
}
/**
 * Base class for the test stream tables. It is a Calcite table that is both scannable and
 * streamable, reports itself to JDBC as {@code STREAM}, and returns itself from
 * {@link #stream()}. Subclasses supply {@code scan} and {@code getRowType}.
 */
public abstract class Stream implements StreamableTable, ScannableTable {

  /** Reports this table to JDBC clients as a stream. */
  @Override
  public Schema.TableType getJdbcTableType() {
    return Schema.TableType.STREAM;
  }

  /** Reports that no column of this table is rolled up. */
  @Override
  public boolean isRolledUp(String column) {
    return false;
  }

  /** Always returns false; no column is rolled up, so none is valid inside an aggregate. */
  @Override
  public boolean rolledUpColumnValidInsideAgg(String column, SqlCall call, SqlNode parent,
      CalciteConnectionConfig config) {
    return false;
  }

  /** Returns this same table as its streaming counterpart. */
  @Override
  public Table stream() {
    return this;
  }

  /**
   * Returns a fixed statistic: a row count of 100 and no declared keys.
   *
   * <p>NOTE(review): this is a constant estimate; the subclasses' scans never report an end.
   */
  @Override
  public Statistic getStatistic() {
    return Statistics.of(100d, ImmutableList.of());
  }
}
public class StreamA extends Stream {
public static boolean wait = false;
public Enumerable<Object[]> scan(DataContext root) {
return Linq4j.asEnumerable(() -> new Iterator<Object[]>() {
private final String[] items = { "paint1", "paper2", "brush3" };
private int counter = 0;
public boolean hasNext() {
return true;
}
public Object[] next() {
System.out.println("next A");
final int index = counter++;
return new Object[] { System.currentTimeMillis(), index , items[index % items.length]};
}
public void remove() {
throw new UnsupportedOperationException();
}
});
}
protected final RelProtoDataType protoRowType = a0 -> a0.builder().add("ROWTIME", SqlTypeName.TIMESTAMP)
.add("ID", SqlTypeName.INTEGER).add("PRODUCT", SqlTypeName.VARCHAR, 10)
.build();
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
return protoRowType.apply(typeFactory);
}
}
public class StreamB extends Stream {
public static boolean wait = false;
public Enumerable<Object[]> scan(DataContext root) {
return Linq4j.asEnumerable(() -> new Iterator<Object[]>() {
private int counter = 0;
public boolean hasNext() {
return true;
}
public Object[] next() {
System.out.println("next B");
final int index = counter++;
return new Object[] { System.currentTimeMillis(), index, (Math.abs((new Random()).nextInt())) % 4 +1, "Kauf "+index };
}
public void remove() {
throw new UnsupportedOperationException();
}
});
}
protected final RelProtoDataType protoRowType = a0 -> a0.builder().add("ROWTIME", SqlTypeName.TIMESTAMP)
.add("ID", SqlTypeName.INTEGER).add("PRODUCT_ID", SqlTypeName.INTEGER).add("NAME", SqlTypeName.VARCHAR, 15)
.build();
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
return protoRowType.apply(typeFactory);
}
}
{
"version": "1.0",
"defaultSchema": "S",
"schemas": [
{
"name": "S",
"tables": [ {
"type": "custom",
"name": "A",
"stream": {
"stream": true
},
"factory": "de.mypackage.calcite.StreamFactory$FactoryA"
},
{
"type": "custom",
"name": "B",
"stream": {
"stream": true
},
"factory": "de.mypackage.calcite.StreamFactory$FactoryB"
}
]
}
]
}
也就是说:这是不可能的。
那么,如何在窗口连接(window join)中使用两个无限数据流?在 Calcite 文档中,我只找到了以 rowtime 作为连接条件的写法,但这样也行不通。
Calcite 目前不支持流与流之间的连接(stream-to-stream join)。更多信息可以参阅 Calcite 的流处理文档(the documentation on streaming)。我认为这方面最近没有什么进展,但你可以在 Calcite 的问题跟踪系统(issue tracker)中关注流连接支持的进展。
我再补充一些关于它计划如何工作的说明。流连接只有在特定的窗口(window)内进行才有意义。为此,每个流中都需要有一个单调列(例如 rowtime),这样才能保证查询可以持续向前推进。
下面是文档中一个连接订单流(Orders)和发货流(Shipments)的示例:
-- Windowed stream-to-stream join example from the Calcite streaming documentation.
-- A shipment is matched to its order only if its rowtime lies between the order's rowtime
-- and one hour later. Because both rowtime columns are monotonic, this bounded range is
-- what lets the join make progress over unbounded inputs.
SELECT STREAM o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
FROM Orders AS o
JOIN Shipments AS s
ON o.orderId = s.orderId
AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;
可以看到,这里有一个约束条件:必须在下单后一小时内发货。由于 Orders.rowtime 和 Shipments.rowtime 都是单调的,Calcite 能够让这个查询持续向前推进。
我在 Java 中创建了两个无限的测试流,它们都生成随机数据。现在我想把这两个数据集连接(join)起来,并在每条新数据到达时都得到相应的结果。
实际结果却是:流 A 先被一直处理到结束,然后流 B 才被处理,直到它结束。由此我得出以下结论:
- 如果流 A 是无限的,我永远得不到任何结果,因为流 B 被忽略,程序会一直读取 A,直到内存耗尽;
- 如果 A 是有限的,那么在 A 读取完成之后,A 中再到达的数据将不会被处理。
所以我有两个问题:是我哪里做错了,还是 Calcite 本来就是这样设计的?如果是我做错了,该如何实现对两个无限流的并行读取和处理?
下面附上我的主类、两个生成流的类以及 model.json 的源代码:
的来源public static void main(String[] args) throws Exception {
Class.forName("org.apache.calcite.jdbc.Driver");
Properties info = new Properties();
info.setProperty("lex", "JAVA");
Connection connection = DriverManager.getConnection("jdbc:calcite:model=" + "c:\calcite\model.json", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
Statement statement = connection.createStatement();
ResultSet resultSet =
statement.executeQuery("select stream * from S.B b full outer join S.A a on a.ID = b.PRODUCT_ID");
System.out.println("statement executed");
final StringBuilder buf = new StringBuilder();
while (resultSet.next()) {
int n = resultSet.getMetaData().getColumnCount();
for (int i = 1; i <= n; i++) {
buf.append(i > 1 ? "; " : "").append(resultSet.getMetaData().getColumnLabel(i)).append("=")
.append(resultSet.getObject(i));
}
System.out.println(buf.toString());
buf.setLength(0);
}
}
/**
 * Base class for the test stream tables. It is a Calcite table that is both scannable and
 * streamable, reports itself to JDBC as {@code STREAM}, and returns itself from
 * {@link #stream()}. Subclasses supply {@code scan} and {@code getRowType}.
 */
public abstract class Stream implements StreamableTable, ScannableTable {

  /** Reports this table to JDBC clients as a stream. */
  @Override
  public Schema.TableType getJdbcTableType() {
    return Schema.TableType.STREAM;
  }

  /** Reports that no column of this table is rolled up. */
  @Override
  public boolean isRolledUp(String column) {
    return false;
  }

  /** Always returns false; no column is rolled up, so none is valid inside an aggregate. */
  @Override
  public boolean rolledUpColumnValidInsideAgg(String column, SqlCall call, SqlNode parent,
      CalciteConnectionConfig config) {
    return false;
  }

  /** Returns this same table as its streaming counterpart. */
  @Override
  public Table stream() {
    return this;
  }

  /**
   * Returns a fixed statistic: a row count of 100 and no declared keys.
   *
   * <p>NOTE(review): this is a constant estimate; the subclasses' scans never report an end.
   */
  @Override
  public Statistic getStatistic() {
    return Statistics.of(100d, ImmutableList.of());
  }
}
public class StreamA extends Stream {
public static boolean wait = false;
public Enumerable<Object[]> scan(DataContext root) {
return Linq4j.asEnumerable(() -> new Iterator<Object[]>() {
private final String[] items = { "paint1", "paper2", "brush3" };
private int counter = 0;
public boolean hasNext() {
return true;
}
public Object[] next() {
System.out.println("next A");
final int index = counter++;
return new Object[] { System.currentTimeMillis(), index , items[index % items.length]};
}
public void remove() {
throw new UnsupportedOperationException();
}
});
}
protected final RelProtoDataType protoRowType = a0 -> a0.builder().add("ROWTIME", SqlTypeName.TIMESTAMP)
.add("ID", SqlTypeName.INTEGER).add("PRODUCT", SqlTypeName.VARCHAR, 10)
.build();
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
return protoRowType.apply(typeFactory);
}
}
public class StreamB extends Stream {
public static boolean wait = false;
public Enumerable<Object[]> scan(DataContext root) {
return Linq4j.asEnumerable(() -> new Iterator<Object[]>() {
private int counter = 0;
public boolean hasNext() {
return true;
}
public Object[] next() {
System.out.println("next B");
final int index = counter++;
return new Object[] { System.currentTimeMillis(), index, (Math.abs((new Random()).nextInt())) % 4 +1, "Kauf "+index };
}
public void remove() {
throw new UnsupportedOperationException();
}
});
}
protected final RelProtoDataType protoRowType = a0 -> a0.builder().add("ROWTIME", SqlTypeName.TIMESTAMP)
.add("ID", SqlTypeName.INTEGER).add("PRODUCT_ID", SqlTypeName.INTEGER).add("NAME", SqlTypeName.VARCHAR, 15)
.build();
public RelDataType getRowType(RelDataTypeFactory typeFactory) {
return protoRowType.apply(typeFactory);
}
}
{
"version": "1.0",
"defaultSchema": "S",
"schemas": [
{
"name": "S",
"tables": [ {
"type": "custom",
"name": "A",
"stream": {
"stream": true
},
"factory": "de.mypackage.calcite.StreamFactory$FactoryA"
},
{
"type": "custom",
"name": "B",
"stream": {
"stream": true
},
"factory": "de.mypackage.calcite.StreamFactory$FactoryB"
}
]
}
]
}
也就是说:这是不可能的。那么,如何在窗口连接(window join)中使用两个无限数据流?在 Calcite 文档中,我只找到了以 rowtime 作为连接条件的写法,但这样也行不通。
Calcite 目前不支持流与流之间的连接(stream-to-stream join)。更多信息可以参阅 Calcite 的流处理文档(the documentation on streaming)。我认为这方面最近没有什么进展,但你可以在 Calcite 的问题跟踪系统(issue tracker)中关注流连接支持的进展。
我再补充一些关于它计划如何工作的说明。流连接只有在特定的窗口(window)内进行才有意义。为此,每个流中都需要有一个单调列(例如 rowtime),这样才能保证查询可以持续向前推进。
下面是文档中一个连接订单流(Orders)和发货流(Shipments)的示例:
-- Windowed stream-to-stream join example from the Calcite streaming documentation.
-- A shipment is matched to its order only if its rowtime lies between the order's rowtime
-- and one hour later. Because both rowtime columns are monotonic, this bounded range is
-- what lets the join make progress over unbounded inputs.
SELECT STREAM o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
FROM Orders AS o
JOIN Shipments AS s
ON o.orderId = s.orderId
AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;
可以看到,这里有一个约束条件:必须在下单后一小时内发货。由于 Orders.rowtime 和 Shipments.rowtime 都是单调的,Calcite 能够让这个查询持续向前推进。