为什么我的雪花流数据没有被刷新
Why my snowflake streams data is not getting flushed
我正在尝试使用 aws lambda(雪花连接器库)读取雪花流数据并将数据写入 RDS SQL 服务器。在 lambda 运行 之后,我的流数据没有被删除。
我不想从流中读取数据并将其插入到临时雪花 table 中,然后再次读取以将数据插入到 SQL 服务器中。有没有更好的方法来做到这一点?
Lambda 代码:
for table in table_list:
sql5 = f"""SELECT "header__stream_position","header__timestamp" FROM STREAM_{table} where "header__operation" in ('UPDATE' ,'INSERT' ,'DELETE') ;"""
result =cs.execute(sql5).fetchall()
rds_columns = [(c[0],c[1],table[:-4]) for c in result]
if rds_columns:
cursor.fast_executemany = True
sql6 = f"INSERT INTO {RDS_TABLE}(LSNNUMBER,TRANSACTIONTIME,TABLENAME) VALUES (?, ?, ?);"
data = (rds_columns)
cursor.executemany(sql6,data)
table_write.append(table)
conn.commit()
ctx.commit()
Snowflake Streams 需要成功提交 DML 操作才能推进 Stream,因此您无法避免使用 Streams 的中间 Snowflake table(瞬态或其他)。
如果您可以在应用程序代码中管理 time/query 偏移量,则可以使用 Changes 获取相同的更改信息。
Stream 上的偏移量只有在被 DML 语句使用时才会增加。 (插入、更新、合并)。有一个名为 CHANGES 的 read-only 版本的流。但是,您必须自己跟踪偏移量。
https://docs.snowflake.com/en/sql-reference/constructs/changes.html
我正在尝试使用 aws lambda(雪花连接器库)读取雪花流数据并将数据写入 RDS SQL 服务器。在 lambda 运行 之后,我的流数据没有被删除。
我不想从流中读取数据并将其插入到临时雪花 table 中,然后再次读取以将数据插入到 SQL 服务器中。有没有更好的方法来做到这一点?
Lambda 代码:
for table in table_list:
sql5 = f"""SELECT "header__stream_position","header__timestamp" FROM STREAM_{table} where "header__operation" in ('UPDATE' ,'INSERT' ,'DELETE') ;"""
result =cs.execute(sql5).fetchall()
rds_columns = [(c[0],c[1],table[:-4]) for c in result]
if rds_columns:
cursor.fast_executemany = True
sql6 = f"INSERT INTO {RDS_TABLE}(LSNNUMBER,TRANSACTIONTIME,TABLENAME) VALUES (?, ?, ?);"
data = (rds_columns)
cursor.executemany(sql6,data)
table_write.append(table)
conn.commit()
ctx.commit()
Snowflake Streams 需要成功提交 DML 操作才能推进 Stream,因此您无法避免使用 Streams 的中间 Snowflake table(瞬态或其他)。
如果您可以在应用程序代码中管理 time/query 偏移量,则可以使用 Changes 获取相同的更改信息。
Stream 上的偏移量只有在被 DML 语句使用时才会增加。 (插入、更新、合并)。有一个名为 CHANGES 的 read-only 版本的流。但是,您必须自己跟踪偏移量。
https://docs.snowflake.com/en/sql-reference/constructs/changes.html