How to specify a subquery in the "dbtable" option of a Spark-jdbc application while reading data from a table on Greenplum?

I am trying to read data from a table on Greenplum into HDFS using Spark. I passed a subquery in the options to read the Greenplum table, as shown below.

val execQuery = s"(select ${allColumns}, 0 as ${flagCol} from dbanscience.xx_lines where year=2017 and month=12) as xx_lines_periodYear"

println("ExecQuery: " + execQuery)

val dataDF = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider").option("url", conUrl)
                     .option("dbtable", execQuery)
                     .option("user", devUsrName).option("password", devPwd)
                     .option("partitionColumn","id")
                     .option("lowerBound", 165512)
                     .option("upperBound", 11521481695656862L)
                     .option("numPartitions",300).load()

When I run the code, I see the following exception:

Exec query: (select je_header_id,source_system_name,je_line_num,last_update_date,last_updated_by,last_updated_by_name,ledger_id,code_combination_id,balancing_segment,cost_center_segment,period_name,period_year,period_num,effective_date,status,creation_date,created_by,created_by_name,entered_dr,entered_cr,entered_amount,accounted_dr,accounted_cr,accounted_amount,description,sap_document_number,sap_fiscal_year,sap_document_date,sap_posting_date,sap_period,sap_reference,sap_document_header_text,sap_exchange_rate,sap_reference_key,sap_line_item,sap_account_number,sap_cost_element,sap_profit_center,sap_segment,sap_trading_partner,sap_co_order,sap_geography,sap_reference_code,sap_product_line,sap_sender_cost_center,usd_mor_activity_amount::character varying as usd_mor_activity_amount_text, 0 as del_flag from analytics.xx_gl_je_lines where period_year=2017 and period_num=12) as xx_gl_je_lines_periodYear

Exception in thread "main" org.postgresql.util.PSQLException: ERROR: relation "public.(select je_header_id,source_system_name,je_line_num,last_update" does not exist
Position: 15
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:421)
at org.postgresql.jdbc.PgStatement.executeWithFlags(PgStatement.java:318)
at org.postgresql.jdbc.PgStatement.executeQuery(PgStatement.java:281)
at com.zaxxer.hikari.pool.ProxyStatement.executeQuery(ProxyStatement.java:111)
at com.zaxxer.hikari.pool.HikariProxyStatement.executeQuery(HikariProxyStatement.java)
at io.pivotal.greenplum.spark.jdbc.Jdbc$.resolveTable(Jdbc.scala:301)
at io.pivotal.greenplum.spark.GreenplumRelationProvider.createRelation(GreenplumRelationProvider.scala:29)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
at com.partition.source.chunk$.prepareDF(chunk.scala:153)
at com.partition.source.chunk$.main(chunk.scala:184)
at com.partition.source.chunk.main(chunk.scala)

The exception shows public being used as the dbname and the subquery (execQuery) as the table name.

I tried giving the exec query as:

val execQuery = s"(select ${allColumns}, 0 as ${flagCol} from analytics.xx_gl_je_lines where period_year=2017 and period_num=12) as xx_gl_je_lines_periodYear"

val execQuery = s"select ${allColumns}, 0 as ${flagCol} from analytics.xx_gl_je_lines where period_year=2017 and period_num=12 as xx_gl_je_lines_periodYear"

Neither of them works. I am using the jar greenplum-spark_2.11-1.4.0.jar to read data from Greenplum. Below is the spark-submit command I tried:

SPARK_MAJOR_VERSION=2 spark-submit --class com.partition.source.chunk --master=yarn --conf spark.ui.port=4090 --driver-class-path /home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar --conf spark.jars=/home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar --executor-cores 3 --executor-memory 13G --keytab /home/ibusr/ibusr.keytab --principal ibusr@dev.COM --files /usr/hdp/current/spark2-client/conf/hive-site.xml,connections.properties --name Splinter --conf spark.executor.extraClassPath=/home/ibusr/jars/greenplum-spark_2.11-1.4.0.jar splinter_2.11-0.1.jar

I wrote the code by following the instructions in the Greenplum documentation: https://greenplum-spark.docs.pivotal.io/100/read_from_gpdb.html

I cannot identify the mistake I made here. Could anyone tell me how to fix this issue?

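Replacing dbtable with a subquery is a feature of the built-in JDBC data source. The Greenplum Spark Connector, however, doesn't seem to provide such a capability.

Specifically, the source is identified by dbschema and dbtable, where the latter should be (emphasis mine):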

The name of the Greenplum Database table. When reading from Greenplum Database, this table must reside in the Greenplum Database schema identified in the dbschema option value.

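This explains the exception you get: the connector treats the whole dbtable value as a table name inside the schema (public by default), so the subquery text ends up being looked up as a relation.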
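For comparison, here is a minimal sketch of how the subquery form is usually passed to Spark's built-in "jdbc" source, which does accept a parenthesized, aliased SELECT as dbtable. The object and method names (JdbcSubqueryOptions, asDerivedTable, options) are hypothetical helpers, and the driver class org.postgresql.Driver is an assumption based on the PostgreSQL classes visible in the stack trace; this is not the Greenplum connector's API.

```scala
object JdbcSubqueryOptions {
  // Wrap a SELECT in parentheses and alias it so it can be used where a table name is expected.
  def asDerivedTable(sql: String, alias: String): String = s"($sql) as $alias"

  // Options for Spark's built-in "jdbc" source (NOT the Greenplum connector).
  // Assumption: the PostgreSQL JDBC driver is on the classpath and the URL
  // has the form jdbc:postgresql://host:port/database.
  def options(url: String, user: String, password: String, derivedTable: String): Map[String, String] =
    Map(
      "url"      -> url,
      "driver"   -> "org.postgresql.Driver",
      "dbtable"  -> derivedTable,
      "user"     -> user,
      "password" -> password
    )

  // With a SparkSession named spark in scope, the map would be used like this:
  //   val df = spark.read.format("jdbc").options(JdbcSubqueryOptions.options(...)).load()
}
```

With this source the partitioning options from the question (partitionColumn, lowerBound, upperBound, numPartitions) could be added to the same map as string values, but the data then goes through plain JDBC connections rather than through the connector.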

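At the same time, nothing in the code you've shared indicates that you actually need such a feature. Since you don't apply any database-specific logic, the process could simply be rewritten as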

import org.apache.spark.sql.functions.{col, lit}

val allColumns: Seq[String] = ???

val dataDF = spark.read.format("greenplum")
  .option("url", conUrl)
  .option("dbtable", "xx_lines")
  .option("dbschema", "dbanscience")
  .option("partitionColumn", "id")
  .option("user", devUsrName)
  .option("password", devPwd)
  .load()
  .where("year = 2017 and month=12")
  .select(allColumns map col:_*)
  .withColumn(flagCol, lit(0))

Please note that the other options you use (upperBound, lowerBound, numPartitions) are neither supported nor required.

According to the official documentation:

Greenplum Database stores table data across segments. A Spark application using the Greenplum-Spark Connector to load a Greenplum Database table identifies a specific table column as a partition column. The Connector uses the data values in this column to assign specific table data rows on each Greenplum Database segment to one or more Spark partitions.

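So, as you can see, the distribution mechanism is completely different from that of the built-in JDBC source.

The connector also provides an additional partitionsPerSegment option, which sets: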
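To make the contrast concrete, the built-in JDBC source divides the range between lowerBound and upperBound into numPartitions strides and issues one WHERE clause per partition. The following is a simplified approximation of that behaviour, not code from Spark or from the connector; JdbcStridePartitions and whereClauses are hypothetical names, and edge cases that Spark handles (a single partition, a range narrower than numPartitions) are left out.

```scala
object JdbcStridePartitions {
  // Approximates how a range-partitioned JDBC read is split into per-partition predicates.
  // Rows below lowerBound or above upperBound are NOT filtered out: they fall into
  // the first and last partitions respectively.
  def whereClauses(column: String, lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[String] = {
    require(numPartitions >= 2, "this sketch only covers two or more partitions")
    require(lowerBound < upperBound, "lowerBound must be smaller than upperBound")

    // Width of each partition's value range.
    val stride = upperBound / numPartitions - lowerBound / numPartitions

    (0 until numPartitions).map { i =>
      val start = lowerBound + i * stride
      val end = start + stride
      if (i == 0) s"$column < $end or $column is null"       // first partition also takes NULLs
      else if (i == numPartitions - 1) s"$column >= $start"  // last partition is open-ended
      else s"$column >= $start AND $column < $end"
    }
  }
}
```

For example, JdbcStridePartitions.whereClauses("id", 0L, 100L, 4) yields four predicates with boundaries at 25, 50 and 75. The Greenplum connector needs none of these bounds because it assigns rows to Spark partitions per segment.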

The number of Spark partitions per Greenplum Database segment. Optional, the default value is 1 partition.
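A minimal sketch of how the connector options mentioned above could be collected in one place, including partitionsPerSegment. The option keys are the ones named in this answer; the object and method names (GreenplumReadOptions, options, expectedSparkPartitions) are hypothetical, and the partition-count formula simply takes the documentation's wording at face value, so treat it as an assumption rather than a guarantee.

```scala
object GreenplumReadOptions {
  // Builds the option map for the Greenplum-Spark connector.
  // dbtable must be a plain table name that lives in the given schema, not a subquery.
  def options(
      url: String,
      user: String,
      password: String,
      schema: String,
      table: String,
      partitionColumn: String,
      partitionsPerSegment: Int = 1 // documented default: 1 partition per segment
  ): Map[String, String] = {
    require(partitionsPerSegment >= 1, "partitionsPerSegment must be positive")
    Map(
      "url"                  -> url,
      "user"                 -> user,
      "password"             -> password,
      "dbschema"             -> schema,
      "dbtable"              -> table,
      "partitionColumn"      -> partitionColumn,
      "partitionsPerSegment" -> partitionsPerSegment.toString
    )
  }

  // Assumption: one group of partitionsPerSegment Spark partitions per Greenplum segment.
  def expectedSparkPartitions(segments: Int, partitionsPerSegment: Int): Int =
    segments * partitionsPerSegment

  // With a SparkSession named spark in scope, the map would be used like this:
  //   val df = spark.read.format("greenplum").options(GreenplumReadOptions.options(...)).load()
}
```

Raising partitionsPerSegment is therefore the connector's counterpart to raising numPartitions on the built-in JDBC source.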