如何枚举 Spark DataFrame 中的列?如果列是嵌套的怎么办?

How to enumerate columns in Spark's DataFrame? What if columns are nested?

显然它没有 items() 方法。那怎么办?

我正在尝试使用以下代码将行发送到数据库:

def write_row(table_name, cur, row):
    data = []
    for key, value in row.items():
        data.append((key, value))
    data = zip(*data)

    columns = ", ".join(data[0])
    values = data[1]
    questionmarks = ", ".join(["?"] * len(columns))

    query = f"INSERT INTO {table_name} ({columns}) VALUES ({questionmarks})"
    cur.execute(query, values)


def write_data_frame(df, epoch1):
    conn = mariadb.connect(**config["mariadb"])
    cur = conn.cursor()

    table_name = "pysparktest"

    rows = df.collect()
    for row in rows:
        write_row(table_name, cur, row)

    conn.commit()

发誓

AttributeError: items

如果行是嵌套的怎么办?

root
 |-- track: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- version: string (nullable = true)
 |-- car: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- version: string (nullable = true)
 |-- cnt: long (nullable = false)
 |-- minBestLapTime: double (nullable = true)

就像编译器发誓的那样 - class 行中没有名为“items()”的方法。

你需要做的是使用“asDict”方法。它将 Row 中的键、值输出为 python dict.

在嵌套列的情况下,asDict 函数中有一个名为 recursive 的参数,将其设置为 True。默认情况下,它设置为 False。

例如:

row = Row(name="Alice", age=11)
row_as_dict = row.asDict()
row_as_dict

输出:

{'name': 'Alice', 'age': 11}

对于迭代:

for key in row_as_dict:
    print("{} : {}".format(key, row_as_dict[key]))

输出:

name : Alice
age : 11

在嵌套列的情况下

row = Row(key=1, value=Row(name='a', age=2))
row_as_dict = row.asDict(recursive=True)
row_as_dict

输出:

{'key': 1, 'value': {'name': 'a', 'age': 2}}