python SQL解析器获取列名和数据类型

python SQL parser to get the column name and data type

我正在使用 python SQL 解析器来获取 table 信息。我能够获得 table 名称和架构名称。

import sqlparse
line = '''
CREATE TABLE public.actor (
    actor_id integer DEFAULT nextval('public.actor_actor_id_seq'::regclass) NOT NULL,
    first_name character varying(45) NOT NULL,
    last_name character varying(45) NOT NULL,
    last_update timestamp without time zone DEFAULT now() NOT NULL
);

CREATE TABLE public.category (
    category_id integer DEFAULT nextval('public.category_category_id_seq'::regclass) NOT NULL,
    name character varying(25) NOT NULL,
    last_update timestamp without time zone DEFAULT now() NOT NULL
);

CREATE TABLE IF NOT EXISTS "sample_schema"."sample_table"
(
    "div_cd" VARCHAR(2) NOT NULL
    ,"div_name" VARCHAR(30) NOT NULL
    ,"org_cd" VARCHAR(8) NOT NULL
    ,"org_name" VARCHAR(60) NOT NULL
    ,"team_cd" VARCHAR(2) NOT NULL
    ,"team_name" VARCHAR(120) NOT NULL
    ,"personal_cd" VARCHAR(7) NOT NULL
    ,"personal_name" VARCHAR(300) NOT NULL
    ,"username" VARCHAR(6) NOT NULL
    ,"staff_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
    ,"leader_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;

CREATE TABLE IF NOT EXISTS "sample_schema"."ref_table"
(
     "staff_flg" CHAR(1)  DEFAULT '0'::bpchar SORTKEY ENCODE lzo 
    ,"leader_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
'''
parse = sqlparse.parse(line)
print([str(t) for t in parse[0].tokens if t.ttype is None][0])

Output: 
public.actor

但是如果我想 return 列名和数据类型,我可以使用哪个标记来打印这两个 DDL。

输出是这样的,[不完全一样:)]

table: public.actor

逐一打印列名和数据类型(可能在for循环中)

column: actor_id
date type: integer
column: first_name
data type: character varying

您可以使用 python-sqlparse 示例中可用的 script 中的函数来提取数据:

def extract_definitions(token_list):
    # assumes that token_list is a parenthesis
    definitions = []
    tmp = []
    par_level = 0
    for token in token_list.flatten():
        if token.is_whitespace:
            continue
        elif token.match(sqlparse.tokens.Punctuation, '('):
            par_level += 1
            continue
        if token.match(sqlparse.tokens.Punctuation, ')'):
            if par_level == 0:
                break
            else:
                par_level += 1
        elif token.match(sqlparse.tokens.Punctuation, ','):
            if tmp:
                definitions.append(tmp)
            tmp = []
        else:
            tmp.append(token)
    if tmp:
        definitions.append(tmp)
    return definitions

并使用 for-loop 打印有关列名称及其数据类型的信息:

parsed = sqlparse.parse(line)[0]

# extract the parenthesis which holds column definitions
_, par = parsed.token_next_by(i=sqlparse.sql.Parenthesis)
columns = extract_definitions(par)

for column in columns:
    print(f"column: {column[0]}")
    print(f"data type: {' '.join(str(t) for t in column[1:])}")

此代码产生以下输出:

column: actor_id
data type: integer DEFAULT nextval 'public.actor_actor_id_seq' :: regclass NOT NULL
column: first_name
data type: character varying 45 NOT NULL
column: last_name
data type: character varying 45 NOT NULL
column: last_update
data type: timestamp without time zone DEFAULT now NOT NULL

实际上比您想要的信息多一点。但是,这些字符串应该很容易 ''regexable'' 以仅提取基本数据类型。

如评论中所述,使用特殊信息(例如 DISTSTYLE)会导致解析器无法识别 sqlparse.sql.Parenthesis 实例,并被报告为 bug。因此,在解析之前需要从 SQL 查询中删除此类信息。

内联记录的代码

