python SQL解析器获取列名和数据类型
python SQL parser to get the column name and data type
我正在使用 python SQL 解析器来获取 table 信息。我能够获得 table 名称和架构名称。
import sqlparse
line = '''
CREATE TABLE public.actor (
actor_id integer DEFAULT nextval('public.actor_actor_id_seq'::regclass) NOT NULL,
first_name character varying(45) NOT NULL,
last_name character varying(45) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
);
CREATE TABLE public.category (
category_id integer DEFAULT nextval('public.category_category_id_seq'::regclass) NOT NULL,
name character varying(25) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
);
CREATE TABLE IF NOT EXISTS "sample_schema"."sample_table"
(
"div_cd" VARCHAR(2) NOT NULL
,"div_name" VARCHAR(30) NOT NULL
,"org_cd" VARCHAR(8) NOT NULL
,"org_name" VARCHAR(60) NOT NULL
,"team_cd" VARCHAR(2) NOT NULL
,"team_name" VARCHAR(120) NOT NULL
,"personal_cd" VARCHAR(7) NOT NULL
,"personal_name" VARCHAR(300) NOT NULL
,"username" VARCHAR(6) NOT NULL
,"staff_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
,"leader_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
CREATE TABLE IF NOT EXISTS "sample_schema"."ref_table"
(
"staff_flg" CHAR(1) DEFAULT '0'::bpchar SORTKEY ENCODE lzo
,"leader_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
'''
parse = sqlparse.parse(line)
print([str(t) for t in parse[0].tokens if t.ttype is None][0])
Output:
public.actor
但是如果我想 return 列名和数据类型,我可以使用哪个标记来打印这两个 DDL。
输出是这样的,[不完全一样:)]
table: public.actor
逐一打印列名和数据类型(可能在for循环中)
column: actor_id
date type: integer
column: first_name
data type: character varying
您可以使用 python-sqlparse 示例中可用的 script 中的函数来提取数据:
def extract_definitions(token_list):
# assumes that token_list is a parenthesis
definitions = []
tmp = []
par_level = 0
for token in token_list.flatten():
if token.is_whitespace:
continue
elif token.match(sqlparse.tokens.Punctuation, '('):
par_level += 1
continue
if token.match(sqlparse.tokens.Punctuation, ')'):
if par_level == 0:
break
else:
par_level += 1
elif token.match(sqlparse.tokens.Punctuation, ','):
if tmp:
definitions.append(tmp)
tmp = []
else:
tmp.append(token)
if tmp:
definitions.append(tmp)
return definitions
并使用 for-loop 打印有关列名称及其数据类型的信息:
parsed = sqlparse.parse(line)[0]
# extract the parenthesis which holds column definitions
_, par = parsed.token_next_by(i=sqlparse.sql.Parenthesis)
columns = extract_definitions(par)
for column in columns:
print(f"column: {column[0]}")
print(f"data type: {' '.join(str(t) for t in column[1:])}")
此代码产生以下输出:
column: actor_id
data type: integer DEFAULT nextval 'public.actor_actor_id_seq' :: regclass NOT NULL
column: first_name
data type: character varying 45 NOT NULL
column: last_name
data type: character varying 45 NOT NULL
column: last_update
data type: timestamp without time zone DEFAULT now NOT NULL
实际上比您想要的信息多一点。但是,这些字符串应该很容易 ''regexable'' 以仅提取基本数据类型。
如评论中所述,使用特殊信息(例如 DISTSTYLE
)会导致解析器无法识别 sqlparse.sql.Parenthesis
实例,并被报告为 bug。因此,在解析之前需要从 SQL 查询中删除此类信息。
内联记录的代码
import sqlparse
line = '''
CREATE TABLE public.actor (
actor_id integer DEFAULT nextval('public.actor_actor_id_seq'::regclass) NOT NULL,
first_name character varying(45) NOT NULL,
last_name character varying(45) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
);
CREATE TABLE public.category (
category_id integer DEFAULT nextval('public.category_category_id_seq'::regclass) NOT NULL,
name character varying(25) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
);
CREATE TABLE IF NOT EXISTS "sample_schema"."sample_table"
(
"div_cd" VARCHAR(2) NOT NULL
,"div_name" VARCHAR(30) NOT NULL
,"org_cd" VARCHAR(8) NOT NULL
,"org_name" VARCHAR(60) NOT NULL
,"team_cd" VARCHAR(2) NOT NULL
,"team_name" VARCHAR(120) NOT NULL
,"personal_cd" VARCHAR(7) NOT NULL
,"personal_name" VARCHAR(300) NOT NULL
,"username" VARCHAR(6) NOT NULL
,"staff_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
,"leader_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
CREATE TABLE IF NOT EXISTS "sample_schema"."ref_table"
(
"staff_flg" CHAR(1) DEFAULT '0'::bpchar SORTKEY ENCODE lzo
,"leader_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
'''
def get_table_name(tokens):
for token in reversed(tokens):
if token.ttype is None:
return token.value
return " "
parse = sqlparse.parse(line)
for stmt in parse:
# Get all the tokens except whitespaces
tokens = [t for t in sqlparse.sql.TokenList(stmt.tokens) if t.ttype != sqlparse.tokens.Whitespace]
is_create_stmt = False
for i, token in enumerate(tokens):
# Is it a create statements ?
if token.match(sqlparse.tokens.DDL, 'CREATE'):
is_create_stmt = True
continue
# If it was a create statement and the current token starts with "("
if is_create_stmt and token.value.startswith("("):
# Get the table name by looking at the tokens in reverse order till you find
# a token with None type
print (f"table: {get_table_name(tokens[:i])}")
# Now parse the columns
txt = token.value
columns = txt[1:txt.rfind(")")].replace("\n","").split(",")
for column in columns:
c = ' '.join(column.split()).split()
c_name = c[0].replace('\"',"")
c_type = c[1] # For condensed type information
# OR
#c_type = " ".join(c[1:]) # For detailed type information
print (f"column: {c_name}")
print (f"date type: {c_type}")
print ("---"*20)
break
输出:
table: public.actor
column: actor_id
date type: integer
column: first_name
date type: character
column: last_name
date type: character
column: last_update
date type: timestamp
------------------------------------------------------------
table: public.category
column: category_id
date type: integer
column: name
date type: character
column: last_update
date type: timestamp
------------------------------------------------------------
table: "sample_schema"."sample_table"
column: div_cd
date type: VARCHAR(2)
column: div_name
date type: VARCHAR(30)
column: org_cd
date type: VARCHAR(8)
column: org_name
date type: VARCHAR(60)
column: team_cd
date type: VARCHAR(2)
column: team_name
date type: VARCHAR(120)
column: personal_cd
date type: VARCHAR(7)
column: personal_name
date type: VARCHAR(300)
column: username
date type: VARCHAR(6)
column: staff_flg
date type: CHAR(1)
column: leader_flg
date type: CHAR(1)
------------------------------------------------------------
table: "sample_schema"."ref_table"
column: staff_flg
date type: CHAR(1)
column: leader_flg
date type: CHAR(1)
------------------------------------------------------------
我正在使用 python SQL 解析器来获取 table 信息。我能够获得 table 名称和架构名称。
import sqlparse
line = '''
CREATE TABLE public.actor (
actor_id integer DEFAULT nextval('public.actor_actor_id_seq'::regclass) NOT NULL,
first_name character varying(45) NOT NULL,
last_name character varying(45) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
);
CREATE TABLE public.category (
category_id integer DEFAULT nextval('public.category_category_id_seq'::regclass) NOT NULL,
name character varying(25) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
);
CREATE TABLE IF NOT EXISTS "sample_schema"."sample_table"
(
"div_cd" VARCHAR(2) NOT NULL
,"div_name" VARCHAR(30) NOT NULL
,"org_cd" VARCHAR(8) NOT NULL
,"org_name" VARCHAR(60) NOT NULL
,"team_cd" VARCHAR(2) NOT NULL
,"team_name" VARCHAR(120) NOT NULL
,"personal_cd" VARCHAR(7) NOT NULL
,"personal_name" VARCHAR(300) NOT NULL
,"username" VARCHAR(6) NOT NULL
,"staff_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
,"leader_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
CREATE TABLE IF NOT EXISTS "sample_schema"."ref_table"
(
"staff_flg" CHAR(1) DEFAULT '0'::bpchar SORTKEY ENCODE lzo
,"leader_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
'''
parse = sqlparse.parse(line)
print([str(t) for t in parse[0].tokens if t.ttype is None][0])
Output:
public.actor
但是如果我想 return 列名和数据类型,我可以使用哪个标记来打印这两个 DDL。
输出是这样的,[不完全一样:)]
table: public.actor
逐一打印列名和数据类型(可能在for循环中)
column: actor_id
date type: integer
column: first_name
data type: character varying
您可以使用 python-sqlparse 示例中可用的 script 中的函数来提取数据:
def extract_definitions(token_list):
# assumes that token_list is a parenthesis
definitions = []
tmp = []
par_level = 0
for token in token_list.flatten():
if token.is_whitespace:
continue
elif token.match(sqlparse.tokens.Punctuation, '('):
par_level += 1
continue
if token.match(sqlparse.tokens.Punctuation, ')'):
if par_level == 0:
break
else:
par_level += 1
elif token.match(sqlparse.tokens.Punctuation, ','):
if tmp:
definitions.append(tmp)
tmp = []
else:
tmp.append(token)
if tmp:
definitions.append(tmp)
return definitions
并使用 for-loop 打印有关列名称及其数据类型的信息:
parsed = sqlparse.parse(line)[0]
# extract the parenthesis which holds column definitions
_, par = parsed.token_next_by(i=sqlparse.sql.Parenthesis)
columns = extract_definitions(par)
for column in columns:
print(f"column: {column[0]}")
print(f"data type: {' '.join(str(t) for t in column[1:])}")
此代码产生以下输出:
column: actor_id
data type: integer DEFAULT nextval 'public.actor_actor_id_seq' :: regclass NOT NULL
column: first_name
data type: character varying 45 NOT NULL
column: last_name
data type: character varying 45 NOT NULL
column: last_update
data type: timestamp without time zone DEFAULT now NOT NULL
实际上比您想要的信息多一点。但是,这些字符串应该很容易 ''regexable'' 以仅提取基本数据类型。
如评论中所述,使用特殊信息(例如 DISTSTYLE
)会导致解析器无法识别 sqlparse.sql.Parenthesis
实例,并被报告为 bug。因此,在解析之前需要从 SQL 查询中删除此类信息。
内联记录的代码
import sqlparse
line = '''
CREATE TABLE public.actor (
actor_id integer DEFAULT nextval('public.actor_actor_id_seq'::regclass) NOT NULL,
first_name character varying(45) NOT NULL,
last_name character varying(45) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
);
CREATE TABLE public.category (
category_id integer DEFAULT nextval('public.category_category_id_seq'::regclass) NOT NULL,
name character varying(25) NOT NULL,
last_update timestamp without time zone DEFAULT now() NOT NULL
);
CREATE TABLE IF NOT EXISTS "sample_schema"."sample_table"
(
"div_cd" VARCHAR(2) NOT NULL
,"div_name" VARCHAR(30) NOT NULL
,"org_cd" VARCHAR(8) NOT NULL
,"org_name" VARCHAR(60) NOT NULL
,"team_cd" VARCHAR(2) NOT NULL
,"team_name" VARCHAR(120) NOT NULL
,"personal_cd" VARCHAR(7) NOT NULL
,"personal_name" VARCHAR(300) NOT NULL
,"username" VARCHAR(6) NOT NULL
,"staff_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
,"leader_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
CREATE TABLE IF NOT EXISTS "sample_schema"."ref_table"
(
"staff_flg" CHAR(1) DEFAULT '0'::bpchar SORTKEY ENCODE lzo
,"leader_flg" CHAR(1) DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
'''
def get_table_name(tokens):
for token in reversed(tokens):
if token.ttype is None:
return token.value
return " "
parse = sqlparse.parse(line)
for stmt in parse:
# Get all the tokens except whitespaces
tokens = [t for t in sqlparse.sql.TokenList(stmt.tokens) if t.ttype != sqlparse.tokens.Whitespace]
is_create_stmt = False
for i, token in enumerate(tokens):
# Is it a create statements ?
if token.match(sqlparse.tokens.DDL, 'CREATE'):
is_create_stmt = True
continue
# If it was a create statement and the current token starts with "("
if is_create_stmt and token.value.startswith("("):
# Get the table name by looking at the tokens in reverse order till you find
# a token with None type
print (f"table: {get_table_name(tokens[:i])}")
# Now parse the columns
txt = token.value
columns = txt[1:txt.rfind(")")].replace("\n","").split(",")
for column in columns:
c = ' '.join(column.split()).split()
c_name = c[0].replace('\"',"")
c_type = c[1] # For condensed type information
# OR
#c_type = " ".join(c[1:]) # For detailed type information
print (f"column: {c_name}")
print (f"date type: {c_type}")
print ("---"*20)
break
输出:
table: public.actor
column: actor_id
date type: integer
column: first_name
date type: character
column: last_name
date type: character
column: last_update
date type: timestamp
------------------------------------------------------------
table: public.category
column: category_id
date type: integer
column: name
date type: character
column: last_update
date type: timestamp
------------------------------------------------------------
table: "sample_schema"."sample_table"
column: div_cd
date type: VARCHAR(2)
column: div_name
date type: VARCHAR(30)
column: org_cd
date type: VARCHAR(8)
column: org_name
date type: VARCHAR(60)
column: team_cd
date type: VARCHAR(2)
column: team_name
date type: VARCHAR(120)
column: personal_cd
date type: VARCHAR(7)
column: personal_name
date type: VARCHAR(300)
column: username
date type: VARCHAR(6)
column: staff_flg
date type: CHAR(1)
column: leader_flg
date type: CHAR(1)
------------------------------------------------------------
table: "sample_schema"."ref_table"
column: staff_flg
date type: CHAR(1)
column: leader_flg
date type: CHAR(1)
------------------------------------------------------------