将 SQL 查询的结果写入 CSV 并避免额外的换行符

Writing results from SQL query to CSV and avoiding extra line-breaks

我必须从几个不同的数据库引擎中提取数据。导出此数据后,我将数据发送到 AWS S3 并使用 COPY 命令将该数据复制到 Redshift。一些表格包含大量文本,列字段中存在换行符和其他字符。当我运行下面的代码:

cursor.execute('''SELECT * FROM some_schema.some_message_log''')
rows = cursor.fetchall()
with open('data.csv', 'w', newline='') as fp:
    a = csv.writer(fp, delimiter='|', quoting=csv.QUOTE_ALL, quotechar='"', doublequote=True, lineterminator='\n')
    a.writerows(rows)

一些带有回车符 returns/linebreaks 的列将创建新行:

"2017-01-05 17:06:32.802700"|"SampleJob"|""|"Date"|"error"|"Job.py"|"syntax error at or near ""from"" LINE 34: select *, SYSDATE, from staging_tops.tkabsences;
                                      ^
-<class 'psycopg2.ProgrammingError'>"

导致导入过程失败。我可以通过对异常进行硬编码来解决这个问题:

cursor.execute('''SELECT * FROM some_schema.some_message_log''')
rows = cursor.fetchall()
with open('data.csv', 'w', newline='') as fp:
    a = csv.writer(fp, delimiter='|', quoting=csv.QUOTE_ALL, quotechar='"', doublequote=True, lineterminator='\n')

