Why is my Spark streaming app so slow?

I have a cluster with 4 nodes: 3 Spark nodes and 1 Solr node. Each machine has an 8-core CPU, 32 GB of RAM, and SSD storage. I use Cassandra as my database. After 6 hours I have 22 GB of data, which is currently about 3.4 million rows, and it should be read within 5 minutes.

But it already fails to finish the job within that time. My plan for the future is to read 100 million rows within 5 minutes. I'm not sure what I could scale up or do better to reach that result now, let alone my future goal. Is that even feasible, or would it be better to use Spark for the real-time analysis and Hadoop for the longer-tail data (anything older than a day or a few hours)? Thanks a lot! Here is my Spark application code:

import sys
import json
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, Row
from pyspark.streaming.kafka import KafkaUtils
from datetime import datetime, timedelta 
from dateutil.parser import parse 
from cassandra.cluster import Cluster
import pytz
from dateutil.tz import tzutc
tz = pytz.timezone('')
appname = str(sys.argv[1])
source = str(sys.argv[2])
cluster = Cluster(['localhost'])
session_statis = cluster.connect('keyspace')
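# Helpers that parse incoming Kafka messages; malformed records become 0 so they can be filtered out downstream.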
def read_json(x):
    try:
        y = json.loads(x)
    except:
        y = 0
    return y
def TransformInData(x):
    try:
        body = json.loads(x['body'])
        return (body['articles'])
    except:
        return 0
def axesTransformData(x):
    try:
        body = json.loads(x['body'])
        return (body)
    except:
        return 0
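# Persist newly seen articles to Cassandra, together with the schedule table and the created_at/url lookup tables.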
def storeDataToCassandra(rdd):
    rdd_cassandra =rdd.map(lambda x:(x[0],(x[0],x[1]['thumbnail'], x[1]['title'], x[1]['url'], datetime.strptime(parse(x[1]['created_at']).strftime('%Y-%m-%d %H:%M:%S'), "%Y-%m-%d %H:%M:%S"),source, x[1]['category'] if x[1]['category'] else '', x[1]['channel'],x[1]['genre']))) \
                            .subtract(articles)
    rdd_article = rdd_cassandra.map(lambda x:Row(id=x[1][0],source=x[1][5],thumbnail=x[1][1],title=x[1][2],url=x[1][3],created_at=x[1][4],category=x[1][6],channel=x[1][7],genre=x[1][8]))
    rdd_schedule = rdd_cassandra.map(lambda x:Row(source=x[1][5],type='article',scheduled_for=x[1][4]+timedelta(minutes=5),id=x[1][0]))
    rdd_article_by_created_at = rdd_cassandra.map(lambda x:Row(source=x[1][5],created_at=x[1][4],article=x[1][0]))
    rdd_article_by_url = rdd_cassandra.map(lambda x:Row(url=x[1][3],article=x[1][0]))
    if rdd_article.count()>0:
        result_rdd_article = sqlContext.createDataFrame(rdd_article)
        result_rdd_article.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")
    if rdd_schedule.count()>0:   
        result_rdd_schedule = sqlContext.createDataFrame(rdd_schedule)
        result_rdd_schedule.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")
    if rdd_article_by_created_at.count()>0:  
        result_rdd_article_by_created_at = sqlContext.createDataFrame(rdd_article_by_created_at)
        result_rdd_article_by_created_at.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")
    if rdd_article_by_url.count()>0:   
        result_rdd_article_by_url = sqlContext.createDataFrame(rdd_article_by_url)
        result_rdd_article_by_url.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")
# Append a snapshot of the engagement metrics (comments, likes, shares) for each article to Cassandra.
def axesStoreToCassandra(rdd):
    axes_rdd = rdd.map(lambda x:Row(article=x[1]['id'],at=datetime.now(),comments=x[1]['comments'],likes=x[1]['attitudes'],reads=0,shares=x[1]['reposts']))
    if axes_rdd.count()>0:
        result_axes_rdd = sqlContext.createDataFrame(axes_rdd)
        result_axes_rdd.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")



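# Join article metadata with channel info and the latest engagement snapshots, derive a sharing "speed" per article,
# and rebuild the per-timespan (1/3/6/12/24 hour) statistics tables in Cassandra.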
def joinstream(rdd):
    article_channels = articlestat.join(channels).map(lambda x:(x[1][0]['id'],{'id':x[1][0]['id'],'thumbnail':x[1][0]['thumbnail'],'title':x[1][0]['title'],'url':x[1][0]['url'],'created_at':x[1][0]['created_at'],'source':x[1][0]['source'],'genre':x[1][0]['genre'],'category':x[1][1]['category'],'author':x[1][1]['author']}))
    speed_rdd = axes.map(lambda x:(x.article,[[x.at,x.comments,x.likes,x.reads,x.shares]])) \
                .reduceByKey(lambda x,y:x+y) \
                .map(lambda x:(x[0],sorted(x[1],key=lambda y:y[0],reverse = True)[0],sorted(x[1],key=lambda y:y[0],reverse = True)[1]) if len(x[1])>=2 else (x[0],sorted(x[1],key=lambda y:y[0],reverse = True)[0],[sorted(x[1],key=lambda y:y[0],reverse = True)[0][0]-timedelta(seconds=300),0,0,0,0])) \
                .filter(lambda x:(x[1][0]-x[2][0]).seconds>0) \
                .map(lambda x:(x[0],{'id':x[0],'comments':x[1][1],'likes':x[1][2],'reads':x[1][3],'shares':x[1][4],'speed':int(5*288*((x[1][4]-x[2][4])/((x[1][0]-x[2][0]).seconds/60.0)))})) \
                .filter(lambda x:x[1]['speed']>=0) \
                .filter(lambda x:x[1]['shares']>0)
    statistics = article_channels.join(speed_rdd)  \
                .map(lambda x:{'id':x[1][0]['id'],'thumbnail':x[1][0]['thumbnail'],'title':x[1][0]['title'],'url':x[1][0]['url'],'created_at':x[1][0]['created_at'],'source':x[1][0]['source'],'category':x[1][0]['category'],'author':x[1][0]['author'],'genre':x[1][0]['genre'],'comments':x[1][1]['comments'],'likes':x[1][1]['likes'],'reads':x[1][1]['reads'],'shares':x[1][1]['shares'],'speed':x[1][1]['speed']})
    timeone=datetime.now()-timedelta(hours=1)
    timethree = datetime.now()-timedelta(hours=3)
    timesix = datetime.now()-timedelta(hours=6)
    timetwelve = datetime.now()-timedelta(hours=12)
    timetwentyfour = datetime.now()-timedelta(hours=24)
    result1 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timeone).map(lambda x:Row(timespan='1',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre=x['genre'],reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    result3 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timethree  and x['created_at']+timedelta(hours=8)<=timeone).map(lambda x:Row(timespan='3',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre=x['genre'],reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    result6 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timesix  and x['created_at']+timedelta(hours=8)<=timethree).map(lambda x:Row(timespan='6',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre=x['genre'],reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    result12 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timetwelve  and x['created_at']+timedelta(hours=8)<=timesix).map(lambda x:Row(timespan='12',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre=x['genre'],reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    result24 = statistics.filter(lambda x:x['created_at']+timedelta(hours=8)>=timetwentyfour  and x['created_at']+timedelta(hours=8)<=timetwelve).map(lambda x:Row(timespan='24',source=source,id=x['id'],title=x['title'],thumbnail=x['thumbnail'],url=x['url'],created_at=x['created_at']+timedelta(hours=8),genre=x['genre'],reads=0,likes=x['likes'],comments=x['comments'],shares=x['shares'],speed=x['speed'],category=x['category'],author=x['author']))
    if result1.count()>0:
        session_statis.execute('DELETE FROM tablename WHERE source = %s and timespan= %s', (source,'1'))
        resultschema1 = sqlContext.createDataFrame(result1)
        resultschema1.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")
    if result3.count()>0:   
        session_statis.execute('DELETE FROM tablename WHERE source = %s and timespan= %s', (source,'3'))
        resultschema3 = sqlContext.createDataFrame(result3)
        resultschema3.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")

    if result6.count()>0:
        session_statis.execute('DELETE FROM tablename WHERE source = %s and timespan= %s', (source,'6'))
        resultschema6 = sqlContext.createDataFrame(result6)
        resultschema6.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")

    if result12.count()>0:
        session_statis.execute('DELETE FROM tablename WHERE source = %s and timespan= %s', (source,'12'))
        resultschema12 = sqlContext.createDataFrame(result12)
        resultschema12.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")

    if result24.count()>0:
        session_statis.execute('DELETE FROM tablename WHERE source = %s and timespan= %s', (source,'24'))
        resultschema24 = sqlContext.createDataFrame(result24)
        resultschema24.write.format("org.apache.spark.sql.cassandra").options(table="tablename", keyspace = "keyspace").save(mode ="append")
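
# Driver setup: 30-second micro-batches; reference tables (channels, articles, engagement history) are loaded from Cassandra once at startup.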
conf = SparkConf().setAppName(appname)
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc,30)
sqlContext = SQLContext(sc)
channels = sc.cassandraTable("keyspace","tablename").map(lambda x:(x.id,{'author':x.name,'category':x.category}))
articles = sc.cassandraTable('keyspace','tablename').map(lambda x:(x.id,(x.id,x.thumbnail,x.title,x.url,x.created_at+timedelta(hours=8),source,x.category,x.channel,x.genre)))
articlestat = sc.cassandraTable('keyspace','tablename').map(lambda x:(x.channel,{'id':x.id,'thumbnail':x.thumbnail,'title':x.title,'url':x.url,'created_at':x.created_at,'source':x.source,'category':x.category,'channel':x.channel,'genre':x.genre}))
axes = sc.cassandraTable('keyspace','tablename')
topic = 'topic1'
kafkaParams = {"metadata.broker.list": "localhost:9092"}
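# Two direct Kafka streams: one for article metadata and one for engagement ("axes") events.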
article_stream = KafkaUtils.createDirectStream(ssc, [topic], kafkaParams)
article_join_stream=article_stream.map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:TransformInData(x)).filter(lambda x: x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(x['id'].encode("utf-8") ,x))
article_join_stream.transform(storeDataToCassandra).pprint()
axes_topic = 'topic2'
axes_stream = KafkaUtils.createDirectStream(ssc, [axes_topic], kafkaParams)
axes_join_stream = axes_stream.map(lambda x:read_json(x[1])).filter(lambda x: x!=0).map(lambda x:axesTransformData(x)).filter(lambda x: x!=0).flatMap(lambda x:(a for a in x)).map(lambda x:(str(x['id']),x))
axes_join_stream.transform(axesStoreToCassandra).pprint()
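# Every 15 minutes, trigger joinstream over a 15-minute window of article ids to refresh the statistics tables.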
statistics = article_join_stream.map(lambda x:(x[0])).window(15*60,15*60)
statistics.transform(joinstream).pprint()
ssc.start()
ssc.awaitTermination()  # keep the driver alive while the streaming job runs

Edit: This is the stage that seems to consume the most time. Any ideas?

At first glance it seems you are launching your application by just running "spark-submit <your application>".

This means you are running your application with the default memory and CPU allocation (in most default setups, about 1 CPU and 512 MB of memory).

This assumes you are using YARN, since you did not provide any information about that.

Launch your application with appropriate resources and you will see improvements.
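As a rough sketch of what that could look like (assuming YARN, as above): the executor and driver sizes below are illustrative guesses for 8-core / 32 GB worker nodes, not tuned recommendations, and the application name is a placeholder. The same values can also be passed directly as spark-submit flags (--num-executors, --executor-cores, --executor-memory, --driver-memory).

from pyspark import SparkConf, SparkContext

# Illustrative resource settings -- adjust them to the real workload.
conf = (SparkConf()
        .setAppName("my-streaming-app")
        .set("spark.executor.instances", "3")   # one executor per Spark node
        .set("spark.executor.cores", "4")       # leave headroom for the OS and other services
        .set("spark.executor.memory", "8g")
        .set("spark.driver.memory", "4g"))
sc = SparkContext(conf=conf)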

Edit:

I see you are using a lot of lambdas, and they need to be serialized. Be aware that when you use an object inside them, you are passing the entire object along every time.

That is, you are shipping the full object behind this.value rather than just value. To fix this, you can copy it into a local variable, _value = this.value, and work with that instead.
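In PySpark terms, a hypothetical sketch of that pattern (the Enricher class and its value field are invented for illustration and do not come from the code above):

class Enricher(object):
    def __init__(self):
        self.value = 10

    def add_bad(self, rdd):
        # The lambda captures self, so the entire Enricher instance is
        # serialized and shipped to the executors with every task.
        return rdd.map(lambda x: x + self.value)

    def add_good(self, rdd):
        # Copy the field into a local variable first; only the small
        # value itself ends up in the serialized closure.
        _value = self.value
        return rdd.map(lambda x: x + _value)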

This might give you a speedup.