如何使用Google Cloud Storage作为Delta Lake的存储层?

How to use Google Cloud Storage as a storage layer of Delta Lake?

我可以使用 Google Cloud Storage 作为 Delta Lake 的存储层吗?


发现于 slack

在 Delta Lake 0.5.0 及之前的版本中是不可能的。

https://github.com/delta-io/delta/issues/294 上有一个问题需要跟踪。请随意投票以帮助优先考虑它。


就在 Google 发布后一天 Getting started with new table formats on Dataproc:

We’re announcing that table format projects Delta Lake and Apache Iceberg (Incubating) are now available in the latest version of Cloud Dataproc (version 1.5 Preview). You can start using them today with either Spark or Presto. Apache Hudi is also available on Dataproc 1.3.

有可能。 这是示例代码和您需要的库:

确保首先设置您的凭据,您可以是代码的一部分或作为环境:

export GOOGLE_APPLICATION_CREDENTIALS={gcs-key-path.json}
import org.apache.spark.sql.{SparkSession, DataFrame}
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryOptions
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.DatasetInfo

spark.conf.set("parentProject", {Proj})
spark.conf.set("spark.hadoop.fs.gs.auth.service.account.enable", "true")   
spark.conf.set("spark.hadoop.fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")
spark.conf.set("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")
spark.conf.set("spark.delta.logStore.gs.impl", "io.delta.storage.GCSLogStore")
spark.conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")


val targetTablePath = "gs://{bucket}/{dataset}/{tablename}"
spark.range(5, 10).write.format("delta")
      .mode("overwrite")
      .save(targetTablePath)

您需要的库:

"io.delta" % "delta-core_2.12" % "1.0.0",
"io.delta" % "delta-contribs_2.12" % "1.0.0",
"com.google.cloud.spark" % "spark-bigquery-with-dependencies_2.12" % "0.21.1",
"com.google.cloud.bigdataoss" % "gcs-connector" % "1.9.4-hadoop3"

正在 GCS 中检查我的增量文件:

$ gsutil ls gs://r-dps-datapipeline-dev/testoliver/oliver_sample_delta3
gs://r-dps-datapipeline-dev/testoliver/oliver_sample_delta3/
gs://r-dps-datapipeline-dev/testoliver/oliver_sample_delta3/part-00000-ce79bfc7-e28f-4929-955c-56a7a08caf9f-c000.snappy.parquet
gs://r-dps-datapipeline-dev/testoliver/oliver_sample_delta3/part-00001-dda0bd2d-a081-4444-8983-ac8f3a2ffe9d-c000.snappy.parquet
gs://r-dps-datapipeline-dev/testoliver/oliver_sample_delta3/part-00002-93f7429b-777a-42f4-b2dd-adc9a482a6e8-c000.snappy.parquet
gs://r-dps-datapipeline-dev/testoliver/oliver_sample_delta3/part-00003-e9874baf-6c0b-46de-891e-032ac8b67287-c000.snappy.parquet
gs://r-dps-datapipeline-dev/testoliver/oliver_sample_delta3/part-00004-ede54816-2da1-412f-a9e3-5233e77258fb-c000.snappy.parquet
gs://r-dps-datapipeline-dev/testoliver/oliver_sample_delta3/_delta_log/
gs://r-dps-datapipeline-dev/testoliver/oliver_sample_delta3/_symlink_format_manifest/