PySpark 使用 JDBC 连接器写入 Trino
PySpark write to Trino with JDBC connector
我正在尝试使用带有 PySpark 的 JDBC 连接器从 Trino 中的 table 读取数据,但是,每当我尝试写入字符串时,我总是收到错误 "Unknown type 'TEXT' for column X"
/VARCHAR 列。例如,它适用于 double。
我使用的代码如下:
from pyspark.sql import SparkSession
from pyspark import SparkContext
sc = SparkContext("local", "Test Write")
spark = SparkSession\
.builder\
.config("spark.jars", "trino-jdbc-379.jar")\
.master("local")\
.appName("Test Write")\
.getOrCreate()
query_test = """( SELECT CAST(name AS VARCHAR(200)) AS brandname
FROM dbname.pdo.brand) brand """
test_df = spark \
.read\
.format("jdbc")\
.option("url", "jdbc:trino://host:443")\
.option("driver", "io.trino.jdbc.TrinoDriver")\
.option("dbtable", query_test)\
.option("user", "user")\
.option("password", "pass")\
.load()
test_df.printSchema()
test_df\
.write\
.format("jdbc")\
.option("url", "jdbc:trino://host:443")\
.option("dbtable", "dbname.sandbox.test")\
.option("isolationLevel","NONE")\
.option("user", "user")\
.option("password", "pass")\
.mode("overwrite")\
.save()
我得到的完整错误是这样的:
Traceback (most recent call last):
File "/test_write.py", line 28, in <module>
test_df\
File "/home/user/.local/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 828, in save
self._jwrite.save()
File "/home/user/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
return_value = get_return_value(
File "/home/user/.local/lib/python3.8/site-packages/pyspark/sql/utils.py", line 128, in deco
return f(*a, **kw)
File "/home/user/.local/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o51.save.
: java.sql.SQLException: Query failed (#20220509_144401_00574_zadvu): line 1:36: Unknown type 'TEXT' for column '"brandname"'
at io.trino.jdbc.AbstractTrinoResultSet.resultsException(AbstractTrinoResultSet.java:1908)
at io.trino.jdbc.TrinoResultSet.getColumns(TrinoResultSet.java:285)
at io.trino.jdbc.TrinoResultSet.create(TrinoResultSet.java:61)
at io.trino.jdbc.TrinoStatement.internalExecute(TrinoStatement.java:262)
at io.trino.jdbc.TrinoStatement.execute(TrinoStatement.java:240)
at io.trino.jdbc.TrinoStatement.executeLargeUpdate(TrinoStatement.java:485)
at io.trino.jdbc.TrinoStatement.executeUpdate(TrinoStatement.java:457)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:894)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:126)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand(DataFrameWriter.scala:962)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:962)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:414)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:398)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.trino.spi.TrinoException: line 1:36: Unknown type 'TEXT' for column '"brandname"'
at io.trino.sql.analyzer.SemanticExceptions.semanticException(SemanticExceptions.java:48)
at io.trino.sql.analyzer.SemanticExceptions.semanticException(SemanticExceptions.java:43)
at io.trino.execution.CreateTableTask.internalExecute(CreateTableTask.java:160)
at io.trino.execution.CreateTableTask.execute(CreateTableTask.java:119)
at io.trino.execution.CreateTableTask.execute(CreateTableTask.java:85)
at io.trino.execution.DataDefinitionExecution.start(DataDefinitionExecution.java:145)
at io.trino.execution.SqlQueryManager.createQuery(SqlQueryManager.java:243)
at io.trino.dispatcher.LocalDispatchQuery.lambda$startExecution(LocalDispatchQuery.java:143)
at io.trino.$gen.Trino_375____20220508_190617_2.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
我知道该字段的格式正确,因为 test_df.printSchema()
的结果是这样的:
root
|-- brandname: string (nullable = true)
而且我也可以使用 test_df.show()
打印它而不会出现问题。
关于如何解决这个问题有什么想法吗?我正在使用 PySpark==3.0.3,因为 3.1.* 之后的版本存在一个已知问题,带有 JDBC 连接器,它仅在 3.3.0 上得到修复,但尚未在 stable 中发布版本。
将此选项添加到您的写作中:
.option("createTableColumnTypes", "brandname VARCHAR(200)")
导致此问题的兔子洞的开始可能是答案: and here
我正在尝试使用带有 PySpark 的 JDBC 连接器从 Trino 中的 table 读取数据,但是,每当我尝试写入字符串时,我总是收到错误 "Unknown type 'TEXT' for column X"
/VARCHAR 列。例如,它适用于 double。
我使用的代码如下:
from pyspark.sql import SparkSession
from pyspark import SparkContext
sc = SparkContext("local", "Test Write")
spark = SparkSession\
.builder\
.config("spark.jars", "trino-jdbc-379.jar")\
.master("local")\
.appName("Test Write")\
.getOrCreate()
query_test = """( SELECT CAST(name AS VARCHAR(200)) AS brandname
FROM dbname.pdo.brand) brand """
test_df = spark \
.read\
.format("jdbc")\
.option("url", "jdbc:trino://host:443")\
.option("driver", "io.trino.jdbc.TrinoDriver")\
.option("dbtable", query_test)\
.option("user", "user")\
.option("password", "pass")\
.load()
test_df.printSchema()
test_df\
.write\
.format("jdbc")\
.option("url", "jdbc:trino://host:443")\
.option("dbtable", "dbname.sandbox.test")\
.option("isolationLevel","NONE")\
.option("user", "user")\
.option("password", "pass")\
.mode("overwrite")\
.save()
我得到的完整错误是这样的:
Traceback (most recent call last):
File "/test_write.py", line 28, in <module>
test_df\
File "/home/user/.local/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 828, in save
self._jwrite.save()
File "/home/user/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
return_value = get_return_value(
File "/home/user/.local/lib/python3.8/site-packages/pyspark/sql/utils.py", line 128, in deco
return f(*a, **kw)
File "/home/user/.local/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o51.save.
: java.sql.SQLException: Query failed (#20220509_144401_00574_zadvu): line 1:36: Unknown type 'TEXT' for column '"brandname"'
at io.trino.jdbc.AbstractTrinoResultSet.resultsException(AbstractTrinoResultSet.java:1908)
at io.trino.jdbc.TrinoResultSet.getColumns(TrinoResultSet.java:285)
at io.trino.jdbc.TrinoResultSet.create(TrinoResultSet.java:61)
at io.trino.jdbc.TrinoStatement.internalExecute(TrinoStatement.java:262)
at io.trino.jdbc.TrinoStatement.execute(TrinoStatement.java:240)
at io.trino.jdbc.TrinoStatement.executeLargeUpdate(TrinoStatement.java:485)
at io.trino.jdbc.TrinoStatement.executeUpdate(TrinoStatement.java:457)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:894)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:126)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand(DataFrameWriter.scala:962)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:962)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:414)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:398)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.trino.spi.TrinoException: line 1:36: Unknown type 'TEXT' for column '"brandname"'
at io.trino.sql.analyzer.SemanticExceptions.semanticException(SemanticExceptions.java:48)
at io.trino.sql.analyzer.SemanticExceptions.semanticException(SemanticExceptions.java:43)
at io.trino.execution.CreateTableTask.internalExecute(CreateTableTask.java:160)
at io.trino.execution.CreateTableTask.execute(CreateTableTask.java:119)
at io.trino.execution.CreateTableTask.execute(CreateTableTask.java:85)
at io.trino.execution.DataDefinitionExecution.start(DataDefinitionExecution.java:145)
at io.trino.execution.SqlQueryManager.createQuery(SqlQueryManager.java:243)
at io.trino.dispatcher.LocalDispatchQuery.lambda$startExecution(LocalDispatchQuery.java:143)
at io.trino.$gen.Trino_375____20220508_190617_2.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
我知道该字段的格式正确,因为 test_df.printSchema()
的结果是这样的:
root
|-- brandname: string (nullable = true)
而且我也可以使用 test_df.show()
打印它而不会出现问题。
关于如何解决这个问题有什么想法吗?我正在使用 PySpark==3.0.3,因为 3.1.* 之后的版本存在一个已知问题,带有 JDBC 连接器,它仅在 3.3.0 上得到修复,但尚未在 stable 中发布版本。
将此选项添加到您的写作中:
.option("createTableColumnTypes", "brandname VARCHAR(200)")
导致此问题的兔子洞的开始可能是答案: