如何查看 Thrift 服务器上的 pyspark 临时表?

How to view pyspark temporary tables on Thrift server?

我正在尝试通过 Thrift 在 pyspark 上创建一个临时的 table。我的最终目标是能够使用 JDBC.

从像 DBeaver 这样的数据库客户端访问它

我首先使用直线进行测试。

这就是我正在做的。

  1. 使用 docker 在我自己的机器上启动了一个由一名工人组成的集群,并在 spark-defaults.conf
  2. 上添加了 spark.sql.hive.thriftServer.singleSession true
  3. 启动 Pyspark shell(为了测试)和 运行 以下代码:

    from pyspark.sql import Row l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)] rdd = sc.parallelize(l) people = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) people = people.toDF().cache() peebs = people.createOrReplaceTempView('peebs') result = sqlContext.sql('select * from peebs')

    到目前为止还不错,一切正常。

  4. 在不同的终端上我初始化了 spark thrift 服务器: ./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001 --conf spark.executor.cores=1 --master spark://172.18.0.2:7077

    服务器似乎正常启动,我能够在我的 spark 集群主机 UI.

  5. 上看到 pyspark 和 thrift 服务器作业 运行
  6. 然后我使用直线连接到集群

    ./bin/beeline beeline> !connect jdbc:hive2://172.18.0.2:10001

    这就是我得到的

    Connecting to jdbc:hive2://172.18.0.2:10001
    Enter username for jdbc:hive2://172.18.0.2:10001:
    Enter password for jdbc:hive2://172.18.0.2:10001:
    2019-06-29 20:14:25 INFO Utils:310 - Supplied authorities: 172.18.0.2:10001
    2019-06-29 20:14:25 INFO Utils:397 - Resolved authority: 172.18.0.2:10001
    2019-06-29 20:14:25 INFO HiveConnection:203 - Will try to open client transport with JDBC Uri: jdbc:hive2://172.18.0.2:10001
    Connected to: Spark SQL (version 2.3.3)
    Driver: Hive JDBC (version 1.2.1.spark2)
    Transaction isolation: TRANSACTION_REPEATABLE_READ

    好像没问题。

  7. 当我列出 show tables; 时,我什么也看不到。

我想强调的两件有趣的事情是:

  1. 当我启动 pyspark 时,我收到这些警告

    WARN ObjectStore:6666 - Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0

    WARN ObjectStore:568 - Failed to get database default, returning NoSuchObjectException

    WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException

  2. 当我启动 thrift 服务器时,我得到这些:

    rsync from spark://172.18.0.2:7077
    ssh: Could not resolve hostname spark: Name or service not known
    rsync: connection unexpectedly closed (0 bytes received so far) [Receiver]
    rsync error: unexplained error (code 255) at io.c(235) [Receiver=3.1.2]
    starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to ...

我浏览过多个帖子和讨论。我看到有人说我们不能通过 thrift 公开临时 tables 除非你从相同的代码中启动服务器。如果那是真的,我怎么能在 python (pyspark) 中做到这一点?

谢谢

createOrReplaceTempView 创建一个 in-memory table。 Spark thrift 服务器需要在我们创建 in-memory table.
的同一驱动程序 JVM 上启动 在上面的示例中,创建 table 的驱动程序与驱动程序 运行 STS(Spark Thrift server)不同。
两个选项
1. 在启动 STS 的同一 JVM 中使用 createOrReplaceTempView 创建 table。
2. 使用后备元存储,并使用 org.apache.spark.sql.DataFrameWriter#saveAsTable 创建 tables 以便 tables 可以独立于 JVM 访问(实际上没有任何 Spark 驱动程序。

关于错误:
1.与客户端和服务端metastore版本有关。
2. 似乎有些 rsync 脚本试图解码 spark:\ url
两者似乎都与问题无关。

在做了几次测试后,我能够想出一个简单的(无身份验证)代码,对我有用。

重要的是要注意,如果您想通过 JDBC 使临时表可用,您需要在同一个 JVM(同一个 spark 作业)中启动 thrift 服务器,并确保代码挂起,以便应用程序得以保留 运行 在集群中。

遵循我创建的工作示例代码以供参考:

import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import

spark = SparkSession \
    .builder \
    .appName('the_test') \
    .enableHiveSupport()\
    .config('spark.sql.hive.thriftServer.singleSession', True)\
    .config('hive.server2.thrift.port', '10001') \
    .getOrCreate()

sc=spark.sparkContext
sc.setLogLevel('INFO')

java_import(sc._gateway.jvm, "")


from pyspark.sql import Row
l = [('John', 20), ('Heather', 34), ('Sam', 23), ('Danny', 36)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
people = people.toDF().cache()
peebs = people.createOrReplaceTempView('peebs')

sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)

while True:
    time.sleep(10)

我只是在 spark-submit 中使用了上面的 .py,并且我能够通过 JDBC 通过直线连接并使用 DBeaver 使用 Hive JDBC 驱动程序。

万一有人需要在 Spark Streaming 中执行此操作,我让它可以像这样工作。

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import

spark = SparkSession \
    .builder \
    .appName('restlogs_qlik') \
    .enableHiveSupport()\
    .config('spark.sql.hive.thriftServer.singleSession', True)\
    .config('hive.server2.thrift.port', '10001') \
    .getOrCreate()

sc=spark.sparkContext
sc.setLogLevel('INFO')

#Order matters! 
java_import(sc._gateway.jvm, "")
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)

#Define schema of json
schema = StructType().add("partnerid", "string").add("sessionid", "string").add("functionname", "string").add("functionreturnstatus", "string")

#load data into spark-structured streaming
df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "rest_logs") \
      .load() \
      .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

#Print output
query = df.writeStream \
            .outputMode("append") \
            .format("memory") \
            .queryName("view_figures") \
            .start()

query.awaitTermination();

启动后,您可以JDBC使用蜂箱进行测试。我无法理解的是我必须在同一个脚本中启动 Thrift 服务器。这是启动脚本的方法。

    spark-submit --master local[2] \
--conf "spark.driver.extraClassPath=D:\Libraries\m2_repository\org\apache\kafka\kafka-clients\2.0.0\kafka-clients-2.0.0.jar" \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 \
"C:\Temp\spark_kafka.py"

希望这对某人有所帮助。顺便说一句,我处于初步研究阶段,所以不要评判我。