Reading SQLite database in Apache Spark on Databricks: Unsupported Type NULL

I have a SQLite database that I would like to import into Spark on Databricks.

When I run the command below, I get the error shown underneath it.

df = spark.read.format('jdbc') \
          .options(driver='org.sqlite.JDBC', dbtable='issn',
                   url='jdbc:sqlite:/dbfs/mnt/the_path/test.sqlite').load()
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-4113757> in <module>
      6 df = spark.read.format('jdbc') \
      7         .options(driver='org.sqlite.JDBC', dbtable='issn',
----> 8                  url='jdbc:sqlite:/dbfs/mnt/if_i_told_you_the_path_i_would_have_to_kill_you/lanabug.sqlite').load()

/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    182             return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    183         else:
--> 184             return self._df(self._jreader.load())
    185 
    186     @since(1.4)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    125     def deco(*a, **kw):
    126         try:
--> 127             return f(*a, **kw)
    128         except py4j.protocol.Py4JJavaError as e:
    129             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o4773.load.
: java.sql.SQLException: Unsupported type NULL
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getCatalystType(JdbcUtils.scala:256)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$getSchema(JdbcUtils.scala:321)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:321)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:63)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:384)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load(DataFrameReader.scala:373)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:373)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:258)
    at sun.reflect.GeneratedMethodAccessor1612.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)

Is there a way to infer the correct schema, or to specify the correct schema, e.g. schema = StructType([StructField('modified_issn', IntegerType()), StructField('codes', StringType())])? I tried the obvious candidate options, such as inferSchema='true', schema=schema, sqliteSchema=schema and jdbcSchema=schema, and none of them worked. Note that I am not an administrator and cannot reconfigure the system in any way.

MWE, steps to reproduce:

  1. Import the code below and run spawn_and_test_cache('test.sqlite') to create the database.

  2. Run the command I gave above, substituting the appropriate path.

import sqlite3
import time


def convert_issn(issn):
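    # Map an ISSN string to an integer key; an 'X' check digit is
    # encoded by offsetting into a separate range above 10**9.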
    if issn[-1] != 'X':
        return int(issn)
    else:
        return 1000000000 + int(issn.replace('X', '0'))


def read_cache(isxn, filename='.sdxml.sqlite', create=False, mode='issn'):
    c = sqlite3.connect(filename)
    if mode == 'issn':
        modified_isxn = convert_issn(isxn)
    else:
        modified_isxn = int(isxn)
    sql = 'PRAGMA journal_mode=WAL;'
    c.execute(sql)
    if create:
        sql = ('CREATE TABLE IF NOT EXISTS'
               ' %s(modified_%s INTEGER PRIMARY KEY, codes STRING)'
               % (mode, mode))
        c.execute(sql)
        while True:
            try:
                c.commit()
                break
            except sqlite3.OperationalError:
                time.sleep(0.001)
            except Exception:
                break
    curs = c.cursor()
    curs.execute('SELECT codes FROM %s WHERE modified_%s = %d'
                 % (mode, mode, modified_isxn))
    row = curs.fetchone()
    if row is None:
        c.close()
        return None
    c.close()
    return row[0]


def write_cache(isxn, codes, filename='.sdxml.sqlite', create=False,
                mode='issn'):
    c = sqlite3.connect(filename)
    if mode == 'issn':
        modified_isxn = convert_issn(isxn)
    else:
        modified_isxn = int(isxn)
    sql = 'PRAGMA journal_mode=WAL;'
    c.execute(sql)
    if create:
        sql = ('CREATE TABLE IF NOT EXISTS'
               ' %s(modified_%s INTEGER PRIMARY KEY, codes STRING);'
               % (mode, mode))
        c.execute(sql)
        while True:
            try:
                c.commit()
                break
            except sqlite3.OperationalError:
                time.sleep(0.0001)
            except Exception:
                break
    sql = ('INSERT OR REPLACE INTO %s values (%d, "%s")'
           % (mode, modified_isxn, codes))
    c.execute(sql)
    while True:
        try:
            c.commit()
            break
        except sqlite3.OperationalError:
            time.sleep(0.0001)
        except Exception:
            c.close()
            return False
    c.close()
    return True


def spawn_and_test_cache(filename='.sdxml.sqlite'):
    write_cache('10000000000000', 'abc', filename=filename, create=True)
    write_cache('10000000000000', 'def', filename=filename, create=True,
                mode='isbn')
    return (read_cache('10000000000000', filename=filename),
            read_cache('10000000000000', filename=filename, mode='isbn'))
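
To confirm the declared type that the driver fails to map, you can inspect the generated database directly with sqlite3. A minimal sketch, assuming the test.sqlite created in step 1:

import sqlite3

# Print each column's declared type; `codes` shows up as STRING,
# the declaration that the JDBC driver cannot map to a SQL type.
c = sqlite3.connect('test.sqlite')
for cid, name, col_type, notnull, default, pk in c.execute('PRAGMA table_info(issn)'):
    print(name, col_type)
c.close()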

Define the SQLite tables with TEXT (instead of the unsupported STRING) and you're good :-)

https://www.sqlite.org/datatype3.html
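
If the database already exists and regenerating it is not an option, one way to apply the fix is to rebuild the table with a TEXT column, since SQLite cannot change a column's declared type in place. A minimal sketch with sqlite3, using the table and file names from the MWE above:

import sqlite3

# Rebuild the issn table so that `codes` is declared TEXT instead of STRING.
c = sqlite3.connect('test.sqlite')
c.execute('CREATE TABLE issn_new(modified_issn INTEGER PRIMARY KEY, codes TEXT)')
c.execute('INSERT INTO issn_new SELECT modified_issn, codes FROM issn')
c.execute('DROP TABLE issn')
c.execute('ALTER TABLE issn_new RENAME TO issn')
c.commit()
c.close()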

(spark.read.format('jdbc')
 .options(driver='org.sqlite.JDBC', dbtable='sqlite_master', url='jdbc:sqlite:sdxml.sqlite').load()
 .show(truncate=False)
)

+-----+----+--------+--------+----------------------------------------------------------------+
|type |name|tbl_name|rootpage|sql                                                             |
+-----+----+--------+--------+----------------------------------------------------------------+
|table|issn|issn    |2       |CREATE TABLE issn(modified_issn INTEGER PRIMARY KEY, codes TEXT)|
|table|isbn|isbn    |3       |CREATE TABLE isbn(modified_isbn INTEGER PRIMARY KEY, codes TEXT)|
+-----+----+--------+--------+----------------------------------------------------------------+

(spark.read.format('jdbc')
 .options(driver='org.sqlite.JDBC', dbtable='issn', url='jdbc:sqlite:sdxml.sqlite').load()
 .show()
)

+--------------+-----+
| modified_issn|codes|
+--------------+-----+
|10000000000000|  abc|
+--------------+-----+

(spark.read.format('jdbc')
 .options(driver='org.sqlite.JDBC', dbtable='isbn', url='jdbc:sqlite:sdxml.sqlite').load()
 .show()
)


+--------------+-----+
| modified_isbn|codes|
+--------------+-----+
|10000000000000|  def|
+--------------+-----+

P.S.
customSchema does control the schema, but it does not prevent the unsupported-type error at the source.
My guess is that the JDBC driver has a mapping table of data types, and it throws an exception as soon as it cannot find a match for the source data type.

driver = 'org.sqlite.JDBC'
query = 'select modified_issn from issn'
url = 'jdbc:sqlite:sdxml.sqlite'

data_types = ['tinyint', 'smallint', 'int', 'bigint', 'string']

for dt in data_types:
    spark.read.format('jdbc').options(customSchema=f'modified_issn {dt}', driver=driver,
                                      query=query, url=url).load().printSchema()

root
 |-- modified_issn: byte (nullable = true)

root
 |-- modified_issn: short (nullable = true)

root
 |-- modified_issn: integer (nullable = true)

root
 |-- modified_issn: long (nullable = true)

root
 |-- modified_issn: string (nullable = true)
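
Note that the loop above dodges the error only because the query selects modified_issn and never touches the problematic codes column. If you cannot change the table definition at all, one possible workaround (an assumption on my part, not verified against the SQLite JDBC driver) is to CAST the column inside the pushed-down query so that the result column carries a type the driver can map:

# Hypothetical workaround: cast the STRING-declared column to TEXT in the
# query itself; whether the driver then reports a mappable type is untested.
driver = 'org.sqlite.JDBC'
url = 'jdbc:sqlite:sdxml.sqlite'
query = 'SELECT modified_issn, CAST(codes AS TEXT) AS codes FROM issn'

(spark.read.format('jdbc')
 .options(driver=driver, query=query, url=url)
 .load()
 .show())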