Snowflake 存储过程中每个 DML 操作的审计或行数
Auditing or Rowcount for each DML Operation in Snowflake Stored Procedure
我想在存储过程中对 MERGE 和 INSERT 语句做审计记录。我尝试按会话从 QUERY_HISTORY 中获取查询 ID,再配合 RESULT_SCAN 读取结果,但这些语句在 JavaScript 存储过程中不被允许。于是我创建了一个函数,用来获取最近执行的 SQL 语句的查询 ID。
-- Returns the query ID of the most recently finished INSERT or MERGE
-- statement recorded in the current session's query history.
-- ORDER BY and LIMIT are kept in the same query level: an ORDER BY that sits
-- alone inside a subquery does not guarantee which row an outer LIMIT keeps.
create or replace function GET_QUERY_ID()
RETURNS VARCHAR
AS 'SELECT QUID FROM (
      SELECT (QUERY_ID)::VARCHAR AS QUID
      FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY_BY_SESSION(CURRENT_SESSION()::NUMBER))
      WHERE QUERY_TYPE IN (''INSERT'',''MERGE'')
      ORDER BY END_TIME DESC
      LIMIT 1
    )';
并尝试创建另一个函数来获取行数:
-- Returns the inserted and updated row counts reported by the statement whose
-- query ID is P_QUERY_ID, read back from that statement's result set.
-- INSERT_ROWS maps to "number of rows inserted" and UPDATED_ROWS to
-- "number of rows updated" (the columns a MERGE result exposes).
-- NOTE(review): a plain INSERT result presumably has no
-- "number of rows updated" column - confirm before using it for INSERTs.
create or replace function GET_RESULT_SCAN(P_QUERY_ID VARCHAR)
RETURNS TABLE ( INSERT_ROWS NUMBER ,UPDATED_ROWS NUMBER)
AS 'select "number of rows inserted"::NUMBER as INSERT_ROWS,
           "number of rows updated"::NUMBER as UPDATED_ROWS
    from table(result_scan(P_QUERY_ID))';
但这不起作用,我无法在使用 JavaScript 创建的存储过程中调用这些函数。
请告诉我在存储过程中审计(记录)插入和更新行数的最佳实践。假设存储过程中有 5–10 条 SQL 语句。
尝试在存储过程定义的开头部分添加 execute as caller 这一行:
CREATE [ OR REPLACE ] PROCEDURE <name> ( [ <arg_name> <arg_data_type> ] [ , ... ] )
RETURNS <result_data_type> [ NOT NULL ]
LANGUAGE JAVASCRIPT
[ { CALLED ON NULL INPUT | { RETURNS NULL ON NULL INPUT | STRICT } } ]
[ VOLATILE | IMMUTABLE ]
[ COMMENT = '<string_literal>' ]
[ EXECUTE AS { CALLER | OWNER } ]
AS '<procedure_definition>'
这是一个简单的存储过程,它执行 MERGE,并捕获、返回插入的行数和更新的行数:
CREATE OR REPLACE PROCEDURE utl.arch_merge_sp(P_STAGE_TBL VARCHAR, P_FINAL_TBL VARCHAR)
RETURNS STRING
LANGUAGE JAVASCRIPT
EXECUTE AS CALLER
AS $$
// Merges the staging table into the final table on KEY_ID and reports how
// many rows the MERGE inserted and updated.
//
// Arguments:
//   P_STAGE_TBL - name of the staging (source) table; NULL falls back to stage_t
//   P_FINAL_TBL - name of the final (target) table; NULL falls back to final_t
// Returns:
//   "Rows inserted: <n>, Rows updated: <m>" on success; otherwise a
//   "Failed: ..." string carrying the error code, state, message and stack trace.
var sqlCmd = "";
var sqlStmt = null;
var rs = null;      // declared locally so it is not created as an implicit global
var result = "";

// Keep the previously hard-coded table names as defaults for NULL arguments.
var finalTbl = (P_FINAL_TBL == null) ? "final_t" : P_FINAL_TBL;
var stageTbl = (P_STAGE_TBL == null) ? "stage_t" : P_STAGE_TBL;