for row in rows:
    list_of_rows = []
    for c in row:
        if isinstance(c, str):
            c = c.replace("\n", "\n")
            c = c.replace("|", "\|")
            c = c.replace("\", "\\")
            list_of_rows.append(c)
        else:
            list_of_rows.append(c)
    a.writerow([x.encode('utf-8') if isinstance(x, str) else x for x in list_of_rows])

但这需要很长时间来处理较大的文件,并且通常看起来是不好的做法。有没有更快的方法将数据从 SQL 游标导出到 CSV,当遇到包含回车 returns/line 分隔符的文本列时不会中断?

如果您在没有 WHERE 子句的情况下执行 SELECT * FROM table,您可以使用 COPY table TO STDOUT 代替,并使用正确的选项:

copy_command = """COPY some_schema.some_message_log TO STDOUT
        CSV QUOTE '"' DELIMITER '|' FORCE QUOTE *"""

with open('data.csv', 'w', newline='') as fp:
    cursor.copy_expert(copy_command)

在我的测试中,这会导致文字“\n”而不是实际的换行符,其中通过 csv 编写器写入会给出断行。

如果您在生产中确实需要 WHERE 子句,您可以创建一个临时的 table 并复制它:

cursor.execute("""CREATE TEMPORARY TABLE copy_me AS
        SELECT this, that, the_other FROM table_name WHERE conditions""")

(edit) 再次查看您的问题,我看到您提到了 "ever all different database engines"。以上适用于 psyopg2 和 postgresql,但可能适用于其他数据库或库。

为什么在每一行之后写入数据库?

cursor.execute('''SELECT * FROM some_schema.some_message_log''')
rows = cursor.fetchall()
with open('data.csv', 'w', newline='') as fp:
    a = csv.writer(fp, delimiter='|', quoting=csv.QUOTE_ALL, quotechar='"', doublequote=True, lineterminator='\n')

list_of_rows = []
for row in rows:
    for c in row:
        if isinstance(c, basestring):
            c = c.replace("\n", "\n")
            c = c.replace("|", "\|")
            c = c.replace("\", "\\")
    list_of_rows.append(row)
a.writerows([x.encode('utf-8') if isinstance(x, str) else x for x in list_of_rows])

我怀疑问题很简单,只需确保 Python CSV 导出库和 Redshift 的 COPY 导入使用通用接口即可。简而言之,检查您的分隔符和引号字符并确保 Python 输出和 Redshift COPY 命令一致。

更详细一点:数据库驱动程序已经完成了以 well-understood 形式到达 Python 的艰苦工作。也就是说,数据库中的每一行都是一个列表(或元组、生成器等),并且每个单元格都可以单独访问。当你有一个 list-like 结构时,Python 的 CSV 导出器可以完成剩下的工作,而且——至关重要的是——Redshift 将能够从输出、嵌入的换行符和所有内容中复制. 特别是,您不需要进行任何手动转义; .writerow().writerows() 函数应该是您需要做的全部。

Redshift 的 COPY 实现默认理解最常见的 CSV 方言,即

  • 用逗号分隔单元格 (,),
  • 用双引号引用单元格 ("),
  • 并通过加倍转义任何嵌入的双引号 (""")。

使用 Redshift FORMAT AS CSV 中的文档来支持这一点:

... The default quote character is a double quotation mark ( " ). When the quote character is used within a field, escape the character with an additional quote character. ...

但是,您的 Python CSV 导出代码使用竖线 (|) 作为 delimiter 并将 quotechar 设置为双引号 (" ).这也可以,但为什么要偏离 the defaults?建议使用 CSV 的同名名称并使您的代码在此过程中更简单:

cursor.execute('''SELECT * FROM some_schema.some_message_log''')
rows = cursor.fetchall()
with open('data.csv', 'w') as fp:
    csvw = csv.writer( fp )
    csvw.writerows(rows)

从那里,告诉 COPY 使用 CSV 格式(同样不需要 non-default 规范):

COPY  your_table  FROM  your_csv_file  auth_code  FORMAT AS CSV;

应该可以了。

问题是您使用的是带有默认参数的 Redshift COPY 命令,它使用竖线作为分隔符(参见 here and here) and require escaping of newlines and pipes within text fields (see here and here)。但是,Python csv 编写器只知道如何使用嵌入式换行符执行标准操作,即将它们 as-is 留在引号字符串中。

幸运的是,Redshift COPY命令也可以使用标准的CSV格式。将 CSV 选项添加到 COPY 命令 gives you this behavior:

Enables use of CSV format in the input data. To automatically escape delimiters, newline characters, and carriage returns, enclose the field in the character specified by the QUOTE parameter. The default quote character is a double quotation mark ( " ). When the quote character is used within a field, escape the character with an additional quote character."

这正是 Python CSV 编写器使用的方法,因此它应该可以解决您的问题。所以我的建议是使用如下代码创建一个标准的 csv 文件:

cursor.execute('''SELECT * FROM some_schema.some_message_log''')
rows = cursor.fetchall()
with open('data.csv', 'w', newline='') as fp:
    a = csv.writer(fp)  # no need for special settings
    a.writerows(rows)

然后在 Redshift 中,将 COPY 命令更改为 like this(注意添加的 CSV 标记):

COPY logdata
FROM 's3://mybucket/data/data.csv' 
iam_role 'arn:aws:iam::0123456789012:role/MyRedshiftRole' 
CSV;

或者,您可以继续手动转换您的字段以匹配 Redshift 的 COPY 命令的默认设置。 Python 的 csv.writer 不会自己为您执行此操作,但您可以稍微加快代码速度,尤其是对于大文件,如下所示:

cursor.execute('''SELECT * FROM some_schema.some_message_log''')
rows = cursor.fetchall()
with open('data.csv', 'w', newline='') as fp:
    a = csv.writer(
        fp, 
        delimiter='|', quoting=csv.QUOTE_ALL, 
        quotechar='"', doublequote=True, lineterminator='\n'
    )
    a.writerows(
        c.replace("\", "\\").replace("\n", "\\n").replace("|", "\|").encode('utf-8')
        if isinstance(c, str)
        else c
        for row in rows
        for c in row
    )

作为另一种选择,您可以尝试使用 .from_sql 将查询数据导入到 pandas DataFrame 中,在 DataFrame 中进行替换(一次一整列),然后编写table 出局 .to_csv。 Pandas 具有非常快的 csv 代码,因此这可能会给您带来显着的加速。

更新: 我刚刚注意到最后我基本上重复了@hunteke 的回答。关键点(我第一次错过)是您可能没有在当前的 Redshift COPY 命令中使用 CSV 参数;如果你添加它,这应该很容易。