PySpark JDBC HANA database: dataframe.show() error

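I can connect to a HANA database from Spark using the Python JDBC DataFrame API, and dataframe.printSchema() prints the schema as expected. But when I try an action such as df.show(), it throws an error saying the connection is not serializable. How can we make the connection serializable in PySpark? Below is the code I used: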

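from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="hdfspush")
sqlctx = SQLContext(sc)

# urlname is the HANA JDBC URL string, defined elsewhere (not shown in this post)
df = sqlctx.read.format('jdbc').options(driver='com.sap.db.jdbc.Driver', url=urlname, dbtable='abcd').load()

df.printSchema()  # works: only the schema is fetched
df.show()         # fails: this is the call that raises the error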

Here is the error message:

Traceback (most recent call last):
  File "C:/spark-1.4.1-bin-hadoop2.6/bin/testhana4.py", line 15, in <module>
    df.show()
  File "C:\spark-1.4.1-bin-hadoop2.6\python\lib\pyspark.zip\pyspark\sql\dataframe.py", line 258, in show
  File "C:\spark-1.4.1-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
  File "C:\spark-1.4.1-bin-hadoop2.6\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o25.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.sap.db.jdbc.topology.Host
Serialization stack:
        - object not serializable (class: com.sap.db.jdbc.topology.Host, value: saphdbdev03.isus.emc.com:30415)
        - writeObject data (class: java.util.ArrayList)
        - object (class java.util.ArrayList, [saphdbdev03:30415])
        - writeObject data (class: java.util.Hashtable)
        - object (class java.util.Properties, {dburl=jdbc:sap://saphdbdev03:30415, user=SAPSR3, password=*****, url=jdbc:sap://saphdbdev03.isus.emc.com:30415?user=SAPSR3&password=******,
        - field (class: org.apache.spark.sql.jdbc.JDBCRDD$$anonfun$getConnector, name: properties, type: class java.util.Properties)
        - object (class org.apache.spark.sql.jdbc.JDBCRDD$$anonfun$getConnector, <function0>)
        - field (class: org.apache.spark.sql.jdbc.JDBCRDD, name: org$apache$spark$sql$jdbc$JDBCRDD$$getConnection, type: interface scala.Function0)
        - object (class org.apache.spark.sql.jdbc.JDBCRDD, JDBCRDD[0] at showString at NativeMethodAccessorImpl.java:-2)
        - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
        - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@2e079958)
        - writeObject data (class: scala.collection.immutable.$colon$colon)
        - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@2e079958))
        - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
        - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[1] at showString at NativeMethodAccessorImpl.java:-2)
        - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
        - object (class scala.Tuple2, (MapPartitionsRDD[1] at showString at NativeMethodAccessorImpl.java:-2,<function2>))
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1264)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1263)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:878)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:815)
        at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:799)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1426)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
        at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)

15/12/11 14:21:34 INFO spark.SparkContext: Invoking stop() from shutdown hook
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
15/12/11 14:21:34 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
15/12/11 14:21:35 INFO ui.SparkUI: Stopped Spark web UI at http://10.30.117.16:4040
15/12/11 14:21:35 INFO scheduler.DAGScheduler: Stopping DAGScheduler

This was resolved by writing a new Scala program that handles serialization of the connection properties.
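
The Scala program itself is not shown, so the following is only an illustrative sketch of the general idea, not the actual fix. The serialization stack above shows a java.util.Properties object that ended up holding a list of com.sap.db.jdbc.topology.Host objects, which cannot be serialized. One plausible approach is to keep only plain string entries in the properties before they are captured and shipped to the executors. The sketch uses Python's pickle as a stand-in for Java serialization. The Host class, the string_only helper, the "hostlist" key, and the example host and user values are all hypothetical.

```python
import pickle


class Host:
    """Hypothetical stand-in for a driver object that cannot be serialized."""

    def __init__(self, name, port):
        self.name = name
        self.port = port

    def __reduce__(self):
        # Mimic java.io.NotSerializableException by refusing to be pickled.
        raise pickle.PicklingError("Host is not serializable")


def string_only(props):
    """Return a copy of props that keeps only str -> str entries."""
    return {k: v for k, v in props.items()
            if isinstance(k, str) and isinstance(v, str)}


# Connection properties as the caller wrote them (placeholder values).
props = {"url": "jdbc:sap://example-host:30415", "user": "EXAMPLE_USER"}

# Simulate a driver that adds its own non-serializable objects to the same map.
polluted = dict(props)
polluted["hostlist"] = [Host("example-host", 30415)]

# Serializing the polluted map fails, as in the stack trace above.
try:
    pickle.dumps(polluted)
    polluted_ok = True
except pickle.PicklingError:
    polluted_ok = False

# Serializing the filtered copy succeeds and round-trips unchanged.
clean = string_only(polluted)
restored = pickle.loads(pickle.dumps(clean))
```

In this sketch the filtered copy is what would be handed to the code that gets serialized, while the driver remains free to modify its own working copy on each executor.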