使用 Databricks 将 Google Api 的结果写入数据湖

Write the results of the Google Api to a data lake with Databricks

我正在通过 Databricks 上的 Python SDK 从 Google 管理员报告用户使用情况 Api 取回用户使用数据。数据大小约为每天 100,000 条记录,我通过批处理在晚上完成。 api returns 最大页面大小为 1000,因此我粗略地称其为 1000 以获得当天所需的数据。这工作正常。

我的最终目标是以原始格式将数据存储在数据湖中(Azure Gen2,但与此问题无关)。稍后,我将使用 Databricks 将数据转换为聚合报告模型,并将 PowerBI 置于其之上以跟踪 Google 应用程序随时间的使用情况。

作为一名 C# 程序员,我是 Python 和 Spark 的新手:我目前的方法是从 api 请求 1000 条记录的第一页,然后将其直接写入数据湖一个 JSON 文件,然后获取下一个页面集并将其写入。文件夹结构类似于“\raw\googleuser\YYYY\MM\DD\data1.json”.

我想在原始区域中尽可能以最原始的形式保存数据,并且不应用太多转换。第二个过程可以提取我需要的字段,用元数据标记它并将它写回 Parquet 以供函数使用。这就是为什么我想把它写成 JSON。

这意味着第二个进程需要将 JSON 读入一个数据帧,我可以在其中转换它并将其写为镶木地板(这部分也很简单)。

因为我正在使用 Google Api 我没有使用 Json - 它 returns dict 对象(具有复杂的嵌套)。我可以使用 json.dump() 将其提取为 Json 字符串,但我不知道如何将 STRING 直接写入我的数据湖。一旦我将它放入数据帧中,我就可以轻松地以任何格式编写它,但是将它从 Json 转换为数据帧然后基本上返回 Json 只是为了写入它似乎是一种性能开销。

以下是我尝试过的方法和结果:

  1. 构建一个 pyspark.sql.Rows 的列表,并在所有分页结束时(100k 行)- 使用 spark.createDataFrame(rows) 将其转换为数据框。一旦它是一个数据框,我就可以将它保存为 Json 文件。这有效,但似乎效率低下。
  2. 使用json.dump(请求)获取Json中1000条记录的字符串。我可以使用以下代码将其写入 Databricks 文件系统:

    with open("/dbfs/tmp/googleuserusagejsonoutput-{0}.json" .format(keyDateFilter), 'w') as f: f.write(json.dumps(response))

    但是,我必须将它移动到我的 Azure 数据湖:

    dbutils.fs.cp("/tmp/test_dbfs1.txt", datalake_path + dbfs_path + "xyz.json")

    然后我得到接下来的 1000 条记录并继续这样做。我似乎无法将 open() 方法目录用于数据湖存储(Azure abfss 驱动程序),否则这将是一个不错的解决方案。先dump到本地再移动,显得脆弱又奇怪

  3. 与选项1相同,但每1000条记录将dataframe转储到datalake并覆盖它(这样内存不会一次增加超过1000条记录)

  4. 忽略转储raw的规则Json。将数据整理成我想要的最简单的格式,并删除我不需要的所有额外数据。这将导致占用空间小得多,然后将遵循上面的选项 1 或 3。 (这是第二个问题 - 以原始格式保存来自 Api 的所有数据的原则,以便随着时间的推移需求发生变化,我总是在数据湖中拥有历史数据,只需更改转换例程即可提取不同的指标。因此我不愿意在这个阶段放弃任何数据。

如有任何建议,请...

将 lake 安装到你的数据块环境中,这样你就可以将它保存到 lake 中,就好像它是一个普通文件夹一样:

with open('/dbfs/mnt/mydatalake/googleuserusagejsonoutput-{0}.json', 'wb') as f:
            json.dump(data, codecs.getwriter('utf-8')(f), sort_keys = True, indent = 4, ensure_ascii=False)
            f.close()

你只需要挂载一次湖:

https://docs.databricks.com/spark/latest/data-sources/azure/azure-datalake-gen2.html#mount-the-azure-data-lake-storage-gen2-filesystem-with-dbfs

也就是说,

以 json 格式存储大数据不是最佳选择;对于您存储键(列名)的每个值(单元格),因此您的数据将比需要的大得多。此外,您可能应该具有重复数据删除功能以确保两者(1)数据中没有间隙,以及(2)您没有将相同的数据存储在多个文件中。 Databricks delta 会处理这个问题。

https://docs.databricks.com/delta/delta-intro.html