Reading SQLite database in Apache Spark on Databricks: Unsupported Type NULL
I have a SQLite database that I would like to import into Spark on Databricks. When I run the command below, I get the error shown underneath it.
df = spark.read.format('jdbc') \
.options(driver='org.sqlite.JDBC', dbtable='issn',
url='jdbc:sqlite:/dbfs/mnt/the_path/test.sqlite').load()
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<command-4113757> in <module>
6 df = spark.read.format('jdbc') \
7 .options(driver='org.sqlite.JDBC', dbtable='issn',
----> 8 url='jdbc:sqlite:/dbfs/mnt/if_i_told_you_the_path_i_would_have_to_kill_you/lanabug.sqlite').load()
/databricks/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
182 return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
183 else:
--> 184 return self._df(self._jreader.load())
185
186 @since(1.4)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
125 def deco(*a, **kw):
126 try:
--> 127 return f(*a, **kw)
128 except py4j.protocol.Py4JJavaError as e:
129 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o4773.load.
: java.sql.SQLException: Unsupported type NULL
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getCatalystType(JdbcUtils.scala:256)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$getSchema(JdbcUtils.scala:321)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:321)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:63)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:226)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:384)
at org.apache.spark.sql.DataFrameReader.$anonfun$load(DataFrameReader.scala:373)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:373)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:258)
at sun.reflect.GeneratedMethodAccessor1612.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
Is there any way to infer the correct schema, or to specify it, e.g. schema = StructType([StructField('modified_issn', IntegerType()), StructField('codes', StringType())])? I have tried the obvious hypothetical options, such as inferSchema='true', schema=schema, sqliteSchema=schema and jdbcSchema=schema, and none of them work. Note that I am not an administrator and cannot reconfigure the system in any way.
MWE, steps to reproduce:
Import the code below and run spawn_and_test_cache('test.sqlite') to create the database.
Run the command I gave earlier, substituting the appropriate path.
import sqlite3
import time


def convert_issn(issn):
    # Encode an ISSN as an integer; a trailing 'X' check digit is mapped
    # into a disjoint range by adding an offset.
    if issn[-1] != 'X':
        return int(issn)
    else:
        return 1000000000 + int(issn.replace('X', '0'))


def read_cache(isxn, filename='.sdxml.sqlite', create=False, mode='issn'):
    c = sqlite3.connect(filename)
    if mode == 'issn':
        modified_isxn = convert_issn(isxn)
    else:
        modified_isxn = int(isxn)
    sql = 'PRAGMA journal_mode=WAL;'
    c.execute(sql)
    if create:
        sql = ('CREATE TABLE IF NOT EXISTS'
               ' %s(modified_%s INTEGER PRIMARY KEY, codes STRING)'
               % (mode, mode))
        c.execute(sql)
    # Retry the commit while the WAL database is locked by another writer.
    while True:
        try:
            c.commit()
            break
        except sqlite3.OperationalError:
            time.sleep(0.001)
        except Exception:
            break
    curs = c.cursor()
    curs.execute('SELECT codes FROM %s WHERE modified_%s = %d'
                 % (mode, mode, modified_isxn))
    row = curs.fetchone()
    if row is None:
        c.close()
        return None
    c.close()
    return row[0]


def write_cache(isxn, codes, filename='.sdxml.sqlite', create=False,
                mode='issn'):
    c = sqlite3.connect(filename)
    if mode == 'issn':
        modified_isxn = convert_issn(isxn)
    else:
        modified_isxn = int(isxn)
    sql = 'PRAGMA journal_mode=WAL;'
    c.execute(sql)
    if create:
        sql = ('CREATE TABLE IF NOT EXISTS'
               ' %s(modified_%s INTEGER PRIMARY KEY, codes STRING);'
               % (mode, mode))
        c.execute(sql)
    while True:
        try:
            c.commit()
            break
        except sqlite3.OperationalError:
            time.sleep(0.0001)
        except Exception:
            break
    sql = ('INSERT OR REPLACE INTO %s values (%d, "%s")'
           % (mode, modified_isxn, codes))
    c.execute(sql)
    while True:
        try:
            c.commit()
            break
        except sqlite3.OperationalError:
            time.sleep(0.0001)
        except Exception:
            c.close()
            return False
    c.close()
    return True


def spawn_and_test_cache(filename='.sdxml.sqlite'):
    write_cache('10000000000000', 'abc', filename=filename, create=True)
    write_cache('10000000000000', 'def', filename=filename, create=True,
                mode='isbn')
    return (read_cache('10000000000000', filename=filename),
            read_cache('10000000000000', filename=filename, mode='isbn'))
For the SQLite table definitions, use TEXT (rather than the unsupported STRING) and you're good :-)
https://www.sqlite.org/datatype3.html
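If recreating the database from scratch is inconvenient, here is a minimal migration sketch, assuming the file and table names from the MWE above (migrate_codes_to_text is a hypothetical helper, not part of the question's code). SQLite cannot change a column's declared type in place, so the table is rebuilt with TEXT affinity and the rows copied over.
import sqlite3

def migrate_codes_to_text(filename, mode='issn'):
    # Rebuild the table with codes declared as TEXT, then swap it in.
    c = sqlite3.connect(filename)
    c.executescript(
        'CREATE TABLE %s_new(modified_%s INTEGER PRIMARY KEY, codes TEXT);'
        'INSERT INTO %s_new SELECT * FROM %s;'
        'DROP TABLE %s;'
        'ALTER TABLE %s_new RENAME TO %s;'
        % (mode, mode, mode, mode, mode, mode, mode))
    c.close()

migrate_codes_to_text('test.sqlite')               # fixes the issn table
migrate_codes_to_text('test.sqlite', mode='isbn')  # and the isbn table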
(spark.read.format('jdbc')
.options(driver='org.sqlite.JDBC', dbtable='sqlite_master',url='jdbc:sqlite:sdxml.sqlite').load()
.show(truncate=False)
)
+-----+----+--------+--------+----------------------------------------------------------------+
|type |name|tbl_name|rootpage|sql |
+-----+----+--------+--------+----------------------------------------------------------------+
|table|issn|issn |2 |CREATE TABLE issn(modified_issn INTEGER PRIMARY KEY, codes TEXT)|
|table|isbn|isbn |3 |CREATE TABLE isbn(modified_isbn INTEGER PRIMARY KEY, codes TEXT)|
+-----+----+--------+--------+----------------------------------------------------------------+
(spark.read.format('jdbc')
.options(driver='org.sqlite.JDBC', dbtable='issn',url='jdbc:sqlite:sdxml.sqlite').load()
.show()
)
+--------------+-----+
| modified_issn|codes|
+--------------+-----+
|10000000000000| abc|
+--------------+-----+
(spark.read.format('jdbc')
.options(driver='org.sqlite.JDBC', dbtable='isbn',url='jdbc:sqlite:sdxml.sqlite').load()
.show()
)
+--------------+-----+
| modified_isbn|codes|
+--------------+-----+
|10000000000000| def|
+--------------+-----+
P.S. customSchema does control the schema, but it does not prevent the unknown-type error at the source. My guess is that the JDBC driver has a mapping table of data types and throws the exception as soon as it cannot find the source's data type.
driver = 'org.sqlite.JDBC'
query = 'select modified_issn from issn'
url = 'jdbc:sqlite:sdxml.sqlite'
data_types = ['tinyint', 'smallint', 'int', 'bigint', 'string']
for dt in data_types:
    (spark.read.format('jdbc')
     .options(customSchema=f'modified_issn {dt}',
              driver=driver, query=query, url=url)
     .load().printSchema())
root
|-- modified_issn: byte (nullable = true)
root
|-- modified_issn: short (nullable = true)
root
|-- modified_issn: integer (nullable = true)
root
|-- modified_issn: long (nullable = true)
root
|-- modified_issn: string (nullable = true)
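A speculative follow-up sketch (my own, not verified): if rebuilding the table is not an option, pushing a CAST into the query may make the source report a recognized type for the STRING-declared codes column. Whether org.sqlite.JDBC derives the reported type from a CAST expression this way is an assumption.
(spark.read.format('jdbc')
 .options(driver='org.sqlite.JDBC',
          # the assumption: CAST coerces the column type reported in the
          # result set metadata to TEXT before Spark inspects it
          query='SELECT modified_issn, CAST(codes AS TEXT) AS codes FROM issn',
          url='jdbc:sqlite:sdxml.sqlite')
 .load()
 .printSchema()
)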