加入方解石中的两条无限流 SQL

Join two infinite streams in calcite SQL

我在 java 中创建了两个无限测试流。两者都提供随机数据。 现在我想加入这些数据集并接收每个新到达数据的结果。

结果是:流A被处理到结束。然后流 B 被处理,直到它结束。 我要下结论:

  1. 如果流 A 是无限的,我永远不会得到任何结果,因为流 B 被忽略并且它一直读取 A 直到内存不足
  2. 流A中稍后到达的数据在完成A的读取后将不会被处理(如果A是有限的)


我附上了我的主要 class、2 个流生成 classes 和 model.json.

public static void main(String[] args) throws Exception {
    Properties info = new Properties();
    info.setProperty("lex", "JAVA");
    Connection connection = DriverManager.getConnection("jdbc:calcite:model=" + "c:\calcite\model.json", info);
    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
        Statement statement = connection.createStatement();
    ResultSet resultSet = 
            statement.executeQuery("select stream * from S.B b full outer join S.A a on a.ID = b.PRODUCT_ID");

    System.out.println("statement executed");
        final StringBuilder buf = new StringBuilder();
    while (resultSet.next()) {
        int n = resultSet.getMetaData().getColumnCount();
        for (int i = 1; i <= n; i++) {
            buf.append(i > 1 ? "; " : "").append(resultSet.getMetaData().getColumnLabel(i)).append("=")

public abstract class Stream implements StreamableTable, ScannableTable{

public Schema.TableType getJdbcTableType() {
    return Schema.TableType.STREAM;

public boolean isRolledUp(String column) {
    return false;

public boolean rolledUpColumnValidInsideAgg(String column, SqlCall call, SqlNode parent,
        CalciteConnectionConfig config) {
    return false;
public Table stream() {
    return this;
public Statistic getStatistic() {
    return Statistics.of(100d, ImmutableList.of());


public class StreamA extends Stream {
   public static  boolean wait = false;
   public Enumerable<Object[]> scan(DataContext root) {
    return Linq4j.asEnumerable(() -> new Iterator<Object[]>() {
        private final String[] items = { "paint1", "paper2", "brush3" };
        private int counter = 0;
        public boolean hasNext() {
            return true;

        public Object[] next() {
            System.out.println("next A");
            final int index = counter++;
            return new Object[] { System.currentTimeMillis(), index , items[index % items.length]};

        public void remove() {
            throw new UnsupportedOperationException();


protected final RelProtoDataType protoRowType = a0 -> a0.builder().add("ROWTIME", SqlTypeName.TIMESTAMP)
        .add("ID", SqlTypeName.INTEGER).add("PRODUCT", SqlTypeName.VARCHAR, 10)

public RelDataType getRowType(RelDataTypeFactory typeFactory) {
    return protoRowType.apply(typeFactory);

public class StreamB extends Stream {
   public static  boolean wait = false;
   public Enumerable<Object[]> scan(DataContext root) {
    return Linq4j.asEnumerable(() -> new Iterator<Object[]>() {
        private int counter = 0;
        public boolean hasNext() {
            return true;

        public Object[] next() {
            System.out.println("next B");
            final int index = counter++;
            return new Object[] { System.currentTimeMillis(), index, (Math.abs((new Random()).nextInt())) % 4 +1, "Kauf "+index };

        public void remove() {
            throw new UnsupportedOperationException();


protected final RelProtoDataType protoRowType = a0 -> a0.builder().add("ROWTIME", SqlTypeName.TIMESTAMP)
        .add("ID", SqlTypeName.INTEGER).add("PRODUCT_ID", SqlTypeName.INTEGER).add("NAME", SqlTypeName.VARCHAR, 15)

public RelDataType getRowType(RelDataTypeFactory typeFactory) {
    return protoRowType.apply(typeFactory);

 "version": "1.0",
 "defaultSchema": "dummy",
 "schemas": [
  "name": "S",
  "tables": [ {
    "type": "custom",
    "name": "A",
    "stream": {
      "stream": true
    "factory": "de.mypackage.calcite.StreamFactory$FactoryA"
    "type": "custom",
    "name": "B",
    "stream": {
      "stream": true
    "factory": "de.mypackage.calcite.StreamFactory$FactoryB"

比方说:不可能。 如何在窗口连接中使用两个无限数据流?在 calite doumentation 中,我只找到了 rowtime 作为连接条件,但这也行不通

方解石目前不支持流到流的连接。您可以在 the documentation on streaming. I don't believe there has been recent progress, but you can follow the status of stream join support in Calcite's issue tracker.


我将添加一些关于它计划如何工作的注释。只有在特定 window 内这样做才有意义。为此,您需要在每个流中使用单调列,例如 rowtime,这样可以保证查询可以取得进展。


SELECT STREAM o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
FROM Orders AS o
JOIN Shipments AS s
  ON o.orderId = s.orderId
  AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;

在这种情况下,您可以看到有一个约束条件,即必须在下单后一小时内发货。在这种情况下,Calcite 将能够在查询中进行前向处理,因为 Orders.rowtimeShipments.rowtime 都是单调的。