Apache flink full outer join 的错误结果
wrong result in Apache flink full outer join
我有 2 个数据流,它们是从 2 个表创建的,例如:
Table orderRes1 = ste.sqlQuery(
"SELECT orderId, userId, SUM(bidPrice) as q FROM " + tble +
" Group by orderId, userId");
Table orderRes2 = ste.sqlQuery(
"SELECT orderId, userId, SUM(askPrice) as q FROM " + tble +
" Group by orderId, userId");
DataStream<Tuple2<Boolean, Row>> ds1 = ste.toRetractStream(orderRes1 , Row.class).
filter(order-> order.f0);
DataStream<Tuple2<Boolean, Row>> ds2 = ste.toRetractStream(orderRes2 , Row.class).
filter(order-> order.f0);
我想对这 2 个流执行完全外部连接,并且我都使用了 orderRes1.fullOuterJoin(orderRes2 ,$(exp))
和包含完整外部联接的 sql 查询,如下所示:
Table bidOrdr = ste.fromDataStream(bidTuple, $("orderId"),
$("userId"), $("price"));
Table askOrdr = ste.fromDataStream(askTuple, $("orderId"),
$("userId"), $("price"));
Table result = ste.sqlQuery(
"SELECT COALESCE(bidTbl.orderId,askTbl.orderId) , " +
" COALESCE(bidTbl.userId,askTbl.orderId)," +
" COALESCE(bidTbl.bidTotalPrice,0) as bidTotalPrice, " +
" COALESCE(askTbl.askTotalPrice,0) as askTotalPrice, " +
" FROM " +
" (SELECT orderId, userId," +
" SUM(price) AS bidTotalPrice " +
" FROM " + bidOrdr +
" Group by orderId, userId) bidTbl full outer JOIN " +
" (SELECT orderId, userId," +
" SUM(price) AS askTotalPrice" +
" FROM " + askOrdr +
" Group by orderId, userId) askTbl " +
" ON (bidTbl.orderId = askTbl.orderId" +
" AND bidTbl.userId= askTbl.userId) ") ;
DataStream<Tuple2<Boolean, Row>> = ste.toRetractStream(result, Row.class).filter(order -> order.f0);
但是,在某些情况下结果不正确:假设用户A以价格卖给B 3次,之后用户B卖给A 2次,第二次结果是:
7> (真,123,a,300.0,0.0)
7> (真,123,a,300.0,200.0)
10> (真,123,b,0.0,300.0)
10> (真,123,b,200.0,300.0)
第二行和第四行是流的预期结果,但它也会生成第一行和第三行。
值得一提的是 coGroup 是另一种解决方案,但我不想在这种情况下使用窗口,非窗口解决方案只能在有界流 (DataSet) 中访问。
提示:orderId 和 userId 将在两个流中重复,我想在每个操作中生成 2 行,包含:
orderId、userId1、bidTotalPrice、askTotalPrice 和
orderId, userId2, bidTotalPrice, askTotalPrice
流式查询(或者换句话说,在动态表上执行的查询)会出现类似的情况。与传统数据库不同,查询的输入关系在查询执行期间保持静态,流式查询的输入不断更新——因此结果也必须不断更新。
如果我理解这里的设置,第 1 行和第 3 行的“不正确”结果在处理来自 orderRes2
的相关行之前都是正确的。如果这些行永远不会到达,那么第 1 行和第 3 行将保持正确。
您应该期待的是最终正确的结果,包括必要时的撤回。您可以通过打开 mini-batch aggregation.
来减少中间结果的数量
这 mailing list thread 提供了更多见解。如果我误解了您的情况,请提供可重现的示例来说明问题。
我有 2 个数据流,它们是从 2 个表创建的,例如:
Table orderRes1 = ste.sqlQuery(
"SELECT orderId, userId, SUM(bidPrice) as q FROM " + tble +
" Group by orderId, userId");
Table orderRes2 = ste.sqlQuery(
"SELECT orderId, userId, SUM(askPrice) as q FROM " + tble +
" Group by orderId, userId");
DataStream<Tuple2<Boolean, Row>> ds1 = ste.toRetractStream(orderRes1 , Row.class).
filter(order-> order.f0);
DataStream<Tuple2<Boolean, Row>> ds2 = ste.toRetractStream(orderRes2 , Row.class).
filter(order-> order.f0);
我想对这 2 个流执行完全外部连接,并且我都使用了 orderRes1.fullOuterJoin(orderRes2 ,$(exp))
和包含完整外部联接的 sql 查询,如下所示:
Table bidOrdr = ste.fromDataStream(bidTuple, $("orderId"),
$("userId"), $("price"));
Table askOrdr = ste.fromDataStream(askTuple, $("orderId"),
$("userId"), $("price"));
Table result = ste.sqlQuery(
"SELECT COALESCE(bidTbl.orderId,askTbl.orderId) , " +
" COALESCE(bidTbl.userId,askTbl.orderId)," +
" COALESCE(bidTbl.bidTotalPrice,0) as bidTotalPrice, " +
" COALESCE(askTbl.askTotalPrice,0) as askTotalPrice, " +
" FROM " +
" (SELECT orderId, userId," +
" SUM(price) AS bidTotalPrice " +
" FROM " + bidOrdr +
" Group by orderId, userId) bidTbl full outer JOIN " +
" (SELECT orderId, userId," +
" SUM(price) AS askTotalPrice" +
" FROM " + askOrdr +
" Group by orderId, userId) askTbl " +
" ON (bidTbl.orderId = askTbl.orderId" +
" AND bidTbl.userId= askTbl.userId) ") ;
DataStream<Tuple2<Boolean, Row>> = ste.toRetractStream(result, Row.class).filter(order -> order.f0);
但是,在某些情况下结果不正确:假设用户A以价格卖给B 3次,之后用户B卖给A 2次,第二次结果是:
7> (真,123,a,300.0,0.0)
7> (真,123,a,300.0,200.0)
10> (真,123,b,0.0,300.0)
10> (真,123,b,200.0,300.0)
第二行和第四行是流的预期结果,但它也会生成第一行和第三行。 值得一提的是 coGroup 是另一种解决方案,但我不想在这种情况下使用窗口,非窗口解决方案只能在有界流 (DataSet) 中访问。
提示:orderId 和 userId 将在两个流中重复,我想在每个操作中生成 2 行,包含: orderId、userId1、bidTotalPrice、askTotalPrice 和 orderId, userId2, bidTotalPrice, askTotalPrice
流式查询(或者换句话说,在动态表上执行的查询)会出现类似的情况。与传统数据库不同,查询的输入关系在查询执行期间保持静态,流式查询的输入不断更新——因此结果也必须不断更新。
如果我理解这里的设置,第 1 行和第 3 行的“不正确”结果在处理来自 orderRes2
的相关行之前都是正确的。如果这些行永远不会到达,那么第 1 行和第 3 行将保持正确。
您应该期待的是最终正确的结果,包括必要时的撤回。您可以通过打开 mini-batch aggregation.
来减少中间结果的数量这 mailing list thread 提供了更多见解。如果我误解了您的情况,请提供可重现的示例来说明问题。