How to write into PostgreSQL hstore using Spark Dataset
I am trying to write a Spark Dataset to an existing PostgreSQL table (I cannot change the table metadata, such as column types). One of the columns of this table is of type hstore, and it is causing trouble.
I see the following exception when I launch the write (here the original map is empty, which when escaped gives an empty string):
Caused by: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO part_d3da09549b713bbdcd95eb6095f929c8 (.., "my_hstore_column", ..) VALUES (..,'',..) was aborted. Call getNextException to see the cause.
at org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:136)
at org.postgresql.core.v3.QueryExecutorImpl.handleError(QueryExecutorImpl.java:419)
at org.postgresql.core.v3.QueryExecutorImpl$ErrorTrackingResultHandler.handleError(QueryExecutorImpl.java:308)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2004)
at org.postgresql.core.v3.QueryExecutorImpl.flushIfDeadlockRisk(QueryExecutorImpl.java:1187)
at org.postgresql.core.v3.QueryExecutorImpl.sendQuery(QueryExecutorImpl.java:1212)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:351)
at org.postgresql.jdbc.PgStatement.executeBatch(PgStatement.java:1019)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:222)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable.apply(JdbcUtils.scala:300)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$saveTable.apply(JdbcUtils.scala:299)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:902)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$$anonfun$apply.apply(RDD.scala:902)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1899)
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1899)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.postgresql.util.PSQLException: ERROR: column "my_hstore_column" is of type hstore but expression is of type character varying
This is how I am doing it:
def escapePgHstore[A, B](hmap: Map[A, B]) = {
  // Render a Scala Map as hstore literal text, e.g. Map("a" -> 1L) yields ` "a"=>1 `
  hmap.map { case (key, value) => s""" "${key}"=>${value} """ }.mkString(",")
}
...
val props = new Properties()
props.put("user", "xxxxxxx")
props.put("password", "xxxxxxx")

ds.withColumn("my_hstore_column", escape_pg_hstore_udf($"original_column"))
  .drop("original_column")
  .coalesce(1)
  .write
  .mode(org.apache.spark.sql.SaveMode.Append)
  .option("driver", "org.postgresql.Driver")
  .jdbc(jdbcUrl, hashedTablePartName, props)
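For reference, the "..." above elides the UDF registration; it would presumably look something like the following (a hypothetical sketch, since the actual registration is not shown in the question):

import org.apache.spark.sql.functions.udf

// Hypothetical: expose the escaping helper as a Spark UDF applicable to a Map column
val escape_pg_hstore_udf = udf(escapePgHstore[String, Long] _)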
If I do not use escapePgHstore to escape original_column from Map[String, Long] into a String, I see the following error:
java.lang.IllegalArgumentException: Can't get JDBC type for map<string,bigint>
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType.apply(JdbcUtils.scala:137)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType.apply(JdbcUtils.scala:137)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getJdbcType(JdbcUtils.scala:136)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun.apply(JdbcUtils.scala:293)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun.apply(JdbcUtils.scala:292)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.saveTable(JdbcUtils.scala:292)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:441)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main.apply(App.scala:76)
at scala.App$$anonfun$main.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
What is the right way to make Spark write a valid hstore data type?
It turns out I just needed to let Postgres try to guess the appropriate type of my column, by setting stringtype to unspecified in the connection string, as described in the official documentation:
props.put("stringtype", "unspecified")
It works perfectly now!!
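Put together with the snippet from the question, the write then looks like this (a sketch reusing the question's props, jdbcUrl, hashedTablePartName and escape_pg_hstore_udf; the map still has to be escaped to hstore-formatted text, since Spark cannot map Map[String, Long] to a JDBC type on its own):

import java.util.Properties

val props = new Properties()
props.put("user", "xxxxxxx")
props.put("password", "xxxxxxx")
// Let the driver send strings with unspecified type so Postgres casts them to hstore
props.put("stringtype", "unspecified")

ds.withColumn("my_hstore_column", escape_pg_hstore_udf($"original_column"))
  .drop("original_column")
  .coalesce(1)
  .write
  .mode(org.apache.spark.sql.SaveMode.Append)
  .option("driver", "org.postgresql.Driver")
  .jdbc(jdbcUrl, hashedTablePartName, props)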
Here is PySpark code for writing a dataframe to a Postgres table that has HSTORE, JSON and JSONB columns. More generally, for any complex data type that exists in Postgres but cannot be represented in a Spark DataFrame, you need to specify stringtype="unspecified" in the options, or in the properties you pass to whichever function writes the dataframe to SQL.
Below is an example of writing a Spark DataFrame to a PostgreSQL table using the write() function:
dataframe.write.format('jdbc') \
    .options(driver=driver, user=username, password=password,
             url=target_database_url, dbtable=table,
             stringtype="unspecified") \
    .mode("append") \
    .save()
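Note that stringtype can equivalently be passed as a parameter on the PostgreSQL JDBC URL itself instead of as a separate option; host and database below are placeholders:

jdbc:postgresql://localhost:5432/mydb?stringtype=unspecified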