Exception("storageLevel must be of type pyspark.StorageLevel")

Hello, I am trying to integrate Flume with PySpark, but I am getting an error.


Here is the code:

from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
import sys
from operator import add
from pyspark.sql import functions
hostname= sys.argv[1]
port= sys.argv[2]
renga = SparkConf().setMaster("yarn-client").setAppName("karthik")
amma= SparkContext(conf=renga)
appa=StreamingContext(amma,30)
rajagopal= FlumeUtils.createPollingStream(appa,hostname,port)
rajagopal.persist(StorageLevel.MEMORY_ONLY)
mohan= rajagopal.map(lambda m:m[1])
kowsi= mohan.flatMap(lambda fm : fm.split(" ")[6] == "deparment")
ujesh= kowsi.map(lambda m : (m.split(" ")[6].split("/")[1],1))
balaji=ujesh.reduceByKey(add)

balaji.saveAsTextFiles("xxxxx/user/shashankbh/jarvis/data/flume/conf")

appa.start()
appa.awaitTermination()

Thanks in advance.

Regards, Renganathan

The signature of FlumeUtils.createPollingStream is:

FlumeUtils.createPollingStream(
  ssc,
  addresses,
  storageLevel=StorageLevel(True, True, False, False, 2), 
  maxBatchSize=1000, 
  parallelism=5,
  bodyDecoder=<function utf8_decoder at 0x7efe1daea488>)

This means the third positional argument is the storage level. In your code you pass port, the string you read from sys.argv[2], so it is not a valid storage level. Note also that the second argument, addresses, expects a list of (hostname, port) tuples rather than a separate hostname and port.
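A minimal sketch of how the arguments could be built instead (the hostname and port values here are made up; the actual Spark/Flume call is shown in comments since it needs a running cluster):

```python
# Hypothetical values standing in for sys.argv[1] and sys.argv[2].
hostname = "gw01.example.com"
port = int("8123")  # argv values are strings; the port must be an int

# createPollingStream expects `addresses` as a list of (host, port) tuples,
# and the storage level (if you want to set it) as a keyword argument --
# not the port in the third position.
addresses = [(hostname, port)]

# The corrected call would then look roughly like:
#
#   from pyspark import StorageLevel
#   from pyspark.streaming.flume import FlumeUtils
#   rajagopal = FlumeUtils.createPollingStream(
#       appa, addresses, storageLevel=StorageLevel.MEMORY_ONLY)

print(addresses)
```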

That is what the stack trace is telling you. Learning to read stack traces is a valuable skill for any programmer.