Reading data from a BigQuery external table using PySpark and creating a DataFrame

I created a newline-delimited JSON file in GCS. I then created an external table on top of that JSON file, and I can read the data from the BigQuery UI.
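For reference, newline-delimited JSON stores one complete JSON object per line, with no enclosing array and no commas between records. This is the layout behind BigQuery's NEWLINE_DELIMITED_JSON source format, and it is also what Spark's `spark.read.json` expects by default. The minimal stdlib sketch below uses made-up rows; only the column names come from the SELECT statement in the question, and `parse_ndjson` is a hypothetical helper.

```python
import json

# Hypothetical sample content of a newline-delimited JSON file.
# Column names come from the question's SELECT; the values are made up.
ndjson_text = (
    '{"EMPID": 101, "EMPNAME": "A. Kumar", "STREETADRESS": "1 Main St", '
    '"REGION": "South", "STATE": "KA", "COUNTRY": "IN"}\n'
    '{"EMPID": 102, "EMPNAME": "B. Rao", "STREETADRESS": "2 Park Rd", '
    '"REGION": "West", "STATE": "MH", "COUNTRY": "IN"}\n'
)


def parse_ndjson(text):
    """Parse newline-delimited JSON: every non-blank line is one record."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


records = parse_ndjson(ndjson_text)
for r in records:
    print(r["EMPID"], r["EMPNAME"], r["COUNTRY"])
```

Because each line is a self-contained record, a file in this layout can be split and read in parallel, which is why Spark can read it directly without going through BigQuery.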

I want to access the external table data with PySpark, create a DataFrame, and then run the same job from Dataproc. Below is the code snippet I wrote:

#!/usr/bin/python
import sys
import json
from pyspark.sql.functions import udf, lit, when, date_sub
from pyspark.sql.types import ArrayType, IntegerType, StructType, StructField, StringType, BooleanType, DateType
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql import Row
from datetime import datetime

TargetTableUri=sys.argv[1]

spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-bigquery-demo') \
  .getOrCreate()


bucket1 = "gs://first-bucket-arpan/output1"
spark.conf.set('temporaryGcsBucket', bucket1)

src_tbl = spark.read.format('bigquery') \
  .option('table', 'turing-thought-277215:first_dataset.ext_employee_details') \
  .load()
src_tbl.createOrReplaceTempView('src_tbl')

src_tbl_df = spark.sql( 'SELECT EMPID,EMPNAME,STREETADRESS,REGION,STATE,COUNTRY FROM src_tbl' )
src_tbl_df.show()
src_tbl_df.printSchema()

When I run the job on the Dataproc cluster, I get the following error: "java.lang.UnsupportedOperationException: The type of table turing-thought-277215.first_dataset.ext_employee_details is currently not supported: EXTERNAL"

Does the BigQuery connector for PySpark not support BigQuery external tables? The full log is below.

20/08/13 16:44:25 INFO org.spark_project.jetty.util.log: Logging initialized @4863ms
20/08/13 16:44:25 INFO org.spark_project.jetty.server.Server: jetty-9.3.z-SNAPSHOT, build timestamp: unknown, git hash: unknown
20/08/13 16:44:25 INFO org.spark_project.jetty.server.Server: Started @5045ms
20/08/13 16:44:25 INFO org.spark_project.jetty.server.AbstractConnector: Started ServerConnector@5cf22b28{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
20/08/13 16:44:25 WARN org.apache.spark.scheduler.FairSchedulableBuilder: Fair Scheduler configuration file not found so jobs will be scheduled in FIFO order. To use fair scheduling, configure pools in fairscheduler.xml or set spark.scheduler.allocation.file to a file that contains the configuration.
20/08/13 16:44:27 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at my-dataproc-cluster-m/10.148.0.40:8032
20/08/13 16:44:27 INFO org.apache.hadoop.yarn.client.AHSProxy: Connecting to Application History server at my-dataproc-cluster-m/10.148.0.40:10200
20/08/13 16:44:31 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1596957621647_0022
Traceback (most recent call last):
  File "/tmp/job-scd2curation-6/scdtype2curation.py", line 25, in <module>
    .option('table', 'turing-thought-277215:first_dataset.ext_employee_details') \
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o59.load.

: java.lang.UnsupportedOperationException: The type of table turing-thought-277215.first_dataset.ext_employee_details is currently not supported: EXTERNAL

    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelationInternal(BigQueryRelationProvider.scala:83)
    at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:40)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:341)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

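The connector only supports tables and views (views and materialized views only when viewsEnabled is set); external tables are not supported. See the source: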

    val table = Option(bigquery.getTable(opts.tableId))
      .getOrElse(sys.error(s"Table $tableName not found"))
    table.getDefinition[TableDefinition].getType match {
      case TABLE => new DirectBigQueryRelation(opts, table)(sqlContext)
      case VIEW | MATERIALIZED_VIEW => if (opts.viewsEnabled) {
        new DirectBigQueryRelation(opts, table)(sqlContext)
      } else {
        sys.error(
          s"""Views were not enabled. You can enable views by setting
             |'${SparkBigQueryOptions.ViewsEnabledOption}' to true.
             |Notice additional cost may occur."""
            .stripMargin.replace('\n', ' '))
      }
      case unsupported => throw new UnsupportedOperationException(
        s"The type of table $tableName is currently not supported: $unsupported")
    }
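One possible workaround, sketched here under assumptions rather than taken from the connector: check the table type first, and for an EXTERNAL table backed by newline-delimited JSON, read its GCS source files directly with `spark.read.json` instead of going through the connector. The function names `choose_read_plan` and `load_dataframe` are hypothetical. The sketch assumes the google-cloud-bigquery client library is installed on the cluster, whose `Table` objects expose `table_type` and `external_data_configuration` (with `source_format` and `source_uris`).

```python
# Hypothetical helpers: pick a read strategy based on the BigQuery table type.
# TABLE / VIEW / MATERIALIZED_VIEW go through the spark-bigquery connector,
# mirroring the match in the connector source; an EXTERNAL table backed by
# newline-delimited JSON is read straight from its GCS files instead.
CONNECTOR_TYPES = {"TABLE", "VIEW", "MATERIALIZED_VIEW"}


def choose_read_plan(table_id, table_type, source_format=None, source_uris=None):
    """Return ("bigquery", [table_id]) or ("json", [gs:// URIs])."""
    if table_type in CONNECTOR_TYPES:
        return "bigquery", [table_id]
    if table_type == "EXTERNAL" and source_format == "NEWLINE_DELIMITED_JSON":
        if not source_uris:
            raise ValueError(f"External table {table_id} has no source URIs")
        return "json", list(source_uris)
    raise ValueError(f"Unsupported table {table_id}: {table_type} / {source_format}")


def load_dataframe(spark, bq_client, table_id):
    """Load table_id into a Spark DataFrame.

    Assumes `spark` is a SparkSession with the spark-bigquery connector on the
    classpath and `bq_client` is a google.cloud.bigquery.Client.
    """
    table = bq_client.get_table(table_id)
    ext = table.external_data_configuration  # None for non-external tables
    kind, paths = choose_read_plan(
        table_id,
        table.table_type,
        ext.source_format if ext is not None else None,
        ext.source_uris if ext is not None else None,
    )
    if kind == "json":
        # Spark's JSON reader expects one JSON object per line by default.
        return spark.read.json(paths)
    reader = spark.read.format("bigquery").option("table", table_id)
    if table.table_type != "TABLE":
        # Views are only read when viewsEnabled is set (see the source above).
        reader = reader.option("viewsEnabled", "true")
    return reader.load()
```

On Dataproc this might be called as `load_dataframe(spark, bigquery.Client(), "turing-thought-277215.first_dataset.ext_employee_details")` after `from google.cloud import bigquery`; the dotted table id is used here because that is the standard form for the client library. Reading the files directly bypasses BigQuery, so Spark infers the schema from the JSON itself rather than using the external table's schema. Depending on the connector version, reading a view may also require a materialization dataset option; that is not covered here.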