结果集中缺少第一行 mysql_hook
First row missing from the result set mysql_hook
下面的代码用于从 MySQL
中获取记录
from airflow.hooks.mysql_hook import MySqlHook
sql = "SELECT name, email FROM test.tbl_users"
mysql_hook = MySqlHook(mysql_conn_id = 'mysql_con', schema = 'test')
connection = mysql_hook.get_conn()
cur_dev = connection.cursor()
cur_dev.execute(sql)
source_list = cur_dev.fetchall()
print(source_list)
预期输出
结果集中缺少突出显示的行
我是不是漏了什么。
为什么不使用钩子函数?对于大多数情况,您不需要使用游标。挂钩具有为您完成工作的功能。
您可以使用:get_records
、get_first
、get_pandas_df
、run
等...
from airflow.providers.mysql.hooks.mysql import MySqlHook
sql = "SELECT name, email FROM test.tbl_users"
mysql_hook = MySqlHook(mysql_conn_id = 'mysql_con', schema = 'test')
records = mysql_hook.get_records(sql=sql)
如果确实需要光标,请使用 get_cursor()
问题已通过使用 import pymysql
解决。下面是更新后的代码。
from airflow.models.connection import Connection
import pymysql
conn = Connection.get_connection_from_secrets(conn_id='mysql_con')
conn_dev = pymysql.connect(
user=conn.login,
password=conn.password,
host=conn.host,
port=conn.port,
database=conn.schema
)
cur_dev = conn_dev.cursor()
cur_dev.execute("SELECT name, email FROM test.tbl_users")
下面的代码用于从 MySQL
中获取记录from airflow.hooks.mysql_hook import MySqlHook
sql = "SELECT name, email FROM test.tbl_users"
mysql_hook = MySqlHook(mysql_conn_id = 'mysql_con', schema = 'test')
connection = mysql_hook.get_conn()
cur_dev = connection.cursor()
cur_dev.execute(sql)
source_list = cur_dev.fetchall()
print(source_list)
预期输出
结果集中缺少突出显示的行
我是不是漏了什么。
为什么不使用钩子函数?对于大多数情况,您不需要使用游标。挂钩具有为您完成工作的功能。
您可以使用:get_records
、get_first
、get_pandas_df
、run
等...
from airflow.providers.mysql.hooks.mysql import MySqlHook
sql = "SELECT name, email FROM test.tbl_users"
mysql_hook = MySqlHook(mysql_conn_id = 'mysql_con', schema = 'test')
records = mysql_hook.get_records(sql=sql)
如果确实需要光标,请使用 get_cursor()
问题已通过使用 import pymysql
解决。下面是更新后的代码。
from airflow.models.connection import Connection
import pymysql
conn = Connection.get_connection_from_secrets(conn_id='mysql_con')
conn_dev = pymysql.connect(
user=conn.login,
password=conn.password,
host=conn.host,
port=conn.port,
database=conn.schema
)
cur_dev = conn_dev.cursor()
cur_dev.execute("SELECT name, email FROM test.tbl_users")