在 RabbitMq 中通过 RPC 获取 Pyspark 通信中的错误
GEtting Error in Pyspark communication through RPC in RabbitMq
我最近使用 RabbitMq 通过 RPC.For 测试目的从远程 pc 进行 pyspark 通信我开发了一个测试代码,它给我错误
我已按照 RabbitMq 文档教程在 pyspark 上实施 RPC
这是我的 spark RPC 服务器代码
import pika
from tkinter import*
from pyspark.sql import SparkSession
from pyspark import SparkConf,SparkContext
import json
import re
connectionparam=pika.ConnectionParameters(host="localhost")
connection=pika.BlockingConnection(connectionparam)
channel=connection.channel()
channel.queue_declare(queue='rpc_queue')
spark=SparkSession.builder.config("spark.sql.warehouse.dir", "C:\spark\spark-warehouse")\
\
.appName("TestApp").\
enableHiveSupport().getOrCreate()
print("success")
#establishhing chraracter
#sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2 WHERE lflow1.Did = lesflow2.MID"
def queryBuilder(sqlval):
print("printing",sqlval)
df=spark.sql(sqlval)
print("printing data frame table")
df.show()
resultlist = df.toJSON().collect()
dumpdata = re.sub(r"\'", "", str(resultlist))
jsondata = json.dumps(dumpdata)
#print(jsondata)
return jsondata
def on_request(ch,method,props, body):
n=body
print("printing request body ",n)
response=queryBuilder(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=response
)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,queue='rpc_queue')
print("[x] Awaiting RPC Request")
channel.start_consuming()
master=Tk()
entryval=Entry(master)
entryval.grid(row=0,column=1)
Button(master,text='Quit',command=master.quit).grid(row=3,column=1,sticky=W,pady=50)
mainloop()
我的远程 pyspark 应用程序的以下 RPC 客户端代码是
import pika
import uuid
class SparkRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, querymsg):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=querymsg)
while self.response is None:
self.connection.process_data_events()
return int(self.response)
sparkrpc = SparkRpcClient()
sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2 WHERE lflow1.Did = lesflow2.MID"
print(" [x] Requesting query")
response = sparkrpc.call(sqlstring)
print(" [.] Got %s" % response)
我的服务器已经收到来自客户端的请求字符串并打印它,但它无法处理我的 querybuild() 函数,该函数处理 sqlstring 和 return json 数据。更重要的是,我已经请求了多次,而且似乎那个单独的请求已经在 rpc 队列中排队但没有被清除 out.Because 如果我 运行 只有服务器脚本我得到同样的错误。可能是我在这里遗漏了一些东西,任何人都可以帮我弄清楚。我只想 return json 数据给客户端
提前致谢
卡莲
您传递的类型不兼容(看起来像 bytes
或 bytearray
),而应为 str
。
您应该decode
首先将内容串起来。
def queryBuilder(sqlval, enc):
...
df = spark.sql(sqlval.decode(enc))
df.show()
...
我最近使用 RabbitMq 通过 RPC.For 测试目的从远程 pc 进行 pyspark 通信我开发了一个测试代码,它给我错误
我已按照 RabbitMq 文档教程在 pyspark 上实施 RPC
这是我的 spark RPC 服务器代码
import pika
from tkinter import*
from pyspark.sql import SparkSession
from pyspark import SparkConf,SparkContext
import json
import re
connectionparam=pika.ConnectionParameters(host="localhost")
connection=pika.BlockingConnection(connectionparam)
channel=connection.channel()
channel.queue_declare(queue='rpc_queue')
spark=SparkSession.builder.config("spark.sql.warehouse.dir", "C:\spark\spark-warehouse")\
\
.appName("TestApp").\
enableHiveSupport().getOrCreate()
print("success")
#establishhing chraracter
#sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2 WHERE lflow1.Did = lesflow2.MID"
def queryBuilder(sqlval):
print("printing",sqlval)
df=spark.sql(sqlval)
print("printing data frame table")
df.show()
resultlist = df.toJSON().collect()
dumpdata = re.sub(r"\'", "", str(resultlist))
jsondata = json.dumps(dumpdata)
#print(jsondata)
return jsondata
def on_request(ch,method,props, body):
n=body
print("printing request body ",n)
response=queryBuilder(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=response
)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,queue='rpc_queue')
print("[x] Awaiting RPC Request")
channel.start_consuming()
master=Tk()
entryval=Entry(master)
entryval.grid(row=0,column=1)
Button(master,text='Quit',command=master.quit).grid(row=3,column=1,sticky=W,pady=50)
mainloop()
我的远程 pyspark 应用程序的以下 RPC 客户端代码是
import pika
import uuid
class SparkRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True,
queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, querymsg):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to = self.callback_queue,
correlation_id = self.corr_id,
),
body=querymsg)
while self.response is None:
self.connection.process_data_events()
return int(self.response)
sparkrpc = SparkRpcClient()
sqlstring="SELECT lflow1.LeaseType as LeaseType, lflow1.Status as Status, lflow1.Property as property, lflow1.City as City, lesflow2.DealType as DealType, lesflow2.Area as Area, lflow1.Did as DID, lesflow2.MID as MID from lflow1, lesflow2 WHERE lflow1.Did = lesflow2.MID"
print(" [x] Requesting query")
response = sparkrpc.call(sqlstring)
print(" [.] Got %s" % response)
我的服务器已经收到来自客户端的请求字符串并打印它,但它无法处理我的 querybuild() 函数,该函数处理 sqlstring 和 return json 数据。更重要的是,我已经请求了多次,而且似乎那个单独的请求已经在 rpc 队列中排队但没有被清除 out.Because 如果我 运行 只有服务器脚本我得到同样的错误。可能是我在这里遗漏了一些东西,任何人都可以帮我弄清楚。我只想 return json 数据给客户端 提前致谢 卡莲
您传递的类型不兼容(看起来像 bytes
或 bytearray
),而应为 str
。
您应该decode
首先将内容串起来。
def queryBuilder(sqlval, enc):
...
df = spark.sql(sqlval.decode(enc))
df.show()
...