leftOuterJoin 抛出 TableException:不支持的连接类型 'LEFT'
leftOuterJoin throws TableException: Unsupported join type 'LEFT'
我正在尝试 运行 两个 table 上的左外连接并将结果转换为 DataStream。
我在使用 flink 之前所做的所有连接都是内部连接,并且我总是在连接后跟一个 .toRetractStream[MyCaseClass](someQueryConfig)
。但是,由于左连接引入了空值,我对 flink docs 的理解是我不能再使用 case 类 因为它们在转换 [=29 时不支持空值=] 到数据流。
所以,我正在尝试使用 POJO 来完成此操作。这是我的代码:
class EnrichedTaskUpdateJoin(val enrichedTaskId: String, val enrichedTaskJobId: String, val enrichedTaskJobDate: String, val enrichedTaskJobMetadata: Json, val enrichedTaskStartedAt: String, val enrichedTaskTaskMetadata: Json, val taskUpdateMetadata: Json = Json.Null) {}
val qConfig = tableEnv.queryConfig
qConfig.withIdleStateRetentionTime(IDLE_STATE_RETENTION_TIME)
val updatedTasksUpsertTable = enrichedTasksUpsertTable
.leftOuterJoin(taskUpdatesUpsertTable, 'enrichedTaskId === 'taskUpdateId)
.select(
'enrichedTaskId,
'enrichedTaskJobId,
'enrichedTaskJobDate,
'enrichedTaskJobMetadata,
'enrichedTaskStartedAt,
'enrichedTaskTaskMetadata,
'taskUpdateMetadata
)
val updatedEnrichedTasksStream: KeyedStream[String, String] = updatedTasksUpsertTable
.toAppendStream[EnrichedTaskUpdateJoin](qConfig)
.map(toEnrichedTask(_))
.map(encodeTask(_))
.keyBy(x => parse(x).getOrElse(Json.Null).hcursor.get[String]("id").getOrElse(""))
这编译得很好,但是当我尝试 运行 它时,我得到 org.apache.flink.table.api.TableException: Unsupported join type 'LEFT'. Currently only non-window inner joins with at least one equality predicate are supported
。但是,根据 these docs,似乎我应该能够 运行 左连接。值得注意的是,错误是从 .toAppendStream[EnrichedTaskUpdateJoin](qConfig)
中抛出的。
我想也许错误的 non-window
部分暗示我的空闲状态保留时间有问题,所以我取出查询配置,但得到了同样的错误。
希望这有足够的上下文,但如果我需要添加任何其他内容,请告诉我。另外,我正在 运行ning flink 1.5-SNAPSHOT 和 Circe 进行 json 解析。我也是 scala 的新手,所以这很可能只是一些愚蠢的语法错误。
Flink 1.5-SNAPSHOT 不支持非窗口外部联接。正如您在发布的 link 中看到的那样,"Outer Joins" 旁边没有 "Streaming" 标签。 1.5 支持时间窗连接(在时间属性上工作)。
Flink 1.6 将提供 LEFT
、RIGHT
和 FULL
外连接(另请参阅 FLINK-5878)。
顺便说一句。确保 EnrichedTaskUpdateJoin
确实是一个 POJO,因为 POJO 需要一个默认构造函数,而且我认为 var
而不是 val
.
我正在尝试 运行 两个 table 上的左外连接并将结果转换为 DataStream。
我在使用 flink 之前所做的所有连接都是内部连接,并且我总是在连接后跟一个 .toRetractStream[MyCaseClass](someQueryConfig)
。但是,由于左连接引入了空值,我对 flink docs 的理解是我不能再使用 case 类 因为它们在转换 [=29 时不支持空值=] 到数据流。
所以,我正在尝试使用 POJO 来完成此操作。这是我的代码:
class EnrichedTaskUpdateJoin(val enrichedTaskId: String, val enrichedTaskJobId: String, val enrichedTaskJobDate: String, val enrichedTaskJobMetadata: Json, val enrichedTaskStartedAt: String, val enrichedTaskTaskMetadata: Json, val taskUpdateMetadata: Json = Json.Null) {}
val qConfig = tableEnv.queryConfig
qConfig.withIdleStateRetentionTime(IDLE_STATE_RETENTION_TIME)
val updatedTasksUpsertTable = enrichedTasksUpsertTable
.leftOuterJoin(taskUpdatesUpsertTable, 'enrichedTaskId === 'taskUpdateId)
.select(
'enrichedTaskId,
'enrichedTaskJobId,
'enrichedTaskJobDate,
'enrichedTaskJobMetadata,
'enrichedTaskStartedAt,
'enrichedTaskTaskMetadata,
'taskUpdateMetadata
)
val updatedEnrichedTasksStream: KeyedStream[String, String] = updatedTasksUpsertTable
.toAppendStream[EnrichedTaskUpdateJoin](qConfig)
.map(toEnrichedTask(_))
.map(encodeTask(_))
.keyBy(x => parse(x).getOrElse(Json.Null).hcursor.get[String]("id").getOrElse(""))
这编译得很好,但是当我尝试 运行 它时,我得到 org.apache.flink.table.api.TableException: Unsupported join type 'LEFT'. Currently only non-window inner joins with at least one equality predicate are supported
。但是,根据 these docs,似乎我应该能够 运行 左连接。值得注意的是,错误是从 .toAppendStream[EnrichedTaskUpdateJoin](qConfig)
中抛出的。
我想也许错误的 non-window
部分暗示我的空闲状态保留时间有问题,所以我取出查询配置,但得到了同样的错误。
希望这有足够的上下文,但如果我需要添加任何其他内容,请告诉我。另外,我正在 运行ning flink 1.5-SNAPSHOT 和 Circe 进行 json 解析。我也是 scala 的新手,所以这很可能只是一些愚蠢的语法错误。
Flink 1.5-SNAPSHOT 不支持非窗口外部联接。正如您在发布的 link 中看到的那样,"Outer Joins" 旁边没有 "Streaming" 标签。 1.5 支持时间窗连接(在时间属性上工作)。
Flink 1.6 将提供 LEFT
、RIGHT
和 FULL
外连接(另请参阅 FLINK-5878)。
顺便说一句。确保 EnrichedTaskUpdateJoin
确实是一个 POJO,因为 POJO 需要一个默认构造函数,而且我认为 var
而不是 val
.