使用 concurrent.futures 插入 postgres
Inserting into postgres using concurrent.futures
我正在尝试使用多线程将 raw_html 插入到 postgres 中,但我无法获取要插入的数据并继续获取
LINE 1: INSERT INTO raw_html VALUES b'<!DOCTYPE html><html lang="en"...
^
Error: syntax error at or near "b'<!DOCTYPE html
我无法弄清楚 b
在这些字符串中的每一个前面做了什么(我也无法在网上找到任何解释 b 是什么的东西)并且我假设这就是插入失败的原因.我还试图让 raw_html 字符串转换为一个元组,因为我认为它需要是一个元组才能让 sql 接受它。我不知道接下来要尝试什么来解决这两个问题。任何方向赞赏:
更新:
def execute_values(LST):
DB_HOST = "localhost"
DB_NAME = "yellow_pages_scraper"
DB_USER = "justinbenfit"
DB_PASS = "postgres"
DB_URL = "mydburl"
conn = psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASS, host=DB_HOST)
cur = conn.cursor()
try:
LST = tuple(LST)
for L in LST:
cur.execute("INSERT INTO raw_html VALUES (?)", (L.decode('utf-8'),))
conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
print("Error: %s" % error)
conn.rollback()
return 1
print(f"the dataframe is inserted")
def main(LINKS):
t0 = time.time()
download_raw(LINKS)
execute_values(LST)
t1 = time.time()
print(f"{t1-t0} seconds to download {len(LST)} raw_html.")
有几个问题。首先,您发送 bytes
字符串而不是 Unicode 字符串。其次,你没有做任何引用。第三,您的 INSERT 语法错误;这些值需要放在括号中。第四,你应该允许数据库为你做报价,以防止SQL arracks。
这应该有效:
cur.execute("INSERT INTO raw_html VALUES (?)", (L.decode('utf-8'),)
但是,你的整个概念是有缺陷的。您让每个线程打开一个新的数据库连接。那太糟了。数据库插入不是你的瓶颈。您应该能够连续执行此操作。只需延迟 commit
直到整套完成。
我正在尝试使用多线程将 raw_html 插入到 postgres 中,但我无法获取要插入的数据并继续获取
LINE 1: INSERT INTO raw_html VALUES b'<!DOCTYPE html><html lang="en"...
^
Error: syntax error at or near "b'<!DOCTYPE html
我无法弄清楚 b
在这些字符串中的每一个前面做了什么(我也无法在网上找到任何解释 b 是什么的东西)并且我假设这就是插入失败的原因.我还试图让 raw_html 字符串转换为一个元组,因为我认为它需要是一个元组才能让 sql 接受它。我不知道接下来要尝试什么来解决这两个问题。任何方向赞赏:
更新:
def execute_values(LST):
DB_HOST = "localhost"
DB_NAME = "yellow_pages_scraper"
DB_USER = "justinbenfit"
DB_PASS = "postgres"
DB_URL = "mydburl"
conn = psycopg2.connect(dbname=DB_NAME, user=DB_USER, password=DB_PASS, host=DB_HOST)
cur = conn.cursor()
try:
LST = tuple(LST)
for L in LST:
cur.execute("INSERT INTO raw_html VALUES (?)", (L.decode('utf-8'),))
conn.commit()
except (Exception, psycopg2.DatabaseError) as error:
print("Error: %s" % error)
conn.rollback()
return 1
print(f"the dataframe is inserted")
def main(LINKS):
t0 = time.time()
download_raw(LINKS)
execute_values(LST)
t1 = time.time()
print(f"{t1-t0} seconds to download {len(LST)} raw_html.")
有几个问题。首先,您发送 bytes
字符串而不是 Unicode 字符串。其次,你没有做任何引用。第三,您的 INSERT 语法错误;这些值需要放在括号中。第四,你应该允许数据库为你做报价,以防止SQL arracks。
这应该有效:
cur.execute("INSERT INTO raw_html VALUES (?)", (L.decode('utf-8'),)
但是,你的整个概念是有缺陷的。您让每个线程打开一个新的数据库连接。那太糟了。数据库插入不是你的瓶颈。您应该能够连续执行此操作。只需延迟 commit
直到整套完成。