Spark Streaming 应用程序太慢
SparkStreaming application too slow
在开发 SparkStreaming 应用程序 (python) 时,我不确定我是否理解它的工作原理。
我只需要读取 json 文件流(在目录中弹出)并对每个 json 对象和引用执行连接操作,然后将其写回文本文件。这是我的代码:
config = configparser.ConfigParser()
config.read("config.conf")
def getSparkSessionInstance(sparkConf):
if ("sparkSessionSingletonInstance" not in globals()):
globals()["sparkSessionSingletonInstance"] = SparkSession \
.builder \
.config(conf=sparkConf) \
.getOrCreate()
return globals()["sparkSessionSingletonInstance"]
# Création du contexte
sc = SparkContext()
ssc = StreamingContext(sc, int(config["Variables"]["batch_period_spark"]))
sqlCtxt = getSparkSessionInstance(sc.getConf())
df_ref = sqlCtxt.read.json("file://" + config["Paths"]["path_ref"])
df_ref.createOrReplaceTempView("REF")
df_ref.cache()
output = config["Paths"]["path_DATAs_enri"]
# Fonction de traitement des DATAs
def process(rdd):
if rdd.count() > 0:
#print(rdd.toDebugString)
df_DATAs = sqlCtxt.read.json(rdd)
df_DATAs.createOrReplaceTempView("DATAs")
df_enri=sqlCtxt.sql("SELECT DATAs.*, REF.Name, REF.Mail FROM DATAs, REF WHERE DATAs.ID = REF.ID")
df_enri.createOrReplaceTempView("DATAs_enri")
df_enri.write.mode('append').json("file://" + output)
if(df_enri.count() < df_DATAs.count()):
df_fail = sqlCtxt.sql("SELECT * FROM DATAs WHERE DATAs.ID NOT IN (SELECT ID FROM DATAs_enri)")
df_fail.show()
# Configuration du stream et lancement
files = ssc.textFileStream("file://" + config["Paths"]["path_stream_DATAs"])
files.foreachRDD(process)
print("[GO]")
ssc.start()
ssc.awaitTermination()
这是我的 spark 配置:
spark.master local[*]
spark.executor.memory 3g
spark.driver.memory 3g
spark.python.worker.memory 3g
spark.memory.fraction 0.9
spark.driver.maxResultSize 3g
spark.memory.storageFraction 0.9
spark.eventLog.enabled true
好吧,它正在运行,但我有一个问题:进程缓慢并且进程延迟越来越大。我在local[*]工作,怕没有parallelism。。。在监控UI,一次只看到一个executor和一个job。有没有更简单的方法来做到这一点?就像 DStream 上的 transform 函数一样?是否缺少配置变量?
您的代码运行缓慢有几个原因。
关于worker,我看到没有看到你设置worker数量的地方。因此它将以默认的工作人员数量开始,这可能意味着 1。另一方面,您正在读取一个可能不是那么大的文件,而 spark 没有进行并行处理。
另一方面,您需要理解代码的几个步骤:
- 你有很多计数:
if rdd.count() > 0:; if(df_enri.count() < df_DATAs.count()):
,计数很昂贵,你的流数据是一个减少阶段,你正在做 3 倍的计数。
- 加入也很昂贵,在流式处理中进行加入并不是那么好,你做对了
df_ref.cache()
但是,加入会随机播放而且很昂贵。
我建议您,不要执行失败的步骤,将其从您的代码中删除。它没有用,只是不保存数据。其他事情,设置更多的工人或更多的核心执行: spark.executor.cores=2
如你所见 here.
在开发 SparkStreaming 应用程序 (python) 时,我不确定我是否理解它的工作原理。 我只需要读取 json 文件流(在目录中弹出)并对每个 json 对象和引用执行连接操作,然后将其写回文本文件。这是我的代码:
config = configparser.ConfigParser()
config.read("config.conf")
def getSparkSessionInstance(sparkConf):
if ("sparkSessionSingletonInstance" not in globals()):
globals()["sparkSessionSingletonInstance"] = SparkSession \
.builder \
.config(conf=sparkConf) \
.getOrCreate()
return globals()["sparkSessionSingletonInstance"]
# Création du contexte
sc = SparkContext()
ssc = StreamingContext(sc, int(config["Variables"]["batch_period_spark"]))
sqlCtxt = getSparkSessionInstance(sc.getConf())
df_ref = sqlCtxt.read.json("file://" + config["Paths"]["path_ref"])
df_ref.createOrReplaceTempView("REF")
df_ref.cache()
output = config["Paths"]["path_DATAs_enri"]
# Fonction de traitement des DATAs
def process(rdd):
if rdd.count() > 0:
#print(rdd.toDebugString)
df_DATAs = sqlCtxt.read.json(rdd)
df_DATAs.createOrReplaceTempView("DATAs")
df_enri=sqlCtxt.sql("SELECT DATAs.*, REF.Name, REF.Mail FROM DATAs, REF WHERE DATAs.ID = REF.ID")
df_enri.createOrReplaceTempView("DATAs_enri")
df_enri.write.mode('append').json("file://" + output)
if(df_enri.count() < df_DATAs.count()):
df_fail = sqlCtxt.sql("SELECT * FROM DATAs WHERE DATAs.ID NOT IN (SELECT ID FROM DATAs_enri)")
df_fail.show()
# Configuration du stream et lancement
files = ssc.textFileStream("file://" + config["Paths"]["path_stream_DATAs"])
files.foreachRDD(process)
print("[GO]")
ssc.start()
ssc.awaitTermination()
这是我的 spark 配置:
spark.master local[*]
spark.executor.memory 3g
spark.driver.memory 3g
spark.python.worker.memory 3g
spark.memory.fraction 0.9
spark.driver.maxResultSize 3g
spark.memory.storageFraction 0.9
spark.eventLog.enabled true
好吧,它正在运行,但我有一个问题:进程缓慢并且进程延迟越来越大。我在local[*]工作,怕没有parallelism。。。在监控UI,一次只看到一个executor和一个job。有没有更简单的方法来做到这一点?就像 DStream 上的 transform 函数一样?是否缺少配置变量?
您的代码运行缓慢有几个原因。
关于worker,我看到没有看到你设置worker数量的地方。因此它将以默认的工作人员数量开始,这可能意味着 1。另一方面,您正在读取一个可能不是那么大的文件,而 spark 没有进行并行处理。
另一方面,您需要理解代码的几个步骤:
- 你有很多计数:
if rdd.count() > 0:; if(df_enri.count() < df_DATAs.count()):
,计数很昂贵,你的流数据是一个减少阶段,你正在做 3 倍的计数。 - 加入也很昂贵,在流式处理中进行加入并不是那么好,你做对了
df_ref.cache()
但是,加入会随机播放而且很昂贵。
我建议您,不要执行失败的步骤,将其从您的代码中删除。它没有用,只是不保存数据。其他事情,设置更多的工人或更多的核心执行: spark.executor.cores=2
如你所见 here.