Redshift COPY operation doesn't work in SQLAlchemy

I'm trying to do a Redshift COPY in SQLAlchemy.

The following SQL correctly copies objects from my S3 bucket into my Redshift table when I execute it in psql:

COPY posts FROM 's3://mybucket/the/key/prefix' 
WITH CREDENTIALS 'aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' 
JSON AS 'auto';

I have several files named
s3://mybucket/the/key/prefix.001.json
s3://mybucket/the/key/prefix.002.json   
etc.

I can verify that the new rows were added to the table with select count(*) from posts.

However, when I execute the exact same SQL expression in SQLAlchemy, execution completes without error, but no rows are added to my table.

session = get_redshift_session()
session.bind.execute("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey'    JSON AS 'auto';")
session.commit()

It doesn't matter whether I do the above or

from sqlalchemy.sql import text 
session = get_redshift_session()
session.execute(text("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey'    JSON AS 'auto';"))
session.commit()

I have successfully copied delimited files into Redshift using the core expression language and Connection.execute() (as opposed to the ORM and sessions) with the code below. Perhaps you could adapt it for JSON.

from sqlalchemy.sql import text

def copy_s3_to_redshift(conn, s3path, table, aws_access_key, aws_secret_key, delim='\t', uncompress='auto', ignoreheader=None):
    """Copy a TSV file from S3 into redshift.

    Note the CSV option is not used, so quotes and escapes are ignored.  Empty fields are loaded as null.
    Does not commit a transaction.

    :param Connection conn: SQLAlchemy Connection
    :param str uncompress: None, 'gzip', 'lzop', or 'auto' to autodetect from `s3path` extension.
    :param int ignoreheader: Ignore this many initial rows.
    :return: Whatever a copy command returns.
    """
    if uncompress == 'auto':
        uncompress = 'gzip' if s3path.endswith('.gz') else 'lzop' if s3path.endswith('.lzo') else None

    # The COPY command doesn't like the table name or the keys single-quoted,
    # so they are interpolated into the statement rather than passed as bound parameters.
    copy = text("""
        copy "{table}"
        from :s3path
        credentials 'aws_access_key_id={aws_access_key};aws_secret_access_key={aws_secret_key}'
        delimiter :delim
        emptyasnull
        ignoreheader :ignoreheader
        compupdate on
        comprows 1000000
        {uncompress};
        """.format(uncompress=uncompress or '', table=table, aws_access_key=aws_access_key, aws_secret_key=aws_secret_key))
    return conn.execute(copy, s3path=s3path, delim=delim, ignoreheader=ignoreheader or 0)
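
For completeness, here is a minimal usage sketch under the 1.x-era SQLAlchemy API used above. The engine URL, file path, and keys are placeholders, not values from this thread, and it assumes a Redshift-capable dialect is installed; engine.begin() supplies the commit that the function deliberately leaves out.

from sqlalchemy import create_engine

engine = create_engine('redshift+psycopg2://user:pass@host:5439/db')  # placeholder URL
with engine.begin() as conn:  # commits on successful exit, since the function itself does not
    copy_s3_to_redshift(conn, 's3://mybucket/some/file.tsv.gz', 'posts',
                        'myaccesskey', 'mysecretaccesskey', ignoreheader=1)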

I ran into basically the same problem, though in my case it was more:

engine = create_engine('...')
engine.execute(text("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey'    JSON AS 'auto';"))

By stepping through with pdb, the problem was obviously the lack of a .commit() being invoked. I don't know why session.commit() is not working in your case (maybe the session "lost track" of the sent commands?), so it might not actually fix your problem.

Anyway, as explained in the sqlalchemy docs:

Given this requirement, SQLAlchemy implements its own “autocommit” feature which works completely consistently across all backends. This is achieved by detecting statements which represent data-changing operations, i.e. INSERT, UPDATE, DELETE [...] If the statement is a text-only statement and the flag is not set, a regular expression is used to detect INSERT, UPDATE, DELETE, as well as a variety of other commands for a particular backend.

So, there are 2 solutions, either:

  • text("COPY posts FROM 's3://mybucket/the/key/prefix' WITH CREDENTIALS 'aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' JSON AS 'auto';").execution_options(autocommit=True) (see the full sketch after this list),
  • or, get a fixed version of the redshift dialect... I just opened a PR about it
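
Put together, the first option looks something like this. This is a sketch using the question's placeholder credentials and the 1.x engine.execute() API (the autocommit execution option was removed in SQLAlchemy 2.0).

from sqlalchemy import create_engine
from sqlalchemy.sql import text

engine = create_engine('redshift+psycopg2://user:pass@host:5439/db')  # placeholder URL
copy = text(
    "COPY posts FROM 's3://mybucket/the/key/prefix' "
    "WITH CREDENTIALS 'aws_access_key_id=myaccesskey;aws_secret_access_key=mysecretaccesskey' "
    "JSON AS 'auto';"
).execution_options(autocommit=True)  # tells SQLAlchemy to COMMIT after this statement
engine.execute(copy)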

Adding a commit to the end of the copy worked for me:

<your copy sql>;commit;