Getting Error in Pyspark communication through RPC in RabbitMq

I have recently been using RabbitMQ for PySpark communication from a remote PC over RPC. For testing purposes I wrote a small test script, and it is giving me an error.

I have followed the RPC tutorial in the RabbitMQ documentation to implement RPC on top of PySpark.

Here is my Spark RPC server code:

import pika
from tkinter import *
from pyspark.sql import SparkSession
from pyspark import SparkConf,SparkContext
import json
import re



connectionparam=pika.ConnectionParameters(host="localhost")
connection=pika.BlockingConnection(connectionparam)

channel=connection.channel()

channel.queue_declare(queue='rpc_queue')

spark = SparkSession.builder \
    .config("spark.sql.warehouse.dir", r"C:\spark\spark-warehouse") \
    .appName("TestApp") \
    .enableHiveSupport() \
    .getOrCreate()

print("success")
# establishing character
#sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2  WHERE lflow1.Did = lesflow2.MID"



def queryBuilder(sqlval):
    print("printing",sqlval)
    df=spark.sql(sqlval)
    print("printing data frame table")
    df.show()

    resultlist = df.toJSON().collect()
    dumpdata = re.sub(r"\'", "", str(resultlist))
    jsondata = json.dumps(dumpdata)
    #print(jsondata)
    return jsondata


def on_request(ch,method,props, body):
    n=body
    print("printing request body ",n)
    response=queryBuilder(n)
    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=response
                     )
    ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,queue='rpc_queue')
print("[x] Awaiting RPC Request")

channel.start_consuming()
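# Note: start_consuming() blocks until the consumer stops,
# so the Tk GUI below only runs after consuming has ended.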

master=Tk()
entryval=Entry(master)
entryval.grid(row=0,column=1)
Button(master,text='Quit',command=master.quit).grid(row=3,column=1,sticky=W,pady=50)
mainloop()

The RPC client code of my remote PySpark application is:

import pika
import uuid

class SparkRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, querymsg):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                         reply_to = self.callback_queue,
                                         correlation_id = self.corr_id,
                                         ),
                                   body=querymsg)
        while self.response is None:
            self.connection.process_data_events()
        return int(self.response)

sparkrpc = SparkRpcClient()
sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2  WHERE lflow1.Did = lesflow2.MID"


print(" [x] Requesting query")
response = sparkrpc.call(sqlstring)
print(" [.] Got %s" % response)

My server receives the request string from the client and prints it, but it fails to process it in my queryBuilder() function, which runs the sqlstring and returns the JSON data. What is more, I have sent the request several times, and it seems those individual requests are queued in rpc_queue but never cleared out, because if I run only the server script I get the same error. Maybe I am missing something here; can anyone help me figure it out? I just want to return the JSON data to the client. Thanks in advance, 卡莲

You are passing an incompatible type (it looks like bytes or bytearray) where a str is expected.

You should decode the content to a string first:

def queryBuilder(sqlval, enc):
    ...
    df = spark.sql(sqlval.decode(enc))
    df.show()
    ...
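
For example, the decode could also be done in the on_request callback so that queryBuilder keeps its original one-argument signature. A minimal sketch, assuming the client sends the query as UTF-8 text and that the JSON reply is explicitly encoded back to bytes before publishing (both assumptions on my part):

def on_request(ch, method, props, body):
    # RabbitMQ delivers the message body as bytes; decode it before handing it to Spark
    sqlstring = body.decode('utf-8')      # assumes the client sent UTF-8 text
    print("printing request body ", sqlstring)

    response = queryBuilder(sqlstring)    # queryBuilder now receives a plain str

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=response.encode('utf-8'))  # send the JSON reply back as bytes
    ch.basic_ack(delivery_tag=method.delivery_tag)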