Migration of Merge statement from Teradata to MySQL

Tables: schema.INFA_TASK_RUN_STG, schema.INFA_TASK_RUN

Primary index of schema.INFA_TASK_RUN_STG: SUBJECT_AREA
Primary index of schema.INFA_TASK_RUN: SUBJECT_ID, WORKFLOW_ID, WORKFLOW_RUN_ID, WORKLET_RUN_ID, INSTANCE_ID, TASK_ID, START_TIME

MERGE statement in Teradata:

MERGE INTO schema.INFA_TASK_RUN tgt USING schema.INFA_TASK_RUN_STG src
ON
        tgt.SUBJECT_ID = src.SUBJECT_ID
AND     tgt.WORKFLOW_ID = src.WORKFLOW_ID
AND     tgt.WORKFLOW_RUN_ID = src.WORKFLOW_RUN_ID
AND     tgt.WORKLET_RUN_ID = src.WORKLET_RUN_ID
AND     tgt.INSTANCE_ID = src.INSTANCE_ID
AND     tgt.TASK_ID = src.TASK_ID
AND     tgt.START_TIME = src.START_TIME
WHEN MATCHED THEN UPDATE SET
        END_TIME = src.END_TIME
,       RUN_ERR_CODE = src.RUN_ERR_CODE
,       RUN_ERR_MSG = src.RUN_ERR_MSG
,       RUN_STATUS_CODE = src.RUN_STATUS_CODE
WHEN NOT MATCHED THEN INSERT(
                SUBJECT_AREA
        ,       WORKFLOW_NAME
        ,       VERSION_NUMBER
        ,       SUBJECT_ID
        ,       WORKFLOW_ID
        ,       WORKFLOW_RUN_ID
        ,       WORKLET_RUN_ID
        ,       CHILD_RUN_ID
        ,       INSTANCE_ID
        ,       INSTANCE_NAME
        ,       TASK_ID
        ,       TASK_TYPE_NAME
        ,       TASK_TYPE
        ,       START_TIME
        ,       END_TIME
        ,       RUN_ERR_CODE
        ,       RUN_ERR_MSG
        ,       RUN_STATUS_CODE
        ,       TASK_NAME
        ,       TASK_VERSION_NUMBER
        ,       SERVER_ID
        ,       SERVER_NAME
        )VALUES(
                src.SUBJECT_AREA
        ,       src.WORKFLOW_NAME
        ,       src.VERSION_NUMBER
        ,       src.SUBJECT_ID
        ,       src.WORKFLOW_ID
        ,       src.WORKFLOW_RUN_ID
        ,       src.WORKLET_RUN_ID
        ,       src.CHILD_RUN_ID
        ,       src.INSTANCE_ID
        ,       src.INSTANCE_NAME
        ,       src.TASK_ID
        ,       src.TASK_TYPE_NAME
        ,       src.TASK_TYPE
        ,       src.START_TIME
        ,       src.END_TIME
        ,       src.RUN_ERR_CODE
        ,       src.RUN_ERR_MSG
        ,       src.RUN_STATUS_CODE
        ,       src.TASK_NAME
        ,       src.TASK_VERSION_NUMBER
        ,       src.SERVER_ID
        ,       src.SERVER_NAME
        );

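As far as I know, MySQL does not support the MERGE statement, so I am trying separate UPDATE and INSERT statements instead. However, they don't seem quite right.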

UPDATE schema.INFA_TASK_RUN tgt INNER JOIN schema.INFA_TASK_RUN_STG src
ON
       tgt.SUBJECT_ID = src.SUBJECT_ID
AND     tgt.WORKFLOW_ID = src.WORKFLOW_ID
AND     tgt.WORKFLOW_RUN_ID = src.WORKFLOW_RUN_ID
AND     tgt.WORKLET_RUN_ID = src.WORKLET_RUN_ID
AND     tgt.INSTANCE_ID = src.INSTANCE_ID
AND     tgt.TASK_ID = src.TASK_ID
AND     tgt.START_TIME = src.START_TIME
 SET
        tgt.END_TIME = src.END_TIME
,       tgt.RUN_ERR_CODE = src.RUN_ERR_CODE
,       tgt.RUN_ERR_MSG = src.RUN_ERR_MSG
,       tgt.RUN_STATUS_CODE = src.RUN_STATUS_CODE;

insert into schema.INFA_TASK_RUN (
        SUBJECT_AREA, WORKFLOW_NAME, VERSION_NUMBER, SUBJECT_ID, WORKFLOW_ID
,       WORKFLOW_RUN_ID, WORKLET_RUN_ID, CHILD_RUN_ID, INSTANCE_ID, INSTANCE_NAME
,       TASK_ID, TASK_TYPE_NAME, TASK_TYPE, START_TIME, END_TIME
,       RUN_ERR_CODE, RUN_ERR_MSG, RUN_STATUS_CODE, TASK_NAME
,       TASK_VERSION_NUMBER, SERVER_ID, SERVER_NAME)
    select src.SUBJECT_AREA, src.WORKFLOW_NAME, src.VERSION_NUMBER, src.SUBJECT_ID, src.WORKFLOW_ID
    ,       src.WORKFLOW_RUN_ID, src.WORKLET_RUN_ID, src.CHILD_RUN_ID, src.INSTANCE_ID, src.INSTANCE_NAME
    ,       src.TASK_ID, src.TASK_TYPE_NAME, src.TASK_TYPE, src.START_TIME, src.END_TIME
    ,       src.RUN_ERR_CODE, src.RUN_ERR_MSG, src.RUN_STATUS_CODE, src.TASK_NAME
    ,       src.TASK_VERSION_NUMBER, src.SERVER_ID, src.SERVER_NAME
    from schema.INFA_TASK_RUN_STG as src
        left outer join schema.INFA_TASK_RUN as tgt  ON
       tgt.SUBJECT_ID != src.SUBJECT_ID
AND     tgt.WORKFLOW_ID != src.WORKFLOW_ID
AND     tgt.WORKFLOW_RUN_ID != src.WORKFLOW_RUN_ID
AND     tgt.WORKLET_RUN_ID != src.WORKLET_RUN_ID
AND     tgt.INSTANCE_ID != src.INSTANCE_ID
AND     tgt.TASK_ID != src.TASK_ID
AND     tgt.START_TIME != src.START_TIME;
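To see why a LEFT JOIN on "every key column differs" does not find the new rows, here is a small sketch. It uses SQLite through Python's sqlite3 module as a stand-in for MySQL, and the tables tgt/stg with a two-column key (SUBJECT_ID, TASK_ID) are a hypothetical simplification of the seven-column key above. The join pairs each staging row with every target row that differs from it, so an existing row is still returned and a new row comes back once per unrelated target row. In MySQL, with the primary key in place, inserting such a result would fail with a duplicate-key error.

```python
import sqlite3

# Hypothetical toy schema: two key columns stand in for the real seven-column key.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE tgt (SUBJECT_ID INTEGER, TASK_ID INTEGER, END_TIME TEXT,
                  PRIMARY KEY (SUBJECT_ID, TASK_ID));
CREATE TABLE stg (SUBJECT_ID INTEGER, TASK_ID INTEGER, END_TIME TEXT);
INSERT INTO tgt VALUES (1, 10, 'old'), (2, 20, 'old');
INSERT INTO stg VALUES (1, 10, 'new'), (3, 30, 'new');
""")

# Same pattern as the question: LEFT JOIN where every key column is "!=".
buggy = conn.execute("""
SELECT src.SUBJECT_ID, src.TASK_ID
FROM stg AS src
LEFT JOIN tgt ON tgt.SUBJECT_ID != src.SUBJECT_ID
             AND tgt.TASK_ID != src.TASK_ID
ORDER BY 1, 2
""").fetchall()

# (1, 10) already exists in tgt, and (3, 30) appears once per non-matching tgt row.
print(buggy)  # [(1, 10), (3, 30), (3, 30)]
```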

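I believe what you are looking for is something like this (untested; it only works if schema.INFA_TASK_RUN has a PRIMARY KEY or UNIQUE index on SUBJECT_ID, WORKFLOW_ID, WORKFLOW_RUN_ID, WORKLET_RUN_ID, INSTANCE_ID, TASK_ID, START_TIME):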

INSERT INTO schema.INFA_TASK_RUN (
     SUBJECT_AREA
    ,WORKFLOW_NAME
    ,VERSION_NUMBER
    ,SUBJECT_ID
    ,WORKFLOW_ID
    ,WORKFLOW_RUN_ID
    ,WORKLET_RUN_ID
    ,CHILD_RUN_ID
    ,INSTANCE_ID
    ,INSTANCE_NAME
    ,TASK_ID
    ,TASK_TYPE_NAME
    ,TASK_TYPE
    ,START_TIME
    ,END_TIME
    ,RUN_ERR_CODE
    ,RUN_ERR_MSG
    ,RUN_STATUS_CODE
    ,TASK_NAME
    ,TASK_VERSION_NUMBER
    ,SERVER_ID
    ,SERVER_NAME
    )
SELECT
     SUBJECT_AREA
    ,WORKFLOW_NAME
    ,VERSION_NUMBER
    ,SUBJECT_ID
    ,WORKFLOW_ID
    ,WORKFLOW_RUN_ID
    ,WORKLET_RUN_ID
    ,CHILD_RUN_ID
    ,INSTANCE_ID
    ,INSTANCE_NAME
    ,TASK_ID
    ,TASK_TYPE_NAME
    ,TASK_TYPE
    ,START_TIME
    ,END_TIME
    ,RUN_ERR_CODE
    ,RUN_ERR_MSG
    ,RUN_STATUS_CODE
    ,TASK_NAME
    ,TASK_VERSION_NUMBER
    ,SERVER_ID
    ,SERVER_NAME
FROM schema.INFA_TASK_RUN_STG src
ON DUPLICATE KEY UPDATE
     END_TIME = src.END_TIME
    ,RUN_ERR_CODE = src.RUN_ERR_CODE
    ,RUN_ERR_MSG = src.RUN_ERR_MSG
    ,RUN_STATUS_CODE = src.RUN_STATUS_CODE;
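The upsert idea can be tried without a MySQL server. This is a minimal sketch using SQLite via Python's sqlite3 module, under the assumption that SQLite is an acceptable stand-in: SQLite has no ON DUPLICATE KEY UPDATE, so it uses its closest equivalent, INSERT ... ON CONFLICT (...) DO UPDATE (available since SQLite 3.24), where excluded.col refers to the incoming staging row. The tables tgt/stg and the two-column key are hypothetical simplifications. The point it shows is the same as in the MySQL statement: the composite primary key is what routes an existing row to the update branch and a new row to the insert branch.

```python
import sqlite3

# Hypothetical toy schema; the composite PRIMARY KEY is what makes the upsert work.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE tgt (SUBJECT_ID INTEGER, TASK_ID INTEGER, END_TIME TEXT,
                  RUN_STATUS_CODE INTEGER,
                  PRIMARY KEY (SUBJECT_ID, TASK_ID));
CREATE TABLE stg (SUBJECT_ID INTEGER, TASK_ID INTEGER, END_TIME TEXT,
                  RUN_STATUS_CODE INTEGER);
INSERT INTO tgt VALUES (1, 10, 'old', 0);
INSERT INTO stg VALUES (1, 10, 'new', 1), (3, 30, 'new', 1);
""")

conn.execute("""
INSERT INTO tgt (SUBJECT_ID, TASK_ID, END_TIME, RUN_STATUS_CODE)
SELECT SUBJECT_ID, TASK_ID, END_TIME, RUN_STATUS_CODE
FROM stg
WHERE 1  -- SQLite needs a WHERE here so ON is not parsed as a join constraint
ON CONFLICT (SUBJECT_ID, TASK_ID) DO UPDATE SET
    END_TIME = excluded.END_TIME,
    RUN_STATUS_CODE = excluded.RUN_STATUS_CODE
""")

rows = conn.execute("SELECT * FROM tgt ORDER BY SUBJECT_ID").fetchall()
print(rows)  # [(1, 10, 'new', 1), (3, 30, 'new', 1)]
```

Row (1, 10) is updated in place and row (3, 30) is inserted, which is the MERGE behaviour. In MySQL the UPDATE clause may reference src.col directly, as in the statement above, instead of SQLite's excluded.col.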

Edit 2020-05-21: per the comments, here are separate UPDATE and INSERT statements.

An INSERT ... ON DUPLICATE KEY UPDATE statement will probably be faster, though.

Following the comments, I tested the original INSERT and UPDATE statements from the question.

Note that your UPDATE statement works correctly. The only issue is that every matching row is updated, even when nothing has changed.

You can add conditions to the join, such as tgt.END_TIME != src.END_TIME, so that only records that actually changed are updated.
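One caveat with a plain != condition: it evaluates to NULL when either side is NULL, so a row whose END_TIME goes from NULL to a value is silently skipped. In MySQL the NULL-safe form is NOT (tgt.END_TIME <=> src.END_TIME). The sketch below demonstrates the difference in SQLite via Python's sqlite3, where IS NOT is the NULL-safe inequality; it uses a portable correlated-subquery UPDATE instead of MySQL's UPDATE ... JOIN syntax, and the tables tgt/stg and the helper changed_rows are hypothetical.

```python
import sqlite3


def changed_rows(mismatch: str) -> int:
    """Build a fresh toy database and return how many target rows the UPDATE touches."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
    CREATE TABLE tgt (SUBJECT_ID INTEGER, TASK_ID INTEGER, END_TIME TEXT,
                      PRIMARY KEY (SUBJECT_ID, TASK_ID));
    CREATE TABLE stg (SUBJECT_ID INTEGER, TASK_ID INTEGER, END_TIME TEXT);
    INSERT INTO tgt VALUES (1, 10, 'same'), (2, 20, NULL);
    INSERT INTO stg VALUES (1, 10, 'same'), (2, 20, 'done');
    """)
    # Update only rows that have a staging match AND whose END_TIME differs.
    cur = conn.execute(f"""
    UPDATE tgt
    SET END_TIME = (SELECT s.END_TIME FROM stg s
                    WHERE s.SUBJECT_ID = tgt.SUBJECT_ID AND s.TASK_ID = tgt.TASK_ID)
    WHERE EXISTS (SELECT 1 FROM stg s
                  WHERE s.SUBJECT_ID = tgt.SUBJECT_ID AND s.TASK_ID = tgt.TASK_ID
                    AND {mismatch})
    """)
    return cur.rowcount


plain = changed_rows("s.END_TIME != tgt.END_TIME")          # NULL != 'done' is NULL: row skipped
null_safe = changed_rows("s.END_TIME IS NOT tgt.END_TIME")  # NULL-safe: row updated
print(plain, null_safe)  # 0 1
```

If you compare several columns, the same NULL-safe test is needed for each nullable one (for example RUN_ERR_CODE and RUN_ERR_MSG), combined with OR.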

The original UPDATE query from your question:

UPDATE schema.INFA_TASK_RUN tgt INNER JOIN schema.INFA_TASK_RUN_STG src
ON
       tgt.SUBJECT_ID = src.SUBJECT_ID
AND     tgt.WORKFLOW_ID = src.WORKFLOW_ID
AND     tgt.WORKFLOW_RUN_ID = src.WORKFLOW_RUN_ID
AND     tgt.WORKLET_RUN_ID = src.WORKLET_RUN_ID
AND     tgt.INSTANCE_ID = src.INSTANCE_ID
AND     tgt.TASK_ID = src.TASK_ID
AND     tgt.START_TIME = src.START_TIME
 SET
        tgt.END_TIME = src.END_TIME
,       tgt.RUN_ERR_CODE = src.RUN_ERR_CODE
,       tgt.RUN_ERR_MSG = src.RUN_ERR_MSG
,       tgt.RUN_STATUS_CODE = src.RUN_STATUS_CODE;

Revised INSERT:

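The INSERT statement has to change. Note that the JOIN now uses equality on the key columns, and we select only the rows that have no match in the target table by checking whether a target-table column is NULL: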

INSERT INTO schema.INFA_TASK_RUN (
     SUBJECT_AREA
    ,WORKFLOW_NAME
    ,VERSION_NUMBER
    ,SUBJECT_ID
    ,WORKFLOW_ID
    ,WORKFLOW_RUN_ID
    ,WORKLET_RUN_ID
    ,CHILD_RUN_ID
    ,INSTANCE_ID
    ,INSTANCE_NAME
    ,TASK_ID
    ,TASK_TYPE_NAME
    ,TASK_TYPE
    ,START_TIME
    ,END_TIME
    ,RUN_ERR_CODE
    ,RUN_ERR_MSG
    ,RUN_STATUS_CODE
    ,TASK_NAME
    ,TASK_VERSION_NUMBER
    ,SERVER_ID
    ,SERVER_NAME
    )
    select src.SUBJECT_AREA
    ,src.WORKFLOW_NAME
    ,src.VERSION_NUMBER
    ,src.SUBJECT_ID
    ,src.WORKFLOW_ID
    ,src.WORKFLOW_RUN_ID
    ,src.WORKLET_RUN_ID
    ,src.CHILD_RUN_ID
    ,src.INSTANCE_ID
    ,src.INSTANCE_NAME
    ,src.TASK_ID
    ,src.TASK_TYPE_NAME
    ,src.TASK_TYPE
    ,src.START_TIME
    ,src.END_TIME
    ,src.RUN_ERR_CODE
    ,src.RUN_ERR_MSG
    ,src.RUN_STATUS_CODE
    ,src.TASK_NAME
    ,src.TASK_VERSION_NUMBER
    ,src.SERVER_ID
    ,src.SERVER_NAME
    FROM schema.INFA_TASK_RUN as tgt
        RIGHT JOIN schema.INFA_TASK_RUN_STG as src  ON
       tgt.SUBJECT_ID = src.SUBJECT_ID
AND     tgt.WORKFLOW_ID = src.WORKFLOW_ID
AND     tgt.WORKFLOW_RUN_ID = src.WORKFLOW_RUN_ID
AND     tgt.WORKLET_RUN_ID = src.WORKLET_RUN_ID
AND     tgt.INSTANCE_ID = src.INSTANCE_ID
AND     tgt.TASK_ID = src.TASK_ID
AND     tgt.START_TIME = src.START_TIME
WHERE tgt.SUBJECT_ID IS NULL;
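The anti-join can be checked on a toy dataset. This sketch uses SQLite via Python's sqlite3 as a stand-in for MySQL, with hypothetical tables tgt/stg and a two-column key. SQLite only added RIGHT JOIN in version 3.39, so the sketch writes the same join as stg LEFT JOIN tgt, which returns the same rows as tgt RIGHT JOIN stg. The IS NULL test is reliable because the checked column is part of the primary key and therefore never NULL in a real target row. Running the statement twice shows it only inserts rows that are not already present.

```python
import sqlite3

# Hypothetical toy schema; primary-key columns are NOT NULL, so IS NULL means "no match".
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE tgt (SUBJECT_ID INTEGER NOT NULL, TASK_ID INTEGER NOT NULL, END_TIME TEXT,
                  PRIMARY KEY (SUBJECT_ID, TASK_ID));
CREATE TABLE stg (SUBJECT_ID INTEGER, TASK_ID INTEGER, END_TIME TEXT);
INSERT INTO tgt VALUES (1, 10, 'old');
INSERT INTO stg VALUES (1, 10, 'new'), (3, 30, 'new');
""")

INSERT_NEW = """
INSERT INTO tgt (SUBJECT_ID, TASK_ID, END_TIME)
SELECT src.SUBJECT_ID, src.TASK_ID, src.END_TIME
FROM stg AS src
LEFT JOIN tgt ON tgt.SUBJECT_ID = src.SUBJECT_ID
             AND tgt.TASK_ID = src.TASK_ID
WHERE tgt.SUBJECT_ID IS NULL
"""

first = conn.execute(INSERT_NEW).rowcount   # only (3, 30) is new
second = conn.execute(INSERT_NEW).rowcount  # nothing left to insert
rows = conn.execute("SELECT * FROM tgt ORDER BY SUBJECT_ID").fetchall()
print(first, second, rows)  # 1 0 [(1, 10, 'old'), (3, 30, 'new')]
```

Row (1, 10) keeps its old END_TIME here because changing existing rows is the job of the separate UPDATE statement; running the UPDATE first and then this INSERT reproduces the two branches of the Teradata MERGE.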

The correct answer, to avoid confusion:

 INSERT INTO schema.INFA_TASK_RUN (
         SUBJECT_AREA
        ,WORKFLOW_NAME
        ,VERSION_NUMBER
        ,SUBJECT_ID
        ,WORKFLOW_ID
        ,WORKFLOW_RUN_ID
        ,WORKLET_RUN_ID
        ,CHILD_RUN_ID
        ,INSTANCE_ID
        ,INSTANCE_NAME
        ,TASK_ID
        ,TASK_TYPE_NAME
        ,TASK_TYPE
        ,START_TIME
        ,END_TIME
        ,RUN_ERR_CODE
        ,RUN_ERR_MSG
        ,RUN_STATUS_CODE
        ,TASK_NAME
        ,TASK_VERSION_NUMBER
        ,SERVER_ID
        ,SERVER_NAME
        )
    SELECT
         SUBJECT_AREA
        ,WORKFLOW_NAME
        ,VERSION_NUMBER
        ,SUBJECT_ID
        ,WORKFLOW_ID
        ,WORKFLOW_RUN_ID
        ,WORKLET_RUN_ID
        ,CHILD_RUN_ID
        ,INSTANCE_ID
        ,INSTANCE_NAME
        ,TASK_ID
        ,TASK_TYPE_NAME
        ,TASK_TYPE
        ,START_TIME
        ,END_TIME
        ,RUN_ERR_CODE
        ,RUN_ERR_MSG
        ,RUN_STATUS_CODE
        ,TASK_NAME
        ,TASK_VERSION_NUMBER
        ,SERVER_ID
        ,SERVER_NAME
    FROM schema.INFA_TASK_RUN_STG src
    ON DUPLICATE KEY UPDATE
         END_TIME = src.END_TIME
        ,RUN_ERR_CODE = src.RUN_ERR_CODE
        ,RUN_ERR_MSG = src.RUN_ERR_MSG
        ,RUN_STATUS_CODE = src.RUN_STATUS_CODE;