try {
    // Table names are supplied as bind values through IDENTIFIER() rather than
    // concatenated into the SQL text.
    sqlCmd = `
        MERGE INTO IDENTIFIER(:1) F USING IDENTIFIER(:2) S
            ON F.KEY_ID = S.KEY_ID
        WHEN MATCHED THEN UPDATE SET
             F.ATTR_NM = S.ATTR_NM
            ,F.ATTR_NBR = S.ATTR_NBR
        WHEN NOT MATCHED THEN INSERT (
             F.KEY_ID
            ,F.ATTR_NM
            ,F.ATTR_NBR
        ) VALUES (
             S.KEY_ID
            ,S.ATTR_NM
            ,S.ATTR_NBR);
    `;
    sqlStmt = snowflake.createStatement( {sqlText: sqlCmd, binds: [finalTbl, stageTbl]} );
    sqlStmt.execute();

    // Read the MERGE's own result set to obtain the row counts. Must run
    // immediately after the MERGE so LAST_QUERY_ID() still refers to it.
    sqlCmd =
        `SELECT "number of rows inserted", "number of rows updated"
         FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))`;
    sqlStmt = snowflake.createStatement( {sqlText: sqlCmd} );
    rs = sqlStmt.execute();
    rs.next();
    result += "Rows inserted: " + rs.getColumnValue(1) + ", Rows updated: " + rs.getColumnValue(2);
}
catch (err) {
    // Errors are returned to the caller as text instead of being re-thrown.
    result = "Failed: Code: " + err.code + " | State: " + err.state;
    result += "\n Message: " + err.message;
    result += "\nStack Trace:\n" + err.stackTraceTxt;
}
return result;
$$;
为什么不使用 Snowflake 的 Statement 对象方法?
您还可以创建一个包装存储过程,这样就不必每次都重复编写样板代码。
CREATE OR REPLACE PROCEDURE EXEC_SQL_SCRIPT (SQL_SCRIPT VARCHAR, STATEMENT_SEPARATOR VARCHAR)
RETURNS VARIANT
LANGUAGE JAVASCRIPT
EXECUTE AS CALLER
AS
$$
// Splits SQL_SCRIPT on a separator, executes the non-empty statements in
// order, and returns a report with per-statement row counts and query IDs.
//
// Arguments:
//   SQL_SCRIPT          - text containing one or more SQL statements
//   STATEMENT_SEPARATOR - separator string; NULL or empty falls back to ";"
// Returns:
//   { status, statements: [...] } where each entry carries statement_no,
//   status, the row-count values read from the Statement object, queryId,
//   sqlText and error. Execution stops at the first failing statement and the
//   report's status becomes "Failed".
//
// NOTE: the script is split with a plain String.split, so a separator that
// occurs inside a string literal or comment also splits the statement.
var execReport = { status: "Succeeded",
                   statements : []
                 };

var stmtSep = ";";
// An empty separator would make split() break the script into single characters.
if (STATEMENT_SEPARATOR != null && STATEMENT_SEPARATOR.length > 0) {
    stmtSep = STATEMENT_SEPARATOR;
}

var stmts = SQL_SCRIPT.split(stmtSep);
for (var i = 0; i < stmts.length; i++) {
    var stmtText = stmts[i].trim();
    if (stmtText.length === 0) {
        continue;
    }

    // Reset per iteration: `var` is function-scoped, so without this the catch
    // block could see the previous iteration's statement (or undefined).
    var stmt = null;
    try {
        stmt = snowflake.createStatement( { sqlText : stmtText } );
        stmt.execute();
        execReport["statements"].push({ statement_no: i + 1,
                                        status: "Succeeded",
                                        rowCount: stmt.getRowCount(),
                                        numRowsAffected: stmt.getNumRowsAffected(),
                                        numRowsDeleted: stmt.getNumRowsDeleted(),
                                        numRowsInserted: stmt.getNumRowsInserted(),
                                        numRowsUpdated: stmt.getNumRowsUpdated(),
                                        queryId : stmt.getQueryId(),
                                        sqlText: stmt.getSqlText(),
                                        error: null
                                      });
    } catch (err) {
        var error = { code : err.code,
                      state : err.state,
                      message : err.message,
                      stackTraceTxt : err.stackTraceTxt
                    };
        execReport["statements"].push({ statement_no: i + 1,
                                        status: "Failed",
                                        rowCount: null,
                                        numRowsAffected: null,
                                        numRowsDeleted: null,
                                        numRowsInserted: null,
                                        numRowsUpdated: null,
                                        // stmt is null when createStatement itself failed
                                        queryId : (stmt !== null) ? stmt.getQueryId() : null,
                                        sqlText: stmtText,
                                        error: error
                                      });
        execReport["status"] = "Failed";
        return execReport;
    }
}
return execReport;
$$;
我想在存储过程中对 MERGE 和 INSERT 语句做审计记录。我尝试按会话从 QUERY_HISTORY 中获取查询 ID,再配合 RESULT_SCAN 读取结果,但这些语句在 JavaScript 存储过程中不被允许。于是我创建了一个函数,用来获取最近执行的 SQL 语句的查询 ID。
-- Returns the query ID of the most recently finished INSERT or MERGE
-- statement recorded in the current session's query history.
-- ORDER BY and LIMIT are kept in the same query level: an ORDER BY that sits
-- alone inside a subquery does not guarantee which row an outer LIMIT keeps.
create or replace function GET_QUERY_ID()
RETURNS VARCHAR
AS 'SELECT QUID FROM (
      SELECT (QUERY_ID)::VARCHAR AS QUID
      FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY_BY_SESSION(CURRENT_SESSION()::NUMBER))
      WHERE QUERY_TYPE IN (''INSERT'',''MERGE'')
      ORDER BY END_TIME DESC
      LIMIT 1
    )';
