pyspark:ml + 流媒体
pyspark : ml + streaming
根据 可以对 spark 中的输入流进行预测。
给定示例(适用于我的集群)的问题是 testData 是正确格式的给定权利。
我正在尝试基于数据字符串设置客户端 <-> 服务器 tcp 交换。
我不知道如何将字符串转换为正确的格式。
虽然这有效:
sep = ";"
str_recue = '0.0;0.1;0.2;0.3;0.4;0.5'
rdd = sc.parallelize([str_recue])
chemin = "hdfs://xx.xx.xx.xx:8020/cart_model_for_cycliste_v2"
model = DecisionTreeClassificationModel.load(chemin)
# travail sur la string
rdd2 = rdd.map( lambda data : data.split(sep))
rdd3 = rdd2.map(lambda tableau: [float(x) for x in tableau])
# création df
cols = ["c1", "c2", "c3", "c4", "c5", "c6"]
fields = [StructField(x, FloatType(), True) for x in cols]
schema = StructType(fields)
df = spark.createDataFrame(rdd3, schema=schema )
# preparation d'une colonne de features
schema = StructType(fields)
assembler = VectorAssembler()
assembler = assembler.setInputCols(cols)
assembler = assembler.setOutputCol("features")
df2 = assembler.transform(df)
model.transform(df2).show()
捐赠:
+---+---+---+---+---+---+--------------------+-------------+-----------+----------+
| c1| c2| c3| c4| c5| c6| features|rawPrediction|probability|prediction|
+---+---+---+---+---+---+--------------------+-------------+-----------+----------+
|0.0|0.1|0.2|0.3|0.4|0.5|[0.0,0.1000000014...| [0.0,3426.0]| [0.0,1.0]| 1.0|
+---+---+---+---+---+---+--------------------+-------------+-----------+----------+
我不知道如何让它在监听套接字时工作。
我有我的服务器:
import socket
import random
import time
port = 12003
ip = socket.gethostname()
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind((ip, port))
serversocket.listen(1)
(clientsocket, address) = serversocket.accept()
nb_d_envois = 10
tps_attente = 3
for i in range(nb_d_envois):
time.sleep(tps_attente)
sep = ";"
to_send = '0.0;0.1;0.2;0.3;0.4;0.5'
print(to_send)
clientsocket.send(to_send.encode())
向我的 spark Streaming 上下文发送一个字符串。
接下来做什么 ?这是我的问题。根据:https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/sql_network_wordcount.py 应该可以做一个 [foreach]
所以我创建了一个函数:
def prevoir(time, rdd):
sep = ";"
chemin = "hdfs://54.37.12.49:8020/cart_model_for_cycliste_v2"
model = DecisionTreeClassificationModel.load(chemin)
# travail sur la string
rdd2 = rdd.map( lambda data : data.split(sep))
rdd3 = rdd2.map(lambda tableau: [float(x) for x in tableau])
# création df
cols = ["c1", "c2", "c3", "c4", "c5", "c6"]
fields = [StructField(x, FloatType(), True) for x in cols]
schema = StructType(fields)
df = spark.createDataFrame(rdd3, schema=schema )
# preparation d'une colonne de features
schema = StructType(fields)
assembler = VectorAssembler()
assembler = assembler.setInputCols(cols)
assembler = assembler.setOutputCol("features")
df2 = assembler.transform(df)
model.transform(df2).show()
并将其应用于流式上下文:
ssc = StreamingContext(sc, 5)
dstream = ssc.socketTextStream(listen_to_ip, listen_to_port)
dstream.foreachRDD(prevoir)
但什么也没有出现(甚至没有正常时间信息)。
也没有错误。
我的疑惑是:
该函数未注册为 UDF,所以我怀疑它根本无法调用
通过hdfs加载模型当然应该只做一次并作为参数传递
"show" 函数在我看来并不是真正的分布式(但它在未应用于 'foreachrdd' 时有效...=> 也许我应该在 hdfs 上保存一些东西?
欢迎任何帮助...
数据没有从服务器发送到流上下文。代码正确。
根据
给定示例(适用于我的集群)的问题是 testData 是正确格式的给定权利。
我正在尝试基于数据字符串设置客户端 <-> 服务器 tcp 交换。 我不知道如何将字符串转换为正确的格式。
虽然这有效:
sep = ";"
str_recue = '0.0;0.1;0.2;0.3;0.4;0.5'
rdd = sc.parallelize([str_recue])
chemin = "hdfs://xx.xx.xx.xx:8020/cart_model_for_cycliste_v2"
model = DecisionTreeClassificationModel.load(chemin)
# travail sur la string
rdd2 = rdd.map( lambda data : data.split(sep))
rdd3 = rdd2.map(lambda tableau: [float(x) for x in tableau])
# création df
cols = ["c1", "c2", "c3", "c4", "c5", "c6"]
fields = [StructField(x, FloatType(), True) for x in cols]
schema = StructType(fields)
df = spark.createDataFrame(rdd3, schema=schema )
# preparation d'une colonne de features
schema = StructType(fields)
assembler = VectorAssembler()
assembler = assembler.setInputCols(cols)
assembler = assembler.setOutputCol("features")
df2 = assembler.transform(df)
model.transform(df2).show()
捐赠:
+---+---+---+---+---+---+--------------------+-------------+-----------+----------+
| c1| c2| c3| c4| c5| c6| features|rawPrediction|probability|prediction|
+---+---+---+---+---+---+--------------------+-------------+-----------+----------+
|0.0|0.1|0.2|0.3|0.4|0.5|[0.0,0.1000000014...| [0.0,3426.0]| [0.0,1.0]| 1.0|
+---+---+---+---+---+---+--------------------+-------------+-----------+----------+
我不知道如何让它在监听套接字时工作。
我有我的服务器:
import socket
import random
import time
port = 12003
ip = socket.gethostname()
serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.bind((ip, port))
serversocket.listen(1)
(clientsocket, address) = serversocket.accept()
nb_d_envois = 10
tps_attente = 3
for i in range(nb_d_envois):
time.sleep(tps_attente)
sep = ";"
to_send = '0.0;0.1;0.2;0.3;0.4;0.5'
print(to_send)
clientsocket.send(to_send.encode())
向我的 spark Streaming 上下文发送一个字符串。 接下来做什么 ?这是我的问题。根据:https://github.com/apache/spark/blob/master/examples/src/main/python/streaming/sql_network_wordcount.py 应该可以做一个 [foreach]
所以我创建了一个函数:
def prevoir(time, rdd):
sep = ";"
chemin = "hdfs://54.37.12.49:8020/cart_model_for_cycliste_v2"
model = DecisionTreeClassificationModel.load(chemin)
# travail sur la string
rdd2 = rdd.map( lambda data : data.split(sep))
rdd3 = rdd2.map(lambda tableau: [float(x) for x in tableau])
# création df
cols = ["c1", "c2", "c3", "c4", "c5", "c6"]
fields = [StructField(x, FloatType(), True) for x in cols]
schema = StructType(fields)
df = spark.createDataFrame(rdd3, schema=schema )
# preparation d'une colonne de features
schema = StructType(fields)
assembler = VectorAssembler()
assembler = assembler.setInputCols(cols)
assembler = assembler.setOutputCol("features")
df2 = assembler.transform(df)
model.transform(df2).show()
并将其应用于流式上下文:
ssc = StreamingContext(sc, 5)
dstream = ssc.socketTextStream(listen_to_ip, listen_to_port)
dstream.foreachRDD(prevoir)
但什么也没有出现(甚至没有正常时间信息)。 也没有错误。
我的疑惑是:
该函数未注册为 UDF,所以我怀疑它根本无法调用
通过hdfs加载模型当然应该只做一次并作为参数传递
"show" 函数在我看来并不是真正的分布式(但它在未应用于 'foreachrdd' 时有效...=> 也许我应该在 hdfs 上保存一些东西?
欢迎任何帮助...
数据没有从服务器发送到流上下文。代码正确。