pyspark:使用从 kafka 检索到的数据训练 kmeans 流
pyspark: train kmeans streaming with data retrieved from kafka
我想使用从 kafka 主题消耗的数据来训练流式 kmeans 模型。
我的问题是如何呈现 kmeans streamig 模型的数据
sc = SparkContext(appName="PythonStreamingKafka")
ssc = StreamingContext(sc, 30)
zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
lines.pprint()
此输出(这些是我用“|”分隔的特征):
1.0|2.0|0.0|21.0|2.0
1.0|2.0|0.0|21.0|2.0
那我想做这个
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
model.trainOn(lines)
如果我合并两段代码,我会得到错误:
TypeError: Cannot convert type <type 'unicode'> into Vector
第一个问题是格式化从 kafka 中提取的流。这是管道分隔数据的工作方式
sc = SparkContext(appName="PythonStreamingKafka")
ssc = StreamingContext(sc, 30)
zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
raw = kvs.flatMap(lambda kafkaS: [kafkaS])
lines = raw.map(lambda xs: xs[1].split("|"))
lines = lines.map(lambda x: DenseVector(x))
第二个问题是数据的维度:setRandomCenters
的第一个参数(它应该与特征的数量相同)
我想使用从 kafka 主题消耗的数据来训练流式 kmeans 模型。
我的问题是如何呈现 kmeans streamig 模型的数据
sc = SparkContext(appName="PythonStreamingKafka")
ssc = StreamingContext(sc, 30)
zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
lines.pprint()
此输出(这些是我用“|”分隔的特征):
1.0|2.0|0.0|21.0|2.0
1.0|2.0|0.0|21.0|2.0
那我想做这个
model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(3, 1.0, 0)
model.trainOn(lines)
如果我合并两段代码,我会得到错误:
TypeError: Cannot convert type <type 'unicode'> into Vector
第一个问题是格式化从 kafka 中提取的流。这是管道分隔数据的工作方式
sc = SparkContext(appName="PythonStreamingKafka")
ssc = StreamingContext(sc, 30)
zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
raw = kvs.flatMap(lambda kafkaS: [kafkaS])
lines = raw.map(lambda xs: xs[1].split("|"))
lines = lines.map(lambda x: DenseVector(x))
第二个问题是数据的维度:setRandomCenters
的第一个参数(它应该与特征的数量相同)