Join two infinite streams in calcite SQL

I have created two infinite test streams in Java; both deliver random data. Now I want to join these data sets and receive a result for every newly arriving record.

What actually happens is: stream A is processed until it ends, and only then is stream B processed until it ends. From this I conclude:

  1. If stream A is infinite, I will never get any results, because stream B is ignored and Calcite keeps reading A until it runs out of memory.
  2. Data that arrives in stream A later on will not be processed once A has been read to the end (if A is finite).

So I have two questions: am I doing something wrong, or is Calcite simply designed this way? And if I am doing something wrong, how can I read and process two infinite streams in parallel?

I have attached the sources of my main class, the two stream-generating classes, and model.json.
public static void main(String[] args) throws Exception {
    Class.forName("org.apache.calcite.jdbc.Driver");
    Properties info = new Properties();
    info.setProperty("lex", "JAVA");
    // note: backslashes in the Windows path must be escaped in a Java string literal
    Connection connection = DriverManager.getConnection("jdbc:calcite:model=" + "c:\\calcite\\model.json", info);
    CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
    Statement statement = connection.createStatement();
    ResultSet resultSet = 
            statement.executeQuery("select stream * from S.B b full outer join S.A a on a.ID = b.PRODUCT_ID");

    System.out.println("statement executed");
    final StringBuilder buf = new StringBuilder();
    
    while (resultSet.next()) {
        int n = resultSet.getMetaData().getColumnCount();
        for (int i = 1; i <= n; i++) {
            buf.append(i > 1 ? "; " : "").append(resultSet.getMetaData().getColumnLabel(i)).append("=")
                    .append(resultSet.getObject(i));
        }
        System.out.println(buf.toString());
        buf.setLength(0);
    }
}





public abstract class Stream implements StreamableTable, ScannableTable {

    public Schema.TableType getJdbcTableType() {
        return Schema.TableType.STREAM;
    }

    @Override
    public boolean isRolledUp(String column) {
        return false;
    }

    @Override
    public boolean rolledUpColumnValidInsideAgg(String column, SqlCall call, SqlNode parent,
            CalciteConnectionConfig config) {
        return false;
    }

    public Table stream() {
        return this;
    }

    public Statistic getStatistic() {
        return Statistics.of(100d, ImmutableList.of());
    }
}


public class StreamA extends Stream {
    public static boolean wait = false;

    public Enumerable<Object[]> scan(DataContext root) {
        return Linq4j.asEnumerable(() -> new Iterator<Object[]>() {
            private final String[] items = { "paint1", "paper2", "brush3" };
            private int counter = 0;

            // hasNext() always returns true, so this source never ends
            public boolean hasNext() {
                return true;
            }

            public Object[] next() {
                System.out.println("next A");
                final int index = counter++;
                return new Object[] { System.currentTimeMillis(), index, items[index % items.length] };
            }

            public void remove() {
                throw new UnsupportedOperationException();
            }
        });
    }

    protected final RelProtoDataType protoRowType = a0 -> a0.builder()
            .add("ROWTIME", SqlTypeName.TIMESTAMP)
            .add("ID", SqlTypeName.INTEGER)
            .add("PRODUCT", SqlTypeName.VARCHAR, 10)
            .build();

    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        return protoRowType.apply(typeFactory);
    }
}





public class StreamB extends Stream {
    public static boolean wait = false;

    public Enumerable<Object[]> scan(DataContext root) {
        return Linq4j.asEnumerable(() -> new Iterator<Object[]>() {
            private final Random random = new Random();
            private int counter = 0;

            // hasNext() always returns true, so this source never ends
            public boolean hasNext() {
                return true;
            }

            public Object[] next() {
                System.out.println("next B");
                final int index = counter++;
                // PRODUCT_ID is a random value in the range 1..4, joined against StreamA.ID
                return new Object[] { System.currentTimeMillis(), index, random.nextInt(4) + 1, "Kauf " + index };
            }

            public void remove() {
                throw new UnsupportedOperationException();
            }
        });
    }

    protected final RelProtoDataType protoRowType = a0 -> a0.builder()
            .add("ROWTIME", SqlTypeName.TIMESTAMP)
            .add("ID", SqlTypeName.INTEGER)
            .add("PRODUCT_ID", SqlTypeName.INTEGER)
            .add("NAME", SqlTypeName.VARCHAR, 15)
            .build();

    public RelDataType getRowType(RelDataTypeFactory typeFactory) {
        return protoRowType.apply(typeFactory);
    }
}



{
  "version": "1.0",
  "defaultSchema": "dummy",
  "schemas": [
    {
      "name": "S",
      "tables": [
        {
          "type": "custom",
          "name": "A",
          "stream": {
            "stream": true
          },
          "factory": "de.mypackage.calcite.StreamFactory$FactoryA"
        },
        {
          "type": "custom",
          "name": "B",
          "stream": {
            "stream": true
          },
          "factory": "de.mypackage.calcite.StreamFactory$FactoryB"
        }
      ]
    }
  ]
}
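
The factory classes referenced by model.json are not included above. For context, they might look roughly like the following sketch; the class names come from model.json, but the bodies are my assumption (each factory presumably just returns an instance of the corresponding stream table):

import java.util.Map;

import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.TableFactory;

// Hypothetical sketch of the factories referenced in model.json.
public class StreamFactory {

    public static class FactoryA implements TableFactory<Table> {
        public Table create(SchemaPlus schema, String name,
                Map<String, Object> operand, RelDataType rowType) {
            return new StreamA();
        }
    }

    public static class FactoryB implements TableFactory<Table> {
        public Table create(SchemaPlus schema, String name,
                Map<String, Object> operand, RelDataType rowType) {
            return new StreamB();
        }
    }
}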

So let's assume it is simply not possible. How can I use two infinite data streams in a windowed join? In the Calcite documentation I only found rowtime as a join condition, but that did not work either.

Calcite does not currently support stream-to-stream joins. You can find more information in the documentation on streaming. I don't believe there has been recent progress, but you can follow the status of stream join support in Calcite's issue tracker.

I will add a few notes on how this is planned to work. A stream-to-stream join only makes sense within a specific window; for that you need a monotonic column in each stream, such as rowtime, which guarantees that the query can make progress.

Here is an example from the documentation that joins an Orders stream with a Shipments stream:

SELECT STREAM o.rowtime, o.productId, o.orderId, s.rowtime AS shipTime
FROM Orders AS o
JOIN Shipments AS s
  ON o.orderId = s.orderId
  AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '1' HOUR;

Here you can see the constraint that a shipment must occur within one hour of the order being placed. Because of that constraint, Calcite is able to make forward progress in the query, since both Orders.rowtime and Shipments.rowtime are monotonic.
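
As a side note on the monotonic column: one way to hint that ROWTIME (column 0) is ascending is through the table's statistic, assuming the planner picks up collations from Table.getStatistic(). A minimal sketch for the abstract Stream base class in your code (this by itself does not make the stream-to-stream join above work today):

import com.google.common.collect.ImmutableList;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.Statistics;

// Excerpt: declare an ascending collation on column 0 (ROWTIME) so the
// planner can treat the stream as ordered by time.
public Statistic getStatistic() {
    return Statistics.of(100d, ImmutableList.of(),
            RelCollations.createSingleton(0));
}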