PySpark:如何将 MySQL 函数与 JDBC 连接一起使用?
PySpark: how to use a MySQL function with JDBC connection?
我在 MySQL 数据库中使用这个查询
select *,
UNIX_TIMESTAMP(CONVERT_TZ(
SUBSTRING(input_date, 1, 19),
SUBSTRING(input_date, 20),
'SYSTEM'
)) as timestamp
from my_table
用于将具有 UTC 偏移量(例如 2018-12-15T13:48:16-08:00
)的示例 input_date
时间戳转换为纪元时间。
我现在需要使用 PySpark 做同样的事情并通过 JDBC 连接访问那个 table,但是当我尝试时我得到以下错误
Py4JJavaError: An error occurred while calling o62.sql.
: org.apache.spark.sql.AnalysisException: Undefined function: 'CONVERT_TZ'. This function is neither a registered temporary function nor a permanent function registered in the database ...
我做错了什么?在 PySpark 中有更好的方法吗?
谢谢
您可以使用此函数连接到 MySQL 数据库:
def connect_to_sql(
spark, jdbc_hostname, jdbc_port, database, data_table, username, password
):
jdbc_url = "jdbc:mysql://{0}:{1}/{2}".format(jdbc_hostname, jdbc_port, database)
connection_details = {
"user": username,
"password": password,
"driver": "com.mysql.cj.jdbc.Driver",
}
df = spark.read.jdbc(url=jdbc_url, table=data_table, properties=connection_details)
return df
关于时区转换,这个问题会对你有所帮助:
我在 MySQL 数据库中使用这个查询
select *,
UNIX_TIMESTAMP(CONVERT_TZ(
SUBSTRING(input_date, 1, 19),
SUBSTRING(input_date, 20),
'SYSTEM'
)) as timestamp
from my_table
用于将具有 UTC 偏移量(例如 2018-12-15T13:48:16-08:00
)的示例 input_date
时间戳转换为纪元时间。
我现在需要使用 PySpark 做同样的事情并通过 JDBC 连接访问那个 table,但是当我尝试时我得到以下错误
Py4JJavaError: An error occurred while calling o62.sql.
: org.apache.spark.sql.AnalysisException: Undefined function: 'CONVERT_TZ'. This function is neither a registered temporary function nor a permanent function registered in the database ...
我做错了什么?在 PySpark 中有更好的方法吗?
谢谢
您可以使用此函数连接到 MySQL 数据库:
def connect_to_sql(
spark, jdbc_hostname, jdbc_port, database, data_table, username, password
):
jdbc_url = "jdbc:mysql://{0}:{1}/{2}".format(jdbc_hostname, jdbc_port, database)
connection_details = {
"user": username,
"password": password,
"driver": "com.mysql.cj.jdbc.Driver",
}
df = spark.read.jdbc(url=jdbc_url, table=data_table, properties=connection_details)
return df
关于时区转换,这个问题会对你有所帮助: