如何在 python 中读取 csv 文件和 return 外部 table 查询?

How to read csv file and return external table query in python?

我正在尝试读取 csv 文件并通过数据框创建外部 table 查询。请帮助我如何实现我的目标?

示例:

假设我有这样的 df-

df = pd.DataFrame({'A': [1,2,3], 'B': [True, False, False], 'C': ['a', 'b', 'c']})
print(df.dtypes)

A     int64
B      bool
C    object
dtype: object

我必须根据 dataframe 给出的信息创建外部 table-

CREATE EXTERNAL TABLE schema_name.table_name
(
A INT,
B VARCHAR(100),
C VARCHAR(100)
) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES 
(
'separatorChar' = ','
)
LOCATION 'location'
TABLE PROPERTIES ('skip.header.line.count'='1') ;

转换应该是这样的-

int64  - INT,
float64 - FLOAT,
object - VARCHAR(100),
bool - VARCHAR(10),
date - TIMESTAMP

请帮助我如何创建外部 table?

不确定您的目标是什么。您是否尝试从现有 pandas 数据框生成 sql 查询?看起来像是一项直接的任务 - 遍历数据框列,生成查询的相应部分并使用字符串格式构建结果。

考虑以下可能的实施方式:

import pandas as pd

# pandas types to sql types mapping
TYPE_MAP = {
    'object': 'VARCHAR({})',
    'int64': 'INT',
    'float64': 'FLOAT',
    'bool': 'VARCHAR({})',
    'datetime64': 'TIMESTAMP',
    'timedelta[ns]': 'NotImplemented',
    'category': 'NotImplemented'}

def create_query(data, data_config):
    query_template = """CREATE EXTERNAL TABLE schema_name.table_name
(
{}
) parameters
parameters"""

    query_columns = list()
    for col in data:
        col_type = TYPE_MAP[str(data[col].dtype)]
        if col_type.startswith('VARCHAR'):
            # compute max_length from data
            # col_type = col_type.format(data[col].astype('str').str.len().max())
            # or use pre-defined values
            col_type = col_type.format(data_config['varchar_length'][col])
        query_columns.append(f"{col} {col_type}")

    return query_template.format(',\n'.join(query_columns))


if __name__ == '__main__':

    df = pd.DataFrame({'A': [1, 2, 3], 'B': [True, False, False], 'C': ['a', 'b', 'c']})
    cfg = {'varchar_length': {'B': 90, 'C': 110}}
    query = create_query(df, cfg)
    print(query)

结果是:

CREATE EXTERNAL TABLE schema_name.table_name
(
A INT,
B VARCHAR(90),
C VARCHAR(110)
) parameters
parameters

SQL query 是普通字符串,因此您可以将其格式化为任何其他字符串。

您可以将 for 循环与 df.dtypes.items() 一起使用以获得 namedtype 并转换为预期行 A INTB VARCHAR(100)C VARCHAR(100).

并且可以使用字典进行转换

convert = {
    "int64": "INT",
    "float64": "FLOAT",
    "object": "VARCHAR(100)",
    "bool": "VARCHAR(10)",
    "date": "TIMESTAMP",    
}    

稍后您可以使用 join",\n" 在除一行之外的所有行中添加逗号。

最后,您可以使用 f-string.format()

将其放入字符串 CREATE ...
import pandas as pd

df = pd.DataFrame({'A': [1,2,3], 'B': [True, False, False], 'C': ['a', 'b', 'c']})
#print(df.dtypes)

convert = {
    "int64": "INT",
    "float64": "FLOAT",
    "object": "VARCHAR(100)",
    "bool": "VARCHAR(10)",
    "date": "TIMESTAMP",    
}    
    
all_lines = []    
for name, dtype in df.dtypes.items():
    dtype = str(dtype)
    line  = f'{name} {convert[dtype]}'
    #print(dtype, "=>", line)
    all_lines.append( line )
    
text = ",\n".join(all_lines)

print(f"""CREATE EXTERNAL TABLE schema_name.table_name
(
{text}
) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES 
(
'separatorChar' = ','
)
LOCATION 'location'
TABLE PROPERTIES ('skip.header.line.count'='1') ;""")

结果:

CREATE EXTERNAL TABLE schema_name.table_name
(
A INT,
B VARCHAR(10),
C VARCHAR(100)
) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES 
(
'separatorChar' = ','
)
LOCATION 'location'
TABLE PROPERTIES ('skip.header.line.count'='1') ;