使用 Zeppelin Spark 2.0 和 Pyspark 连接到 AWS Redshift

Connecting to AWS Redshift with Zeppelin Spark 2.0 and Pyspark

我需要将 Redshift 数据读入 Zeppelin 中的数据帧。在过去的几个月里,我一直在 AWS 上通过 Zeppelin 使用 Spark 2.0 成功打开 csv 和 json S3 文件。

我曾经能够使用 Spark 1.6.2(也许是 1.6.1)从 AWS EMR 上的 Zeppelin 连接到 Redshift,使用以下代码:

%pyspark

from pyspark.sql import SQLContext, Row
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

#Load the data
aquery = "(SELECT serial_number, min(date_time) min_date_time from schema.table where serial_number in ('abcdefg','1234567') group by serial_number) as minDates"

dfMinDates = sqlContext.read.format('jdbc').options(url='jdbc:postgresql://dadadadaaaredshift.amazonaws.com:5439/idw?tcpKeepAlive=true&ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory?user=user&password=password', dbtable=aquery).load()
dfMinDates.show()

它奏效了。那是2016年的夏天。

从那时起我就不再需要它了,现在 AWS 有了 Spark 2.0。

新语法是

myDF = spark.read.jdbc 像这样:

%pyspark

aquery = "(SELECT serial_number, min(date_time) min_date_time from schema.table where serial_number in ('abcdefg','1234567') group by serial_number) as minDates"

dfMinDates = spark.read.jdbc("jdbc:postgresql://dadadadaaaredshift.amazonaws.com:5439/idw?tcpKeepAlive=true&ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory?user=user&password=password", dbtable=aquery).load()
dfMinDates.show()

但是我得到这个错误:

Py4JJavaError: An error occurred while calling o119.jdbc. : java.sql.SQLException: No suitable driver at java.sql.DriverManager.getDriver(DriverManager.java:315) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun.apply(JdbcUtils.scala:54) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun.apply(JdbcUtils.scala:54) at scala.Option.getOrElse(Option.scala:121) at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createConnectionFactory(JdbcUtils.scala:53) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:123) at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.(JDBCRelation.scala:117) at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:237) at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:159) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:280) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:128) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:211) at java.lang.Thread.run(Thread.java:745) (, Py4JJavaError(u'An error occurred while calling o119.jdbc.\n', JavaObject id=o121), )

我研究了 Spark 2.0 文档,发现了这个:

The JDBC driver class must be visible to the primordial class loader on the client session and on all executors. This is because Java’s DriverManager class does a security check that results in it ignoring all drivers not visible to the primordial class loader when one goes to open a connection. One convenient way to do this is to modify compute_classpath.sh on all worker nodes to include your driver JARs.

我不知道如何实现这一点,并从各种帖子、一些博客和 Whosebug 中的一些帖子中进行了更多阅读,发现了这个:

spark.driver.extraClassPath = org.postgresql.Driver

我在 Zeppelin 的 Interpreter 设置页面做了这个,但我仍然得到同样的错误。

我尝试添加一个 Postgres 解释器,但我不确定我做对了(因为我不确定是把它放在 Spark 解释器中还是 Python 解释器),我选择了星火解释器。现在 Postgres 解释器也具有与 Spark 解释器相同的所有设置,这可能无关紧要,但我仍然得到相同的错误。

在 Spark 1.6 中,我只是不记得经历过所有这些麻烦。

作为一项实验,我使用 Spark 1.6.2 启动了一个 EMR 集群,并尝试了曾经有效的旧代码,但得到了与上述相同的错误!

Zeppelin 站点涵盖了 Postgres,但它们的信息看起来像代码而不是如何设置解释器,所以我不知道如何使用它。

我没有想法和参考。

非常感谢任何建议!

您需要使用 Amazon 的 Redshift 特定驱动程序。您可以从这里下载:http://docs.aws.amazon.com/redshift/latest/mgmt/configure-jdbc-connection.html.

但是,如果您使用的是 EMR,它已经就位(在 /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar),您只需告诉 Zeppelin 它在哪里。

声明方法如下: