将嵌套 Json 字符串转换为 Spark Dataframe

Convert Nested Json String into Spark Dataframe

我目前正在尝试将 JSON 响应读入 pyspark 数据帧(DataFrame)。当运行下面这行代码时:

# Wrap the raw JSON response body in a one-element RDD and let the JSON reader parse it.
json_rdd = sc.parallelize([response.text])
json_reader = spark.read.option("inferSchema", "true")
crypto_df = json_reader.json(json_rdd)

我得到的数据框被分成了两列:data 和 status

|data|status|
|----|------|
|{"1": {"circulating_supply": 18918568, "cmc_rank": 1, "date_added": "2013-04-28T00:00:00.000Z", "id": 1, "is_active": 1, "is_fiat": 0, "last_updated": "2022-01-03T07:49:00.000Z", "max_supply": 21000000, "name": "Bitcoin", "num_market_pairs": 8970, "platform": null, "quote": {"USD": {"fully_diluted_market_cap": 987234126025.89, "last_updated": "2022-01-03T07:49:00.000Z", "market_cap": 889383616435.3029, "market_cap_dominance": 39.675, "percent_change_1h": -0.05951781, "percent_change_24h": -0.3442455, "percent_change_30d": -0.1390473, "percent_change_60d": -24.29566082, "percent_change_7d": -7.36951384, "percent_change_90d": -4.92043025, "price": 47011.14885837569, "volume_24h": 28717292214.63145, "volume_change_24h": 17.9261}}, "slug": "bitcoin", "symbol": "BTC", "tags": ["mineable", "pow", "sha-256", "store-of-value", "state-channel", "coinbase-ventures-portfolio", "three-arrows-capital-portfolio", "polychain-capital-portfolio", "binance-labs-portfolio", "blockchain-capital-portfolio", "boostvc-portfolio", "cms-holdings-portfolio", "dcg-portfolio", "dragonfly-capital-portfolio", "electric-capital-portfolio", "fabric-ventures-portfolio", "framework-ventures-portfolio", "galaxy-digital-portfolio", "huobi-capital-portfolio", "alameda-research-portfolio", "a16z-portfolio", "1confirmation-portfolio", "winklevoss-capital-portfolio", "usv-portfolio", "placeholder-ventures-portfolio", "pantera-capital-portfolio", "multicoin-capital-portfolio", "paradigm-portfolio"], "total_supply": 18918568}, "52": {"circulating_supply": 47535964473, "cmc_rank": 8, "date_added": "2013-08-04T00:00:00.000Z", "id": 52, "is_active": 1, "is_fiat": 0, "last_updated": "2022-01-03T07:49:00.000Z", "max_supply": 100000000000, "name": "XRP", "num_market_pairs": 672, "platform": null, "quote": {"USD": {"fully_diluted_market_cap": 84323321434.55, "last_updated": "2022-01-03T07:49:00.000Z", "market_cap": 40083904119.58345, "market_cap_dominance": 1.7881, "percent_change_1h": -0.31709136, 
"percent_change_24h": 0.14142393, "percent_change_30d": 14.35083717, "percent_change_60d": -29.91965929, "percent_change_7d": -8.56249711, "percent_change_90d": -19.95921333, "price": 0.8432332143455455, "volume_24h": 1198632904.6630714, "volume_change_24h": 17.9728}}, "slug": "xrp", "symbol": "XRP", "tags": ["medium-of-exchange", "enterprise-solutions", "binance-chain", "arrington-xrp-capital-portfolio", "galaxy-digital-portfolio", "a16z-portfolio", "pantera-capital-portfolio"], "total_supply": 99989907034}|{"credit_count": 1, "elapsed": 28, "error_code": 0, "error_message": null, "notice": null, "timestamp": "2022-01-03T07:50:21.385Z"}|

我只需要 data 列中的信息拆分如下:

id circulating_supply cmc_rank date_added etc...
1 18918568 1 2013-04-28T00:00:00.000Z .....

我试过 explode,但它只适用于数组;我想我需要一个适用于 JSON 结构(struct)的类似函数。

我的模式(schema):

root
 |-- data: struct (nullable = true)
 |    |-- 1: struct (nullable = true)
 |    |    |-- circulating_supply: long (nullable = true)
 |    |    |-- cmc_rank: long (nullable = true)
 |    |    |-- date_added: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- is_active: long (nullable = true)
 |    |    |-- is_fiat: long (nullable = true)
 |    |    |-- last_updated: string (nullable = true)
 |    |    |-- max_supply: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_market_pairs: long (nullable = true)
 |    |    |-- platform: string (nullable = true)
 |    |    |-- quote: struct (nullable = true)
 |    |    |    |-- USD: struct (nullable = true)
 |    |    |    |    |-- fully_diluted_market_cap: double (nullable = true)
 |    |    |    |    |-- last_updated: string (nullable = true)
 |    |    |    |    |-- market_cap: double (nullable = true)
 |    |    |    |    |-- market_cap_dominance: double (nullable = true)
 |    |    |    |    |-- percent_change_1h: double (nullable = true)
 |    |    |    |    |-- percent_change_24h: double (nullable = true)
 |    |    |    |    |-- percent_change_30d: double (nullable = true)
 |    |    |    |    |-- percent_change_60d: double (nullable = true)
 |    |    |    |    |-- percent_change_7d: double (nullable = true)
 |    |    |    |    |-- percent_change_90d: double (nullable = true)
 |    |    |    |    |-- price: double (nullable = true)
 |    |    |    |    |-- volume_24h: double (nullable = true)
 |    |    |    |    |-- volume_change_24h: double (nullable = true)
 |    |    |-- slug: string (nullable = true)
 |    |    |-- symbol: string (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- total_supply: long (nullable = true)
 |    |-- 1027: struct (nullable = true)
 |    |    |-- circulating_supply: double (nullable = true)
 |    |    |-- cmc_rank: long (nullable = true)
 |    |    |-- date_added: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- is_active: long (nullable = true)
 |    |    |-- is_fiat: long (nullable = true)
 |    |    |-- last_updated: string (nullable = true)
 |    |    |-- max_supply: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_market_pairs: long (nullable = true)
 |    |    |-- platform: string (nullable = true)
 |    |    |-- quote: struct (nullable = true)
 |    |    |    |-- USD: struct (nullable = true)
 |    |    |    |    |-- fully_diluted_market_cap: double (nullable = true)
 |    |    |    |    |-- last_updated: string (nullable = true)
 |    |    |    |    |-- market_cap: double (nullable = true)
 |    |    |    |    |-- market_cap_dominance: double (nullable = true)
 |    |    |    |    |-- percent_change_1h: double (nullable = true)
 |    |    |    |    |-- percent_change_24h: double (nullable = true)
 |    |    |    |    |-- percent_change_30d: double (nullable = true)
 |    |    |    |    |-- percent_change_60d: double (nullable = true)
 |    |    |    |    |-- percent_change_7d: double (nullable = true)
 |    |    |    |    |-- percent_change_90d: double (nullable = true)
 |    |    |    |    |-- price: double (nullable = true)
 |    |    |    |    |-- volume_24h: double (nullable = true)
 |    |    |    |    |-- volume_change_24h: double (nullable = true)
 |    |    |-- slug: string (nullable = true)
 |    |    |-- symbol: string (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- total_supply: double (nullable = true)
 |    |-- 2010: struct (nullable = true)
 |    |    |-- circulating_supply: double (nullable = true)
 |    |    |-- cmc_rank: long (nullable = true)
 |    |    |-- date_added: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- is_active: long (nullable = true)
 |    |    |-- is_fiat: long (nullable = true)
 |    |    |-- last_updated: string (nullable = true)
 |    |    |-- max_supply: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_market_pairs: long (nullable = true)
 |    |    |-- platform: string (nullable = true)
 |    |    |-- quote: struct (nullable = true)
 |    |    |    |-- USD: struct (nullable = true)
 |    |    |    |    |-- fully_diluted_market_cap: double (nullable = true)
 |    |    |    |    |-- last_updated: string (nullable = true)
 |    |    |    |    |-- market_cap: double (nullable = true)
 |    |    |    |    |-- market_cap_dominance: double (nullable = true)
 |    |    |    |    |-- percent_change_1h: double (nullable = true)
 |    |    |    |    |-- percent_change_24h: double (nullable = true)
 |    |    |    |    |-- percent_change_30d: double (nullable = true)
 |    |    |    |    |-- percent_change_60d: double (nullable = true)
 |    |    |    |    |-- percent_change_7d: double (nullable = true)
 |    |    |    |    |-- percent_change_90d: double (nullable = true)
 |    |    |    |    |-- price: double (nullable = true)
 |    |    |    |    |-- volume_24h: double (nullable = true)
 |    |    |    |    |-- volume_change_24h: double (nullable = true)
 |    |    |-- slug: string (nullable = true)
 |    |    |-- symbol: string (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- total_supply: double (nullable = true)
 |    |-- 52: struct (nullable = true)
 |    |    |-- circulating_supply: long (nullable = true)
 |    |    |-- cmc_rank: long (nullable = true)
 |    |    |-- date_added: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- is_active: long (nullable = true)
 |    |    |-- is_fiat: long (nullable = true)
 |    |    |-- last_updated: string (nullable = true)
 |    |    |-- max_supply: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_market_pairs: long (nullable = true)
 |    |    |-- platform: string (nullable = true)
 |    |    |-- quote: struct (nullable = true)
 |    |    |    |-- USD: struct (nullable = true)
 |    |    |    |    |-- fully_diluted_market_cap: double (nullable = true)
 |    |    |    |    |-- last_updated: string (nullable = true)
 |    |    |    |    |-- market_cap: double (nullable = true)
 |    |    |    |    |-- market_cap_dominance: double (nullable = true)
 |    |    |    |    |-- percent_change_1h: double (nullable = true)
 |    |    |    |    |-- percent_change_24h: double (nullable = true)
 |    |    |    |    |-- percent_change_30d: double (nullable = true)
 |    |    |    |    |-- percent_change_60d: double (nullable = true)
 |    |    |    |    |-- percent_change_7d: double (nullable = true)
 |    |    |    |    |-- percent_change_90d: double (nullable = true)
 |    |    |    |    |-- price: double (nullable = true)
 |    |    |    |    |-- volume_24h: double (nullable = true)
 |    |    |    |    |-- volume_change_24h: double (nullable = true)
 |    |    |-- slug: string (nullable = true)
 |    |    |-- symbol: string (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- total_supply: long (nullable = true)
 |    |-- 5426: struct (nullable = true)
 |    |    |-- circulating_supply: double (nullable = true)
 |    |    |-- cmc_rank: long (nullable = true)
 |    |    |-- date_added: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- is_active: long (nullable = true)
 |    |    |-- is_fiat: long (nullable = true)
 |    |    |-- last_updated: string (nullable = true)
 |    |    |-- max_supply: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_market_pairs: long (nullable = true)
 |    |    |-- platform: string (nullable = true)
 |    |    |-- quote: struct (nullable = true)
 |    |    |    |-- USD: struct (nullable = true)
 |    |    |    |    |-- fully_diluted_market_cap: double (nullable = true)
 |    |    |    |    |-- last_updated: string (nullable = true)
 |    |    |    |    |-- market_cap: double (nullable = true)
 |    |    |    |    |-- market_cap_dominance: double (nullable = true)
 |    |    |    |    |-- percent_change_1h: double (nullable = true)
 |    |    |    |    |-- percent_change_24h: double (nullable = true)
 |    |    |    |    |-- percent_change_30d: double (nullable = true)
 |    |    |    |    |-- percent_change_60d: double (nullable = true)
 |    |    |    |    |-- percent_change_7d: double (nullable = true)
 |    |    |    |    |-- percent_change_90d: double (nullable = true)
 |    |    |    |    |-- price: double (nullable = true)
 |    |    |    |    |-- volume_24h: double (nullable = true)
 |    |    |    |    |-- volume_change_24h: double (nullable = true)
 |    |    |-- slug: string (nullable = true)
 |    |    |-- symbol: string (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- total_supply: double (nullable = true)
 |    |-- 825: struct (nullable = true)
 |    |    |-- circulating_supply: double (nullable = true)
 |    |    |-- cmc_rank: long (nullable = true)
 |    |    |-- date_added: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- is_active: long (nullable = true)
 |    |    |-- is_fiat: long (nullable = true)
 |    |    |-- last_updated: string (nullable = true)
 |    |    |-- max_supply: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- num_market_pairs: long (nullable = true)
 |    |    |-- platform: struct (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- slug: string (nullable = true)
 |    |    |    |-- symbol: string (nullable = true)
 |    |    |    |-- token_address: string (nullable = true)
 |    |    |-- quote: struct (nullable = true)
 |    |    |    |-- USD: struct (nullable = true)
 |    |    |    |    |-- fully_diluted_market_cap: double (nullable = true)
 |    |    |    |    |-- last_updated: string (nullable = true)
 |    |    |    |    |-- market_cap: double (nullable = true)
 |    |    |    |    |-- market_cap_dominance: double (nullable = true)
 |    |    |    |    |-- percent_change_1h: double (nullable = true)
 |    |    |    |    |-- percent_change_24h: double (nullable = true)
 |    |    |    |    |-- percent_change_30d: double (nullable = true)
 |    |    |    |    |-- percent_change_60d: double (nullable = true)
 |    |    |    |    |-- percent_change_7d: double (nullable = true)
 |    |    |    |    |-- percent_change_90d: double (nullable = true)
 |    |    |    |    |-- price: double (nullable = true)
 |    |    |    |    |-- volume_24h: double (nullable = true)
 |    |    |    |    |-- volume_change_24h: double (nullable = true)
 |    |    |-- slug: string (nullable = true)
 |    |    |-- symbol: string (nullable = true)
 |    |    |-- tags: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- total_supply: double (nullable = true)
 |-- status: struct (nullable = true)
 |    |-- credit_count: long (nullable = true)
 |    |-- elapsed: long (nullable = true)
 |    |-- error_code: long (nullable = true)
 |    |-- error_message: string (nullable = true)
 |    |-- notice: string (nullable = true)
 |    |-- timestamp: string (nullable = true)

由于 Id 的值实际上是列 data 的结构字段名称,您可以先通过 crypto_df.select("data.*") 从模式中取得这些 Id,并用它们创建一个结构数组,然后对结果数组使用 explode 并展开内部结构;或者直接使用 inline 函数,它可以一次完成这两个操作:

import pyspark.sql.functions as F

# The field names under `data` are the ids; read them from the schema.
id_fields = crypto_df.select("data.*").columns

# Collect every per-id struct into a single array column.
id_structs = [F.col(f"data.{field}") for field in id_fields]
stacked_df = crypto_df.withColumn("data", F.array(*id_structs))

# inline() explodes the array and expands each struct's fields into columns.
crypto_df = stacked_df.selectExpr("inline(data)")

crypto_df.show()

#+------------------+--------+--------------------+---+---------+-------+--------------------+------------+-------+----------------+--------+--------------------+-------+------+--------------------+------------+
#|circulating_supply|cmc_rank|          date_added| id|is_active|is_fiat|        last_updated|  max_supply|   name|num_market_pairs|platform|               quote|   slug|symbol|                tags|total_supply|
#+------------------+--------+--------------------+---+---------+-------+--------------------+------------+-------+----------------+--------+--------------------+-------+------+--------------------+------------+
#|          18918568|       1|2013-04-28T00:00:...|  1|        1|      0|2022-01-03T07:49:...|    21000000|Bitcoin|            8970|    null|{{9.8723412602589...|bitcoin|   BTC|[mineable, pow, s...|    18918568|
#|       47535964473|       8|2013-08-04T00:00:...| 52|        1|      0|2022-01-03T07:49:...|100000000000|    XRP|             672|    null|{{8.432332143455E...|    xrp|   XRP|[medium-of-exchan...| 99989907034|
#+------------------+--------+--------------------+---+---------+-------+--------------------+------------+-------+----------------+--------+--------------------+-------+------+--------------------+------------+
编辑:

根据您的评论,如果字段 platform 可以是字符串或结构,您可以使用 withField(自 Spark 3.1 起可用)将其转换为字符串,如下所示:

# Rebuild each per-id struct with `platform` cast to string, so that field
# has the same type in every struct before they are packed into one array.
uniform_structs = []
for c in crypto_df.select("data.*").columns:
    platform_as_text = F.col(f"data.{c}.platform").cast("string")
    uniform_structs.append(F.col(f"data.{c}").withField("platform", platform_as_text))

# inline() explodes the array and expands each struct's fields into columns.
crypto_df.withColumn("data", F.array(*uniform_structs)).selectExpr("inline(data)").show()

#+--------------------+--------+--------------------+----+---------+-------+--------------------+------------+--------+----------------+--------------------+--------------------+--------+------+--------------------+--------------------+
#|  circulating_supply|cmc_rank|          date_added|  id|is_active|is_fiat|        last_updated|  max_supply|    name|num_market_pairs|            platform|               quote|    slug|symbol|                tags|        total_supply|
#+--------------------+--------+--------------------+----+---------+-------+--------------------+------------+--------+----------------+--------------------+--------------------+--------+------+--------------------+--------------------+
#|         1.8918687E7|       1|2013-04-28T00:00:...|   1|        1|      0|2022-01-03T12:05:...|    21000000| Bitcoin|            8972|                null|{{9.9427790254864...| bitcoin|   BTC|[mineable, pow, s...|         1.8918687E7|
#|    1.190054996865E8|       2|2015-08-07T00:00:...|1027|        1|      0|2022-01-03T12:06:...|        null|Ethereum|            5377|                null|{{4.5577849537238...|ethereum|   ETH|[mineable, pow, s...|    1.190054996865E8|
#|  3.3485475547114E10|       6|2017-10-01T00:00:...|2010|        1|      0|2022-01-03T12:05:...| 45000000000| Cardano|             348|                null|{{6.161144465755E...| cardano|   ADA|[mineable, dpos, ...|  3.3927753982173E10|
#|     4.7535964473E10|       8|2013-08-04T00:00:...|  52|        1|      0|2022-01-03T12:05:...|100000000000|     XRP|             672|                null|{{8.448884837941E...|     xrp|   XRP|[medium-of-exchan...|     9.9989907034E10|
#| 3.092842565984485E8|       5|2020-04-10T00:00:...|5426|        1|      0|2022-01-03T12:05:...|        null|  Solana|             217|                null|{{8.937421274506E...|  solana|   SOL|[pos, platform, s...|  5.11616946142289E8|
#|7.833788250671762E10|       4|2015-02-25T00:00:...| 825|        1|      0|2022-01-03T12:06:...|        null|  Tether|           25203|{1027, Ethereum, ...|{{8.276917824101E...|  tether|  USDT|[payments, stable...|8.275899187310669E10|
#+--------------------+--------+--------------------+----+---------+-------+--------------------+------------+--------+----------------+--------------------+--------------------+--------+------+--------------------+--------------------+