import sqlparse

line = '''
CREATE TABLE public.actor (
    actor_id integer DEFAULT nextval('public.actor_actor_id_seq'::regclass) NOT NULL,
    first_name character varying(45) NOT NULL,
    last_name character varying(45) NOT NULL,
    last_update timestamp without time zone DEFAULT now() NOT NULL
);

CREATE TABLE public.category (
    category_id integer DEFAULT nextval('public.category_category_id_seq'::regclass) NOT NULL,
    name character varying(25) NOT NULL,
    last_update timestamp without time zone DEFAULT now() NOT NULL
);

CREATE TABLE IF NOT EXISTS "sample_schema"."sample_table"
(
    "div_cd" VARCHAR(2) NOT NULL
    ,"div_name" VARCHAR(30) NOT NULL
    ,"org_cd" VARCHAR(8) NOT NULL
    ,"org_name" VARCHAR(60) NOT NULL
    ,"team_cd" VARCHAR(2) NOT NULL
    ,"team_name" VARCHAR(120) NOT NULL
    ,"personal_cd" VARCHAR(7) NOT NULL
    ,"personal_name" VARCHAR(300) NOT NULL
    ,"username" VARCHAR(6) NOT NULL
    ,"staff_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
    ,"leader_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;

CREATE TABLE IF NOT EXISTS "sample_schema"."ref_table"
(
     "staff_flg" CHAR(1)  DEFAULT '0'::bpchar SORTKEY ENCODE lzo 
    ,"leader_flg" CHAR(1)  DEFAULT '0'::bpchar ENCODE lzo
)
DISTSTYLE EVEN
;
'''

def get_table_name(tokens):
    for token in reversed(tokens):
        if token.ttype is None:
            return token.value
    return " "

parse = sqlparse.parse(line)
for stmt in parse:
    # Get all the tokens except whitespaces
    tokens = [t for t in sqlparse.sql.TokenList(stmt.tokens) if t.ttype != sqlparse.tokens.Whitespace]
    is_create_stmt = False
    for i, token in enumerate(tokens):
        # Is it a create statements ?
        if token.match(sqlparse.tokens.DDL, 'CREATE'):
            is_create_stmt = True
            continue
        
        # If it was a create statement and the current token starts with "("
        if is_create_stmt and token.value.startswith("("):
            # Get the table name by looking at the tokens in reverse order till you find
            # a token with None type
            print (f"table: {get_table_name(tokens[:i])}")

            # Now parse the columns
            txt = token.value
            columns = txt[1:txt.rfind(")")].replace("\n","").split(",")
            for column in columns:
                c = ' '.join(column.split()).split()
                c_name = c[0].replace('\"',"")
                c_type = c[1]  # For condensed type information 
                # OR 
                #c_type = " ".join(c[1:]) # For detailed type information 
                print (f"column: {c_name}")
                print (f"date type: {c_type}")
            print ("---"*20)
            break

输出:

table: public.actor
column: actor_id
date type: integer
column: first_name
date type: character
column: last_name
date type: character
column: last_update
date type: timestamp
------------------------------------------------------------
table: public.category
column: category_id
date type: integer
column: name
date type: character
column: last_update
date type: timestamp
------------------------------------------------------------
table: "sample_schema"."sample_table"
column: div_cd
date type: VARCHAR(2)
column: div_name
date type: VARCHAR(30)
column: org_cd
date type: VARCHAR(8)
column: org_name
date type: VARCHAR(60)
column: team_cd
date type: VARCHAR(2)
column: team_name
date type: VARCHAR(120)
column: personal_cd
date type: VARCHAR(7)
column: personal_name
date type: VARCHAR(300)
column: username
date type: VARCHAR(6)
column: staff_flg
date type: CHAR(1)
column: leader_flg
date type: CHAR(1)
------------------------------------------------------------
table: "sample_schema"."ref_table"
column: staff_flg
date type: CHAR(1)
column: leader_flg
date type: CHAR(1)
------------------------------------------------------------