Snowflake 存储过程中每个 DML 操作的审计或行数

Auditing or Rowcount for each DML Operation in Snowflake Stored Procedure

我想在合并和插入语句的存储过程中捕获审计,我尝试使用 Result_Scan 通过会话从 QUERY_HISTORY 获取查询 ID。但这些语句在 JavaScript 过程中是不允许的。然后我创建了一个函数来获取最近执行的 SQL 查询的查询 ID。

create or replace function GET_QUERY_ID()
   RETURNS VARCHAR
   AS 'SELECT QUID FROM (SELECT (QUERY_ID)::VARCHAR AS QUID FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY_BY_SESSION(CURRENT_SESSION()::NUMBER)) WHERE QUERY_TYPE IN (''INSERT'',''MERGE'') ORDER BY END_TIME DESC ) LIMIT 1';

并尝试为行计数创建另一个函数

create or replace function GET_RESULT_SCAN(P_QUERY_ID VARCHAR)
   RETURNS TABLE ( INSERT_ROWS NUMBER ,UPDATED_ROWS NUMBER)
   AS 'select * from (select "number of rows updated"::NUMBER as INSERT_ROWS, "number of multi-joined rows updated"::NUMBER as UPDATED_ROWS from table(result_scan(P_QUERY_ID)))';

但这不起作用,我无法在使用 JavaScript 创建的存储过程中调用这些函数。 请让我知道捕获对存储过程中插入和更新的行数的审计的最佳实践。假设在存储过程中有 5-10 SQL 个语句。

尝试在存储过程定义的顶部添加行 execute as caller :

CREATE [ OR REPLACE ] PROCEDURE <name> ( [ <arg_name> <arg_data_type> ] [ , ... ] )
  RETURNS <result_data_type> [ NOT NULL ]
  LANGUAGE JAVASCRIPT
  [ { CALLED ON NULL INPUT | { RETURNS NULL ON NULL INPUT | STRICT } } ]
  [ VOLATILE | IMMUTABLE ]
  [ COMMENT = '<string_literal>' ]
  [ EXECUTE AS { CALLER | OWNER } ]
  AS '<procedure_definition>'

这是一个简单的程序,它执行合并并捕获 & returns 插入的行数和更新的行数:

CREATE OR REPLACE PROCEDURE utl.arch_merge_sp(P_STAGE_TBL VARCHAR, P_FINAL_TBL VARCHAR)
  RETURNS STRING
  LANGUAGE JAVASCRIPT
  EXECUTE AS CALLER
AS $$
  var sqlCmd = "";
  var sqlStmt = "";
  var result = "";

  try {
    sqlCmd = `
      MERGE INTO final_t F USING stage_t S
      ON F.KEY_ID = S.KEY_ID
      WHEN MATCHED THEN UPDATE SET
       F.ATTR_NM = S.ATTR_NM
      ,F.ATTR_NBR = S.ATTR_NBR
      WHEN NOT MATCHED THEN INSERT (
       F.KEY_ID
      ,F.ATTR_NM
      ,F.ATTR_NBR
      ) VALUES (
       S.KEY_ID
      ,S.ATTR_NM
      ,S.ATTR_NBR);
      `;
    sqlStmt = snowflake.createStatement( {sqlText: sqlCmd} );
    rs = sqlStmt.execute();

    sqlCmd = 
      `SELECT "number of rows inserted", "number of rows updated"
        FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))`;
    sqlStmt = snowflake.createStatement( {sqlText: sqlCmd} );
    rs = sqlStmt.execute();
    rs.next();

    result += "Rows inserted: " + rs.getColumnValue(1) + ", Rows updated: " + rs.getColumnValue(2)

  }
  catch (err) {
    result =  "Failed: Code: " + err.code + " | State: " + err.state;
    result += "\n  Message: " + err.message;
    result += "\nStack Trace:\n" + err.stackTraceTxt; 
    }

  return result;
$$;

为什么不使用雪花的 Statement 对象方法?

您还可以创建包装存储过程,这样就不必一直处理样板代码。

CREATE OR REPLACE PROCEDURE EXEC_SQL_SCRIPT (SQL_SCRIPT VARCHAR, STATEMENT_SEPARATOR VARCHAR) 
RETURNS VARIANT
LANGUAGE JAVASCRIPT
EXECUTE AS CALLER
AS
$$

var execReport = { status: "Succeeded",
                   statements : [] 
}

var stmtSep = ";"
if (STATEMENT_SEPARATOR != null) {
    stmtSep = STATEMENT_SEPARATOR;
}

var stmts = SQL_SCRIPT.split(stmtSep);

for (var i = 0; i < stmts.length; i++) {
    var stmtText = stmts[i].trim();
    
    if (stmtText.length > 0) {
        try {
            var stmt = snowflake.createStatement( { sqlText : stmtText } );
            var rs = stmt.execute();

            execReport["statements"].push({ statement_no: i + 1, 
                                            status: "Succeeded",
                                            rowCount: stmt.getRowCount(),
                                            numRowsAffected: stmt.getNumRowsAffected(),
                                            numRowsDeleted: stmt.getNumRowsDeleted(),
                                            numRowsInserted: stmt.getNumRowsInserted(),
                                            numRowsUpdated: stmt.getNumRowsUpdated(),
                                            queryId : stmt.getQueryId(),
                                            sqlText: stmt.getSqlText(),
                                            error: null
                                        });
        } catch (err)  {
            var error = { code : err.code,
                          state : err.state,
                          message : err.message,
                          stackTraceTxt : err.stackTraceTxt
            };

            execReport["statements"].push({ statement_no: i + 1, 
                                            status: "Failed",
                                            rowCount: null,
                                            numRowsAffected: null,
                                            numRowsDeleted: null,
                                            numRowsInserted: null,
                                            numRowsUpdated: null,
                                            queryId : stmt.getQueryId(),
                                            sqlText: stmt.getSqlText(),
                                            error: error
                                        });

            execReport["status"] = "Failed";

            return execReport;  
        } 
    }
}

return execReport;

$$;