How to execute multiple SQL commands at once in pd.read_sql_query?
Let me set up a use case for discussion.
CREATE DATABASE sample;
USE sample;
CREATE TABLE quote (
`id` int(2) unsigned NOT NULL AUTO_INCREMENT,
`code` text ,
`date` date DEFAULT NULL,
`close` double DEFAULT NULL,
PRIMARY KEY (`id`)
) ;
INSERT INTO quote (`code`, `date`, `close`)
VALUES ('epm', '20200824', 2.64);
INSERT INTO quote (`code`, `date`, `close`)
VALUES ('dss', '20200824', 6.4);
Executing a single SQL command with SQLAlchemy is straightforward.
import pandas as pd
from sqlalchemy import create_engine
user = 'root'
mysql_pass = 'your mysql passwd'
mysql_ip = '127.0.0.1'
engine = create_engine("mysql+pymysql://{}:{}@{}:3306".format(user,mysql_pass,mysql_ip))
cmd_one_line_sql = 'select * from sample.quote;'
df = pd.read_sql_query(cmd_one_line_sql,con = engine)
df
id code date close
0 1 epm 2020-08-24 2.64
1 2 dss 2020-08-24 6.40
I get the desired result. Now suppose the command contains multiple SQL statements; for simplicity, it contains only two:
cmd_multi_lines_sql = 'use sample;select * from quote;'
cmd_multi_lines_sql simply splits cmd_one_line_sql into two statements.
I rewrote the code snippet following the guide "execute many sql commands with sqlalchemy":
import pandas as pd
from sqlalchemy import create_engine
user = 'root'
mysql_pass = 'your mysql passwd'
mysql_ip = '127.0.0.1'
engine = create_engine("mysql+pymysql://{}:{}@{}:3306".format(user,mysql_pass,mysql_ip))
connection = engine.raw_connection()
cmd_multi_lines_sql = 'use sample;select * from quote;'
try:
    cursor = connection.cursor()
    cursor.execute(cmd_multi_lines_sql)
    results_one = cursor.fetchall()
finally:
    connection.close()
I get the following error:
Traceback (most recent call last):
File "<stdin>", line 3, in <module>
File "/usr/local/lib/python3.5/dist-packages/pymysql/cursors.py", line 170, in execute
result = self._query(query)
File "/usr/local/lib/python3.5/dist-packages/pymysql/cursors.py", line 328, in _query
conn.query(q)
File "/usr/local/lib/python3.5/dist-packages/pymysql/connections.py", line 517, in query
self._affected_rows = self._read_query_result(unbuffered=unbuffered)
File "/usr/local/lib/python3.5/dist-packages/pymysql/connections.py", line 732, in _read_query_result
result.read()
File "/usr/local/lib/python3.5/dist-packages/pymysql/connections.py", line 1075, in read
first_packet = self.connection._read_packet()
File "/usr/local/lib/python3.5/dist-packages/pymysql/connections.py", line 684, in _read_packet
packet.check_error()
File "/usr/local/lib/python3.5/dist-packages/pymysql/protocol.py", line 220, in check_error
err.raise_mysql_exception(self._data)
File "/usr/local/lib/python3.5/dist-packages/pymysql/err.py", line 109, in raise_mysql_exception
raise errorclass(errno, errval)
pymysql.err.ProgrammingError: (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MariaDB server version for the right syntax to use near 'select * from quote' at line 1")
Another attempt:
vim /tmp/test.sql
use sample;
select * from quote;
#write the commands in `/tmp/test.sql`
f = open('/tmp/test.sql','r')
cmd = f.read()
df = pd.read_sql_query(cmd, con = engine)
It outputs the same error. How can I fix it?
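The problems you are facing are:
- You need to pass the MULTI_STATEMENTS flag to PyMySQL, and
- read_sql_query assumes that the first result set contains the data for the DataFrame, which may not be the case for an anonymous code block.
You can create your own PyMySQL connection and retrieve the data like this: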
import pandas as pd
import pymysql
from pymysql.constants import CLIENT
conn_info = {
"host": "localhost",
"port": 3307,
"user": "root",
"password": "toot",
"database": "mydb",
"client_flag": CLIENT.MULTI_STATEMENTS,
}
cnxn = pymysql.connect(**conn_info)
crsr = cnxn.cursor()
sql = """\
CREATE TEMPORARY TABLE tmp (id int primary key, txt varchar(20))
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
INSERT INTO tmp (id, txt) VALUES (1, 'foo'), (2, 'ΟΠΑ!');
SELECT id, txt FROM tmp;
"""
crsr.execute(sql)
num_tries = 5
result = None
for i in range(num_tries):
    result = crsr.fetchall()
    if result:
        break
    crsr.nextset()
if not result:
    print(f"(no result found after {num_tries} attempts)")
else:
    df = pd.DataFrame(result, columns=[x[0] for x in crsr.description])
    print(df)
"""console output:
id txt
0 1 foo
1 2 ΟΠΑ!
"""
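(Edit) Additional notes:
Note 1: As mentioned in another answer, you can use the connect_args argument of SQLAlchemy's create_engine method to pass the MULTI_STATEMENTS flag. If you need a SQLAlchemy Engine object for other things (e.g., to_sql), that may be preferable to creating your own PyMySQL connection directly.
Note 2: num_tries can be arbitrarily large; it is simply a way of avoiding an endless loop. If we need to skip the first n empty result sets, we have to call nextset that many times anyway, and once we find the non-empty result set we break out of the loop.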
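As an alternative to guessing num_tries, the loop can be driven by the return value of nextset, which under the DB-API convention is None once no further result set is available. The following is a minimal sketch under that assumption, not a tested PyMySQL recipe: iter_result_sets and FakeCursor are hypothetical names introduced here, and FakeCursor only stands in for a real cursor so the example runs without a server.

```python
def iter_result_sets(cursor):
    """Yield (columns, rows) for every result set that returns rows."""
    while True:
        # description is None for statements such as CREATE or INSERT
        if cursor.description is not None:
            columns = [col[0] for col in cursor.description]
            yield columns, list(cursor.fetchall())
        # nextset() returns a false value when there is nothing left
        if not cursor.nextset():
            break


class FakeCursor:
    """Stand-in for a DB-API cursor with several result sets (demo only)."""

    def __init__(self, result_sets):
        self._sets = list(result_sets)  # each item: (description, rows)
        self._index = 0

    @property
    def description(self):
        return self._sets[self._index][0]

    def fetchall(self):
        return self._sets[self._index][1]

    def nextset(self):
        if self._index + 1 >= len(self._sets):
            return None
        self._index += 1
        return True


cursor = FakeCursor([
    (None, ()),                                        # CREATE TEMPORARY TABLE
    (None, ()),                                        # INSERT
    ((("id",), ("txt",)), ((1, "foo"), (2, "bar"))),   # SELECT id, txt
])
frames = list(iter_result_sets(cursor))
print(frames)
```

With a real cursor, each (columns, rows) pair could be handed to pd.DataFrame(rows, columns=columns). Note that a SELECT returning zero rows still yields an entry with an empty row list.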
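After some research and asking on GitHub, the answer turned out to be obvious.
You need to pass the required argument through connect_args=, and the value to pass is
{"client_flag": MULTI_STATEMENTS}
So your Python code would look like this: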
from sqlalchemy import create_engine
import pymysql
from pymysql.constants.CLIENT import MULTI_STATEMENTS
user = 'root'
mysql_pass = 'testpassword'
mysql_ip = 'localhost'
cmd = 'SELECT * FROM table1;SELECT * FROM test'
engine = create_engine("mysql+pymysql://{}:{}@{}:3306/testdb1?charset=utf8".format(user,mysql_pass,mysql_ip),connect_args={"client_flag": MULTI_STATEMENTS})
connection = engine.raw_connection()
try:
    cursor = connection.cursor()
    cursor.execute(cmd)
    results_one = cursor.fetchall()
    cursor.nextset()
    results_two = cursor.fetchall()
    cursor.close()
finally:
    connection.close()
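With this solution, however, you need to know in advance which queries you are going to run.
If you want to be more flexible and handle dynamic SQL statements: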
from sqlalchemy import create_engine
user = 'root'
mysql_pass = 'testpassword'
mysql_ip = 'localhost'
cmd = 'SELECT * FROM table1;SELECT * FROM test'
engine = create_engine("mysql+pymysql://{}:{}@{}:3306/testdb1?charset=utf8".format(user,mysql_pass,mysql_ip))
connection = engine.raw_connection()
splitstring = cmd.split(";")
ges_resultset = []
try:
    cursor = connection.cursor()
    for cmdoneonly in splitstring:
        cursor.execute(cmdoneonly)
        results = cursor.fetchall()
        ges_resultset.append(results)
    cursor.close()
finally:
    connection.close()
Here you can inspect each command and decide how Python should react to it:
- SELECT needs to fetch a result set
- INSERT, DELETE and CREATE do not (there are more, but you get the gist)
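The distinction above can be made without parsing the SQL text: under the DB-API convention, cursor.description is None after a statement that returns no rows. Here is a minimal sketch of that idea. It uses the standard-library sqlite3 module purely as a stand-in so it runs without a MySQL server, and run_statements is a hypothetical helper name; the behaviour of a particular MySQL driver should be checked separately.

```python
import sqlite3


def run_statements(connection, statements):
    """Execute statements one by one; keep (columns, rows) for those returning rows."""
    results = []
    cursor = connection.cursor()
    try:
        for statement in statements:
            if not statement.strip():
                continue  # skip empty fragments left over from splitting
            cursor.execute(statement)
            # Only statements that produce a result set have a description
            if cursor.description is not None:
                columns = [col[0] for col in cursor.description]
                results.append((columns, cursor.fetchall()))
    finally:
        cursor.close()
    return results


connection = sqlite3.connect(":memory:")
script = (
    "CREATE TABLE tmp (id INTEGER PRIMARY KEY, txt TEXT);"
    "INSERT INTO tmp (id, txt) VALUES (1, 'foo'), (2, 'bar');"
    "SELECT id, txt FROM tmp ORDER BY id;"
)
results = run_statements(connection, script.split(";"))
connection.close()
print(results)
```

Only the SELECT contributes an entry to results; the CREATE and INSERT are executed but skipped when collecting.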
@Gord Thompson, I made a small improvement to set num_tries automatically:
import pandas as pd
import pymysql
from pymysql.constants import CLIENT
conn_info = {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "your mysql passwd",
"client_flag": CLIENT.MULTI_STATEMENTS,
}
cnxn = pymysql.connect(**conn_info)
crsr = cnxn.cursor()
sql = """\
create database sample;
USE sample;
CREATE TEMPORARY TABLE tmp (id int primary key, txt varchar(20))
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
INSERT INTO tmp (id, txt) VALUES (1, 'foo'), (2, 'ΟΠΑ!');
SELECT id, txt FROM tmp;
SELECT txt FROM tmp;
"""
crsr.execute(sql)
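# Count the statements; strip first so the newline after the last ';' is ignored
stripped = sql.strip()
num_tries = stripped.count(';') if stripped.endswith(';') else stripped.count(';') + 1
for i in range(num_tries):
    result = crsr.fetchall()
    if result:
        df = pd.DataFrame(result, columns=[x[0] for x in crsr.description])
        print(df)
    crsr.nextset()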
@nbk: when cmd contains many SQL statements, running your code may hit an error like this:
pymysql.err.InternalError: (1065, 'Query was empty')
Here is a small improvement based on your code:
import pandas as pd
from sqlalchemy import create_engine
user = 'root'
mysql_pass = 'your mysql passwd'
mysql_ip = 'localhost'
sql = """\
create database sample;
USE sample;
CREATE TEMPORARY TABLE tmp (id int primary key, txt varchar(20))
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
INSERT INTO tmp (id, txt) VALUES (1, 'foo'), (2, 'ΟΠΑ!');
SELECT id, txt FROM tmp;
SELECT txt FROM tmp;
"""
engine = create_engine("mysql+pymysql://{}:{}@{}:3306".format(user,mysql_pass,mysql_ip))
connection = engine.raw_connection()
splitstring = sql.split(";")
try:
    cursor = connection.cursor()
    for cmdoneonly in splitstring:
        if cmdoneonly.strip():
            cursor.execute(cmdoneonly)
            results = cursor.fetchall()
            if results:
                df = pd.DataFrame(results, columns=[x[0] for x in cursor.description])
                print(df)
    cursor.close()
finally:
    connection.close()
The check if cmdoneonly.strip(): is needed to avoid the 1065: Query was empty error.
The neat statement
df = pd.DataFrame(results, columns=[x[0] for x in cursor.description])
was learned from @Gord Thompson.
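One caveat with sql.split(";") is that it also splits on a semicolon inside a string literal. The sketch below shows one way to split only on semicolons outside quotes. split_statements is a hypothetical helper introduced for illustration; it assumes quoting with ', " or ` and backslash escapes, and it does not handle SQL comments or DELIMITER directives.

```python
def split_statements(script):
    """Split a SQL script on semicolons that are outside quoted text."""
    statements = []
    current = []
    quote = None  # the quote character we are currently inside, if any
    i = 0
    while i < len(script):
        ch = script[i]
        if quote:
            current.append(ch)
            if ch == "\\" and quote != "`" and i + 1 < len(script):
                # Keep a backslash-escaped character verbatim
                current.append(script[i + 1])
                i += 1
            elif ch == quote:
                quote = None
        elif ch in ("'", '"', "`"):
            quote = ch
            current.append(ch)
        elif ch == ";":
            statement = "".join(current).strip()
            if statement:  # drop empty fragments (avoids "Query was empty")
                statements.append(statement)
            current = []
        else:
            current.append(ch)
        i += 1
    tail = "".join(current).strip()
    if tail:
        statements.append(tail)
    return statements


script = "USE sample;\nINSERT INTO tmp (id, txt) VALUES (1, 'a;b');\nSELECT txt FROM tmp;\n"
statements = split_statements(script)
print(statements)
```

Each returned statement could then be passed to cursor.execute in the loop above in place of the fragments from sql.split(";").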