并尝试创建另一个函数来获取行数:
-- Returns the inserted and updated row counts reported by the statement whose
-- query ID is P_QUERY_ID, read back from that statement's result set.
-- INSERT_ROWS maps to "number of rows inserted" and UPDATED_ROWS to
-- "number of rows updated" (the columns a MERGE result exposes).
-- NOTE(review): a plain INSERT result presumably has no
-- "number of rows updated" column - confirm before using it for INSERTs.
create or replace function GET_RESULT_SCAN(P_QUERY_ID VARCHAR)
RETURNS TABLE ( INSERT_ROWS NUMBER ,UPDATED_ROWS NUMBER)
AS 'select "number of rows inserted"::NUMBER as INSERT_ROWS,
           "number of rows updated"::NUMBER as UPDATED_ROWS
    from table(result_scan(P_QUERY_ID))';
但这不起作用,我无法在使用 JavaScript 创建的存储过程中调用这些函数。请告诉我在存储过程中审计(记录)插入和更新行数的最佳实践。假设存储过程中有 5–10 条 SQL 语句。
尝试在存储过程定义的开头部分添加 execute as caller 这一行:
CREATE [ OR REPLACE ] PROCEDURE <name> ( [ <arg_name> <arg_data_type> ] [ , ... ] )
RETURNS <result_data_type> [ NOT NULL ]
LANGUAGE JAVASCRIPT
[ { CALLED ON NULL INPUT | { RETURNS NULL ON NULL INPUT | STRICT } } ]
[ VOLATILE | IMMUTABLE ]
[ COMMENT = '<string_literal>' ]
[ EXECUTE AS { CALLER | OWNER } ]
AS '<procedure_definition>'
这是一个简单的存储过程,它执行 MERGE,并捕获、返回插入的行数和更新的行数:
CREATE OR REPLACE PROCEDURE utl.arch_merge_sp(P_STAGE_TBL VARCHAR, P_FINAL_TBL VARCHAR)
RETURNS STRING
LANGUAGE JAVASCRIPT
EXECUTE AS CALLER
AS $$
// Merges the staging table into the final table on KEY_ID and reports how
// many rows the MERGE inserted and updated.
//
// Arguments:
//   P_STAGE_TBL - name of the staging (source) table; NULL falls back to stage_t
//   P_FINAL_TBL - name of the final (target) table; NULL falls back to final_t
// Returns:
//   "Rows inserted: <n>, Rows updated: <m>" on success; otherwise a
//   "Failed: ..." string carrying the error code, state, message and stack trace.
var sqlCmd = "";
var sqlStmt = null;
var rs = null;      // declared locally so it is not created as an implicit global
var result = "";

// Keep the previously hard-coded table names as defaults for NULL arguments.
var finalTbl = (P_FINAL_TBL == null) ? "final_t" : P_FINAL_TBL;
var stageTbl = (P_STAGE_TBL == null) ? "stage_t" : P_STAGE_TBL;

