如何使用 Spark Dstreams 进行简单的随机采样?(pyspark 使用 spark 1.6.1)
How to do simple random sampling with Spark Dstreams?(pyspark using spark 1.6.1)
我想从 Dstream 中的 rdds 中提取样本。由于 Dstream 没有 sample()
转换,它是一个 rdds 序列,所以我这样做是为了从 Dstream 中获取样本并对其应用字数统计:
from pyspark import SparkContext
from pyspark import SparkConf
# Optionally configure Spark Settings
conf=SparkConf()
conf.set("spark.executor.memory", "1g")
conf.set("spark.cores.max", "2")
conf.setAppName("SRS")
sc = SparkContext('local[3]', conf=conf)
from pyspark.streaming import StreamingContext
streamContext = StreamingContext(sc,3)
lines = streamContext.socketTextStream("localhost", 9000)
def sampleWord(rdd):
return rdd.sample(false,0.5,10)
lineSample = lines.foreachRDD(sampleWord)
words = lineSample.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word , 1))
wordCount = pairs.reduceByKey(lambda x, y: x + y)
wordCount.pprint(60)
streamContext.start()
streamContext.stop()
使用此代码,Spark 启动但没有任何反应。我不知道为什么 rdd.sample()
这样不行?使用 foreachRDD
,我们可以访问流中的每个 rdd,所以我认为现在我们可以使用特定于 rdd 的转换。
使用transform
:
lineSample = lines.transform(sampleWord)
使用 transform
而不是 foreachRDD
。另外,您的代码中有错别字。
def sampleWord(rdd):
return rdd.sample(False,0.5,10) //False, not false
lineSample = lines.transform(sampleWord)
words = lineSample.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word , 1))
wordCount = pairs.reduceByKey(lambda x, y: x + y)
wordCount.pprint(60)
我想从 Dstream 中的 rdds 中提取样本。由于 Dstream 没有 sample()
转换,它是一个 rdds 序列,所以我这样做是为了从 Dstream 中获取样本并对其应用字数统计:
from pyspark import SparkContext
from pyspark import SparkConf
# Optionally configure Spark Settings
conf=SparkConf()
conf.set("spark.executor.memory", "1g")
conf.set("spark.cores.max", "2")
conf.setAppName("SRS")
sc = SparkContext('local[3]', conf=conf)
from pyspark.streaming import StreamingContext
streamContext = StreamingContext(sc,3)
lines = streamContext.socketTextStream("localhost", 9000)
def sampleWord(rdd):
return rdd.sample(false,0.5,10)
lineSample = lines.foreachRDD(sampleWord)
words = lineSample.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word , 1))
wordCount = pairs.reduceByKey(lambda x, y: x + y)
wordCount.pprint(60)
streamContext.start()
streamContext.stop()
使用此代码,Spark 启动但没有任何反应。我不知道为什么 rdd.sample()
这样不行?使用 foreachRDD
,我们可以访问流中的每个 rdd,所以我认为现在我们可以使用特定于 rdd 的转换。
使用transform
:
lineSample = lines.transform(sampleWord)
使用 transform
而不是 foreachRDD
。另外,您的代码中有错别字。
def sampleWord(rdd):
return rdd.sample(False,0.5,10) //False, not false
lineSample = lines.transform(sampleWord)
words = lineSample.flatMap(lambda line: line.split(" "))
pairs = words.map(lambda word: (word , 1))
wordCount = pairs.reduceByKey(lambda x, y: x + y)
wordCount.pprint(60)