为什么我的雪花流数据没有被刷新

Why my snowflake streams data is not getting flushed

我正在尝试使用 aws lambda(雪花连接器库)读取雪花流数据并将数据写入 RDS SQL 服务器。在 lambda 运行 之后,我的流数据没有被删除。

我不想从流中读取数据并将其插入到临时雪花 table 中,然后再次读取以将数据插入到 SQL 服务器中。有没有更好的方法来做到这一点?

Lambda 代码:

for table in table_list:
        sql5 = f"""SELECT "header__stream_position","header__timestamp" FROM STREAM_{table} where "header__operation" in ('UPDATE' ,'INSERT' ,'DELETE') ;"""
        result =cs.execute(sql5).fetchall()

        rds_columns = [(c[0],c[1],table[:-4]) for c in result]

        
        if rds_columns:
            
            cursor.fast_executemany = True
    
            sql6 = f"INSERT INTO {RDS_TABLE}(LSNNUMBER,TRANSACTIONTIME,TABLENAME) VALUES (?, ?, ?);"
        
            data = (rds_columns)
    
            cursor.executemany(sql6,data)
            
            table_write.append(table)

        
            conn.commit()
        
        ctx.commit()
        

Snowflake Streams 需要成功提交 DML 操作才能推进 Stream,因此您无法避免使用 Streams 的中间 Snowflake table(瞬态或其他)。

如果您可以在应用程序代码中管理 time/query 偏移量,则可以使用 Changes 获取相同的更改信息。

Stream 上的偏移量只有在被 DML 语句使用时才会增加。 (插入、更新、合并)。有一个名为 CHANGES 的 read-only 版本的流。但是,您必须自己跟踪偏移量。

https://docs.snowflake.com/en/sql-reference/constructs/changes.html