Spark - Sort DStream by Key and limit to 5 values
I have started learning Spark and have written a PySpark streaming program that reads stock data (symbol, volume) from port 3333.

Sample data streamed on port 3333:
"AAC",111113
"ABT",7451020
"ABBV",7325429
"ADPT",318617
"AET",1839122
"ALR",372777
"AGN",4170581
"ABC",3001798
"ANTM",1968246
I want to display the top 5 symbols based on volume, so I used a mapper to read each line, split it on the comma, and reverse it.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 5)  # 5-second batch interval

lines = ssc.socketTextStream("localhost", 3333)
stocks = lines.map(lambda line: sorted(line.split(','), reverse=True))
stocks.pprint()
Here is the output of stocks.pprint():
[u'111113', u'"AAC"']
[u'7451020', u'"ABT"']
[u'7325429', u'"ABBV"']
[u'318617', u'"ADPT"']
[u'1839122', u'"AET"']
[u'372777', u'"ALR"']
[u'4170581', u'"AGN"']
[u'3001798', u'"ABC"']
[u'1968246', u'"ANTM"']
I came up with the following function to display the stock symbols, but I am not sure how to sort the stocks by key (volume) and then limit the function to display only the first 5 values.
def processStocks(stock):
    for st in stock.collect():
        print st[1]

stocks.foreachRDD(processStocks)
Since a stream represents an infinite sequence, all you can do is sort each batch. First, you have to parse the data correctly:
# For testing, replace the socket source with a queueStream of known data.
lines = ssc.queueStream([sc.parallelize([
    "AAC,111113", "ABT,7451020", "ABBV,7325429", "ADPT,318617",
    "AET,1839122", "ALR,372777", "AGN,4170581", "ABC,3001798",
    "ANTM,1968246"
])])

def parse(line):
    try:
        k, v = line.split(",")
        yield (k, int(v))
    except ValueError:
        pass

parsed = lines.flatMap(parse)
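Because parse is a generator, flatMap simply emits nothing for malformed lines instead of failing the batch. A quick plain-Python check (no Spark needed) illustrates this; the parse_quoted variant below is a hypothetical addition for the quoted feed shown in the question ("AAC",111113), not part of the original answer:

# Sanity check: well-formed lines parse, malformed lines yield nothing.
print(list(parse("ABT,7451020")))   # [('ABT', 7451020)]
print(list(parse("not-a-record")))  # []

# Hypothetical variant that also strips the quotes around the symbol.
def parse_quoted(line):
    try:
        k, v = line.split(",")
        yield (k.strip('"'), int(v))
    except ValueError:
        pass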
Next, sort each batch:
sorted_ = parsed.transform(
    lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))
Finally, you can pprint the top elements:
sorted_.pprint(5)
If everything worked as expected, you should get output like this:
-------------------------------------------
Time: 2016-10-02 14:52:30
-------------------------------------------
('ABT', 7451020)
('ABBV', 7325429)
('AGN', 4170581)
('ABC', 3001798)
('ANTM', 1968246)
...
Depending on the size of the batch, a full sort can be prohibitively expensive. In that case you can use top and parallelize (note the key argument, so top selects by volume rather than by the tuples' natural, symbol-first ordering):
sorted_ = parsed.transform(
    lambda rdd: rdd.ctx.parallelize(rdd.top(5, key=lambda x: x[1])))
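A small illustration of the difference on a throwaway RDD, assuming the SparkContext sc from the question (this snippet is mine, not part of the original answer):

rdd = sc.parallelize([("AAC", 111113), ("ABT", 7451020), ("ALR", 372777)])

print(rdd.top(2))                      # [('ALR', 372777), ('ABT', 7451020)]  -- by symbol
print(rdd.top(2, key=lambda x: x[1]))  # [('ABT', 7451020), ('ALR', 372777)]  -- by volume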
or even a bounded, heap-based merge with combineByKey:
from operator import itemgetter
import heapq

key = itemgetter(1)  # rank records by volume

def create_combiner(key=lambda x: x):
    # Start an accumulator of (sort_key, record) pairs from the first record.
    def _(x):
        return [(key(x), x)]
    return _

def merge_value(n=5, key=lambda x: x):
    # Add one record, keeping at most n entries per accumulator.
    def _(acc, x):
        heapq.heappush(acc, (key(x), x))
        return heapq.nlargest(n, acc) if len(acc) > n else acc
    return _

def merge_combiners(n=5):
    # Merge two accumulators, again keeping at most n entries.
    def _(acc1, acc2):
        merged = list(heapq.merge(acc1, acc2))
        return heapq.nlargest(n, merged) if len(merged) > n else merged
    return _

top5 = (parsed
    .map(lambda x: (None, x))  # one dummy key, so all records meet in a single combiner
    .combineByKey(
        create_combiner(key=key), merge_value(key=key), merge_combiners())
    .flatMap(lambda x: x[1]))  # at most 5 (volume, record) pairs per batch
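The accumulators hold (volume, record) pairs, so top5 emits (volume, (symbol, volume)) tuples, and like any transformation it does nothing until an output operation runs. A minimal sketch of how you might wire it up (the top5 name above and the unwrapping map below are my additions, not part of the original answer):

# Unwrap (volume, (symbol, volume)) back to plain (symbol, volume) records,
# print each batch, and start the streaming job.
top5.map(itemgetter(1)).pprint()

ssc.start()
ssc.awaitTermination()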