try {
    // Table names are supplied as bind values through IDENTIFIER() rather than
    // concatenated into the SQL text.
    sqlCmd = `
        MERGE INTO IDENTIFIER(:1) F USING IDENTIFIER(:2) S
            ON F.KEY_ID = S.KEY_ID
        WHEN MATCHED THEN UPDATE SET
             F.ATTR_NM = S.ATTR_NM
            ,F.ATTR_NBR = S.ATTR_NBR
        WHEN NOT MATCHED THEN INSERT (
             F.KEY_ID
            ,F.ATTR_NM
            ,F.ATTR_NBR
        ) VALUES (
             S.KEY_ID
            ,S.ATTR_NM
            ,S.ATTR_NBR);
    `;
    sqlStmt = snowflake.createStatement( {sqlText: sqlCmd, binds: [finalTbl, stageTbl]} );
    sqlStmt.execute();

    // Read the MERGE's own result set to obtain the row counts. Must run
    // immediately after the MERGE so LAST_QUERY_ID() still refers to it.
    sqlCmd =
        `SELECT "number of rows inserted", "number of rows updated"
         FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))`;
    sqlStmt = snowflake.createStatement( {sqlText: sqlCmd} );
    rs = sqlStmt.execute();
    rs.next();
    result += "Rows inserted: " + rs.getColumnValue(1) + ", Rows updated: " + rs.getColumnValue(2);
}
catch (err) {
    // Errors are returned to the caller as text instead of being re-thrown.
    result = "Failed: Code: " + err.code + " | State: " + err.state;
    result += "\n Message: " + err.message;
    result += "\nStack Trace:\n" + err.stackTraceTxt;
}
return result;
$$;
为什么不使用 Snowflake 的 Statement 对象方法?
您还可以创建一个包装存储过程,这样就不必每次都重复编写样板代码。
CREATE OR REPLACE PROCEDURE EXEC_SQL_SCRIPT (SQL_SCRIPT VARCHAR, STATEMENT_SEPARATOR VARCHAR)
RETURNS VARIANT
LANGUAGE JAVASCRIPT
EXECUTE AS CALLER
AS
$$
// Splits SQL_SCRIPT on a separator, executes the non-empty statements in
// order, and returns a report with per-statement row counts and query IDs.
//
// Arguments:
//   SQL_SCRIPT          - text containing one or more SQL statements
//   STATEMENT_SEPARATOR - separator string; NULL or empty falls back to ";"
// Returns:
//   { status, statements: [...] } where each entry carries statement_no,
//   status, the row-count values read from the Statement object, queryId,
//   sqlText and error. Execution stops at the first failing statement and the
//   report's status becomes "Failed".
//
// NOTE: the script is split with a plain String.split, so a separator that
// occurs inside a string literal or comment also splits the statement.
var execReport = { status: "Succeeded",
                   statements : []
                 };

var stmtSep = ";";
// An empty separator would make split() break the script into single characters.
if (STATEMENT_SEPARATOR != null && STATEMENT_SEPARATOR.length > 0) {
    stmtSep = STATEMENT_SEPARATOR;
}

var stmts = SQL_SCRIPT.split(stmtSep);
for (var i = 0; i < stmts.length; i++) {
    var stmtText = stmts[i].trim();
    if (stmtText.length === 0) {
        continue;
    }

    // Reset per iteration: `var` is function-scoped, so without this the catch
    // block could see the previous iteration's statement (or undefined).
    var stmt = null;
    try {
        stmt = snowflake.createStatement( { sqlText : stmtText } );
        stmt.execute();
        execReport["statements"].push({ statement_no: i + 1,
                                        status: "Succeeded",
                                        rowCount: stmt.getRowCount(),
                                        numRowsAffected: stmt.getNumRowsAffected(),
                                        numRowsDeleted: stmt.getNumRowsDeleted(),
                                        numRowsInserted: stmt.getNumRowsInserted(),
                                        numRowsUpdated: stmt.getNumRowsUpdated(),
                                        queryId : stmt.getQueryId(),
                                        sqlText: stmt.getSqlText(),
                                        error: null
                                      });
    } catch (err) {
        var error = { code : err.code,
                      state : err.state,
                      message : err.message,
                      stackTraceTxt : err.stackTraceTxt
                    };
        execReport["statements"].push({ statement_no: i + 1,
                                        status: "Failed",
                                        rowCount: null,
                                        numRowsAffected: null,
                                        numRowsDeleted: null,
                                        numRowsInserted: null,
                                        numRowsUpdated: null,
                                        // stmt is null when createStatement itself failed
                                        queryId : (stmt !== null) ? stmt.getQueryId() : null,
                                        sqlText: stmtText,
                                        error: error
                                      });
        execReport["status"] = "Failed";
        return execReport;
    }
}
return execReport;
$$;