Dstream 上的 Pyspark 过滤操作
Pyspark filter operation on Dstream
我一直在尝试扩展网络字数统计,以便能够根据特定关键字过滤行
我正在使用 spark 1.6.2
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 5)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
counts = lines.flatMap(lambda line: line.split(" ")).filter("ERROR")
counts.pprint()
ssc.start()
ssc.awaitTermination()
我已经尝试了所有的变化,
I almost always get the error I cannot apply functions like
pprint/show/take/collect on TransformedDStream
。我在 Dstream 的行上使用带有 foreachRDD 的转换,它具有使用本机 python 字符串方法进行检查的功能,但也失败了(实际上,如果我在程序的任何地方使用打印,spark-submit 就会出现 - 没有报告任何错误。
我想要的是能够根据 "ERROR" 这样的关键字过滤传入的 Dstreams | "WARNING" 等并将其输出到 stdout 或 stderr。
What I want to is to be able to filter the incoming Dstreams on a keyword like "ERROR" | "WARNING" etc and output it to stdout or stderr.
那么你不想调用 flatMap
,因为这会将你的行拆分为单独的标记。相反,您可以将该调用替换为对 filter
的调用,该调用检查该行是否包含 "error"
:
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
errors = lines.filter(lambda l: "error" in l.lower())
errors.pprint()
我一直在尝试扩展网络字数统计,以便能够根据特定关键字过滤行
我正在使用 spark 1.6.2
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 5)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
counts = lines.flatMap(lambda line: line.split(" ")).filter("ERROR")
counts.pprint()
ssc.start()
ssc.awaitTermination()
我已经尝试了所有的变化,
I almost always get the error I cannot apply functions like
pprint/show/take/collect on TransformedDStream
。我在 Dstream 的行上使用带有 foreachRDD 的转换,它具有使用本机 python 字符串方法进行检查的功能,但也失败了(实际上,如果我在程序的任何地方使用打印,spark-submit 就会出现 - 没有报告任何错误。
我想要的是能够根据 "ERROR" 这样的关键字过滤传入的 Dstreams | "WARNING" 等并将其输出到 stdout 或 stderr。
What I want to is to be able to filter the incoming Dstreams on a keyword like "ERROR" | "WARNING" etc and output it to stdout or stderr.
那么你不想调用 flatMap
,因为这会将你的行拆分为单独的标记。相反,您可以将该调用替换为对 filter
的调用,该调用检查该行是否包含 "error"
:
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
errors = lines.filter(lambda l: "error" in l.lower())
errors.pprint()