PySpark write to Trino with JDBC connector

I'm reading data from a table in Trino with PySpark using the JDBC connector; however, whenever I try to write string/VARCHAR columns back, I keep getting the error "Unknown type 'TEXT' for column X". It works fine for double columns, for example.

The code I'm using is the following:

from pyspark.sql import SparkSession
from pyspark import SparkContext

sc = SparkContext("local", "Test Write")

spark = SparkSession\
    .builder\
    .config("spark.jars", "trino-jdbc-379.jar")\
    .master("local")\
    .appName("Test Write")\
    .getOrCreate()


query_test = """( SELECT CAST(name AS VARCHAR(200)) AS brandname
                    FROM dbname.pdo.brand) brand """
test_df = spark \
    .read\
    .format("jdbc")\
    .option("url", "jdbc:trino://host:443")\
    .option("driver", "io.trino.jdbc.TrinoDriver")\
    .option("dbtable", query_test)\
    .option("user", "user")\
    .option("password", "pass")\
    .load()

test_df.printSchema()

test_df\
    .write\
    .format("jdbc")\
    .option("url", "jdbc:trino://host:443")\
    .option("dbtable", "dbname.sandbox.test")\
    .option("isolationLevel","NONE")\
    .option("user", "user")\
    .option("password", "pass")\
    .mode("overwrite")\
    .save()

The full error I get is this:

Traceback (most recent call last):
  File "/test_write.py", line 28, in <module>
    test_df\
  File "/home/user/.local/lib/python3.8/site-packages/pyspark/sql/readwriter.py", line 828, in save
    self._jwrite.save()
  File "/home/user/.local/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/home/user/.local/lib/python3.8/site-packages/pyspark/sql/utils.py", line 128, in deco
    return f(*a, **kw)
  File "/home/user/.local/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o51.save.
: java.sql.SQLException: Query failed (#20220509_144401_00574_zadvu): line 1:36: Unknown type 'TEXT' for column '"brandname"'
        at io.trino.jdbc.AbstractTrinoResultSet.resultsException(AbstractTrinoResultSet.java:1908)
        at io.trino.jdbc.TrinoResultSet.getColumns(TrinoResultSet.java:285)
        at io.trino.jdbc.TrinoResultSet.create(TrinoResultSet.java:61)
        at io.trino.jdbc.TrinoStatement.internalExecute(TrinoStatement.java:262)
        at io.trino.jdbc.TrinoStatement.execute(TrinoStatement.java:240)
        at io.trino.jdbc.TrinoStatement.executeLargeUpdate(TrinoStatement.java:485)
        at io.trino.jdbc.TrinoStatement.executeUpdate(TrinoStatement.java:457)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createTable(JdbcUtils.scala:894)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:81)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute(SparkPlan.scala:180)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery(SparkPlan.scala:218)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:127)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:126)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand(DataFrameWriter.scala:962)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:100)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:87)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:962)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:414)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:398)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: io.trino.spi.TrinoException: line 1:36: Unknown type 'TEXT' for column '"brandname"'
        at io.trino.sql.analyzer.SemanticExceptions.semanticException(SemanticExceptions.java:48)
        at io.trino.sql.analyzer.SemanticExceptions.semanticException(SemanticExceptions.java:43)
        at io.trino.execution.CreateTableTask.internalExecute(CreateTableTask.java:160)
        at io.trino.execution.CreateTableTask.execute(CreateTableTask.java:119)
        at io.trino.execution.CreateTableTask.execute(CreateTableTask.java:85)
        at io.trino.execution.DataDefinitionExecution.start(DataDefinitionExecution.java:145)
        at io.trino.execution.SqlQueryManager.createQuery(SqlQueryManager.java:243)
        at io.trino.dispatcher.LocalDispatchQuery.lambda$startExecution(LocalDispatchQuery.java:143)
        at io.trino.$gen.Trino_375____20220508_190617_2.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

I know the field has the right type because the result of test_df.printSchema() is this:

root
 |-- brandname: string (nullable = true)

And I can also print it with test_df.show() without any problem.

Any ideas on how to solve this? I'm using PySpark==3.0.3 because versions from 3.1.* onward have a known issue with the JDBC connector that was only fixed in 3.3.0, which has not yet been released as a stable version.

Add this option to your write (it tells Spark the exact column type to put in the CREATE TABLE statement it issues, instead of its default TEXT mapping for string columns, which Trino does not recognize):

.option("createTableColumnTypes", "brandname VARCHAR(200)")

The start of the rabbit hole that led me to this answer can be found here: and here
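For context, a minimal sketch (plain Python, no Spark session required) of how you might build the createTableColumnTypes value when a DataFrame has several string columns. The (name, dtype) pairs below are a hypothetical stand-in for what DataFrame.dtypes returns; the helper name and column names are illustrative, not part of any API:

```python
# Illustrative sketch: derive a "createTableColumnTypes" value that maps every
# string column to a Trino-compatible VARCHAR, leaving other types to Spark's
# defaults. The (name, dtype) pairs mimic what DataFrame.dtypes returns.

def varchar_overrides(dtypes, length=200):
    """Build the option value, e.g. 'brandname VARCHAR(200)'."""
    parts = [
        f"{name} VARCHAR({length})"
        for name, dtype in dtypes
        if dtype == "string"  # Spark's simple type name for StringType
    ]
    return ", ".join(parts)

# Hypothetical stand-in for test_df.dtypes:
dtypes = [("brandname", "string"), ("price", "double")]

print(varchar_overrides(dtypes))  # brandname VARCHAR(200)
```

The resulting string would then be passed as .option("createTableColumnTypes", varchar_overrides(test_df.dtypes)) on the writer. Note this only changes the column types in the CREATE TABLE that Spark issues in overwrite mode; it does not alter how the row data itself is sent.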