使用 psycopg2 的服务器往返次数过多
Too many server roundtrips w/ psycopg2
我正在制作一个脚本,它应该为每个客户创建一个模式。我正在从定义每个客户的模式应该是什么样子的数据库中获取所有元数据,然后创建它。一切都已明确定义,table 的类型、名称等。一个客户有许多 table(外汇、地址、客户、联系人、项目等),每个 table具有相同的元数据。
我现在的程序:
- 从元数据数据库中获取我需要的一切。
- 在 for 循环中,创建一个 table,然后更改 Table 并添加每个元数据(这是为每个 table 完成的)。
现在我的脚本为每个客户运行大约一分钟,我认为这太慢了。这与我有一个循环有关,在那个循环中,我正在改变每个 table。
我认为我不应该改变(这可能不是那么聪明的方法),我应该做如下事情:
请注意,这只是一个愚蠢但有效的示例:
for table in tables:
con.execute("CREATE TABLE IF NOT EXISTS tester.%s (%s, %s);", (table, "last_seen date", "valid_from timestamp"))
但它给了我这个错误(它似乎将 table 名称读取为字符串中的字符串..):
psycopg2.errors.SyntaxError:“'billing'”处或附近的语法错误
第 1 行:创建 TABLE 如果不存在测试器。'billing' ('last_seen da...
However, this becomes heavy as there are too many server roundtrips.
您创建了多少个表和多少列,这很慢吗?你可以通过 ssh 连接到离你的服务器更近的机器并且 运行 那里的 python 吗?
我没有收到那个错误。相反,我收到 SQL 语法错误。值列表用于传送数据。但是 ALTER TABLE 与数据无关,它与元数据有关。您不能在那里使用值列表。您需要用双引号(或不带引号)而不是单引号括起列和类型的名称。名称和类型之间不能有逗号。而且你不能在每一对周围加上括号。而且每一对都需要引入"ADD",不能只拥有一次。您正在使用错误的工具来完成这项工作。 execute_batch 几乎是正确的工具,除了它将在标识符周围使用单引号而不是双引号。也许您可以向它添加一个标志,告诉它使用 quote_ident.
不仅 execute_values 不适合这项工作,而且我认为 python 通常也是如此。为什么不直接从 .sql
文件加载?
考虑使用 serial
类型(即自动编号)ID 字段创建 table,然后通过使用 组合对所有其他字段使用更改 table 的 sql.Identifier
用于标识符(架构名称、table 名称、列名称、函数名称等)和数据类型的常规格式,这些数据类型在 SQL 语句中不是文字.
from psycopg2 import sql
# CREATE TABLE
query = """CREATE TABLE IF NOT EXISTS {shm}.{tbl} (ID serial)"""
cur.execute(sql.SQL(query).format(shm = sql.Identifier("tester"),
tbl = sql.Identifier("table")))
# ALTER TABLE
items = [("last_seen", "date"), ("valid_from", "timestamp")]
query = """ALTER TABLE {shm}.{tbl} ADD COLUMN {col} {typ}"""
for item in items:
# KEEP IDENTIFIER PLACEHOLDERS
final_query = query.format(shm="{shm}", tbl="{tbl}", col="{col}", typ=i[1])
cur.execute(sql.SQL(final_query).format(shm = sql.Identifier("tester"),
tbl = sql.Identifier("table"),
col = sql.Identifier(item[0]))
或者,使用 str.join
和列表推导式 CREATE TABLE
:
query = """CREATE TABLE IF NOT EXISTS {shm}.{tbl} (
"id" serial,
{vals}
)"""
items = [("last_seen", "date"), ("valid_from", "timestamp")]
val = ",\n ".join(["{{}} {typ}".format(typ=i[1]) for i in items])
# KEEP IDENTIFIER PLACEHOLDERS
pre_query = query.format(shm="{shm}", tbl="{tbl}", vals=val)
final_query = sql.SQL(pre_query).format(*[sql.Identifier(i[0]) for i in items],
shm = sql.Identifier("tester"),
tbl = sql.Identifier("table"))
cur.execute(final_query)
SQL (发送到数据库)
CREATE TABLE IF NOT EXISTS "tester"."table" (
"id" serial,
"last_seen" date,
"valid_from" timestamp
)
我正在制作一个脚本,它应该为每个客户创建一个模式。我正在从定义每个客户的模式应该是什么样子的数据库中获取所有元数据,然后创建它。一切都已明确定义,table 的类型、名称等。一个客户有许多 table(外汇、地址、客户、联系人、项目等),每个 table具有相同的元数据。
我现在的程序:
- 从元数据数据库中获取我需要的一切。
- 在 for 循环中,创建一个 table,然后更改 Table 并添加每个元数据(这是为每个 table 完成的)。
现在我的脚本为每个客户运行大约一分钟,我认为这太慢了。这与我有一个循环有关,在那个循环中,我正在改变每个 table。 我认为我不应该改变(这可能不是那么聪明的方法),我应该做如下事情: 请注意,这只是一个愚蠢但有效的示例:
for table in tables:
con.execute("CREATE TABLE IF NOT EXISTS tester.%s (%s, %s);", (table, "last_seen date", "valid_from timestamp"))
但它给了我这个错误(它似乎将 table 名称读取为字符串中的字符串..):
psycopg2.errors.SyntaxError:“'billing'”处或附近的语法错误
第 1 行:创建 TABLE 如果不存在测试器。'billing' ('last_seen da...
However, this becomes heavy as there are too many server roundtrips.
您创建了多少个表和多少列,这很慢吗?你可以通过 ssh 连接到离你的服务器更近的机器并且 运行 那里的 python 吗?
我没有收到那个错误。相反,我收到 SQL 语法错误。值列表用于传送数据。但是 ALTER TABLE 与数据无关,它与元数据有关。您不能在那里使用值列表。您需要用双引号(或不带引号)而不是单引号括起列和类型的名称。名称和类型之间不能有逗号。而且你不能在每一对周围加上括号。而且每一对都需要引入"ADD",不能只拥有一次。您正在使用错误的工具来完成这项工作。 execute_batch 几乎是正确的工具,除了它将在标识符周围使用单引号而不是双引号。也许您可以向它添加一个标志,告诉它使用 quote_ident.
不仅 execute_values 不适合这项工作,而且我认为 python 通常也是如此。为什么不直接从 .sql
文件加载?
考虑使用 serial
类型(即自动编号)ID 字段创建 table,然后通过使用 组合对所有其他字段使用更改 table 的 sql.Identifier
用于标识符(架构名称、table 名称、列名称、函数名称等)和数据类型的常规格式,这些数据类型在 SQL 语句中不是文字.
from psycopg2 import sql
# CREATE TABLE
query = """CREATE TABLE IF NOT EXISTS {shm}.{tbl} (ID serial)"""
cur.execute(sql.SQL(query).format(shm = sql.Identifier("tester"),
tbl = sql.Identifier("table")))
# ALTER TABLE
items = [("last_seen", "date"), ("valid_from", "timestamp")]
query = """ALTER TABLE {shm}.{tbl} ADD COLUMN {col} {typ}"""
for item in items:
# KEEP IDENTIFIER PLACEHOLDERS
final_query = query.format(shm="{shm}", tbl="{tbl}", col="{col}", typ=i[1])
cur.execute(sql.SQL(final_query).format(shm = sql.Identifier("tester"),
tbl = sql.Identifier("table"),
col = sql.Identifier(item[0]))
或者,使用 str.join
和列表推导式 CREATE TABLE
:
query = """CREATE TABLE IF NOT EXISTS {shm}.{tbl} (
"id" serial,
{vals}
)"""
items = [("last_seen", "date"), ("valid_from", "timestamp")]
val = ",\n ".join(["{{}} {typ}".format(typ=i[1]) for i in items])
# KEEP IDENTIFIER PLACEHOLDERS
pre_query = query.format(shm="{shm}", tbl="{tbl}", vals=val)
final_query = sql.SQL(pre_query).format(*[sql.Identifier(i[0]) for i in items],
shm = sql.Identifier("tester"),
tbl = sql.Identifier("table"))
cur.execute(final_query)
SQL (发送到数据库)
CREATE TABLE IF NOT EXISTS "tester"."table" (
"id" serial,
"last_seen" date,
"valid_from" timestamp
)