火花可扩展性
Spark scalability
我目前使用一个master(本地机器)和两个worker(2*32核,Memory 2*61.9 GB)用于Spark的标准ALS算法并生成下一个代码用于时间评估:
import numpy as np
from scipy.sparse.linalg import spsolve
import random
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import hashlib
#Spark configuration settings
conf = SparkConf().setAppName("Temp").setMaster("spark://<myip>:7077").set("spark.cores.max","64").set("spark.executor.memory", "61g")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
#first time
t1 = time.time()
#load the DataFrame and transform it into RDD<Rating>
rddob = sqlContext.read.json("file.json").rdd
rdd1 = rddob.map(lambda line:(line.ColOne, line.ColTwo))
rdd2 = rdd1.map(lambda line: (line, 1))
rdd3 = rdd2.reduceByKey(lambda a,b: a+b)
ratings = rdd3.map(lambda (line, rating): Rating(int(hash(line[0]) % (10 ** 8)), int(line[1]), float(rating)))
ratings.cache()
# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 5
model = ALS.train(ratings, rank, numIterations)
# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))
#second time
t2 = time.time()
#print results
print "Time of ALS",t2-t1
在此代码中,我将所有参数保持不变,但参数 set("spark.cores.max","x")
除外,我使用 x 的下一个值:1,2,4,8,16,32,64
。我得到了下次评价:
#cores time [s]
1 20722
2 11803
4 5596
8 3131
16 2125
32 2000
64 2051
评价结果对我来说有点奇怪。我看到少量内核具有良好的线性可扩展性。但是在 16、32 和 64 个可能的内核范围内,我看不到任何可扩展性或时间性能的改进。这怎么可能?我的输入文件大约有 70 GB,有 200 000 000 行。
像 Spark 这样的分布式系统的线性可扩展性只是一小部分是核心数量增加的结果。最重要的部分是分配磁盘/网络 IO 的机会。如果您有固定数量的工作人员并且不同时扩展存储,您将很快达到吞吐量受 IO 限制的地步。
我目前使用一个master(本地机器)和两个worker(2*32核,Memory 2*61.9 GB)用于Spark的标准ALS算法并生成下一个代码用于时间评估:
import numpy as np
from scipy.sparse.linalg import spsolve
import random
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
import hashlib
#Spark configuration settings
conf = SparkConf().setAppName("Temp").setMaster("spark://<myip>:7077").set("spark.cores.max","64").set("spark.executor.memory", "61g")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
#first time
t1 = time.time()
#load the DataFrame and transform it into RDD<Rating>
rddob = sqlContext.read.json("file.json").rdd
rdd1 = rddob.map(lambda line:(line.ColOne, line.ColTwo))
rdd2 = rdd1.map(lambda line: (line, 1))
rdd3 = rdd2.reduceByKey(lambda a,b: a+b)
ratings = rdd3.map(lambda (line, rating): Rating(int(hash(line[0]) % (10 ** 8)), int(line[1]), float(rating)))
ratings.cache()
# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 5
model = ALS.train(ratings, rank, numIterations)
# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))
#second time
t2 = time.time()
#print results
print "Time of ALS",t2-t1
在此代码中,我将所有参数保持不变,但参数 set("spark.cores.max","x")
除外,我使用 x 的下一个值:1,2,4,8,16,32,64
。我得到了下次评价:
#cores time [s]
1 20722
2 11803
4 5596
8 3131
16 2125
32 2000
64 2051
评价结果对我来说有点奇怪。我看到少量内核具有良好的线性可扩展性。但是在 16、32 和 64 个可能的内核范围内,我看不到任何可扩展性或时间性能的改进。这怎么可能?我的输入文件大约有 70 GB,有 200 000 000 行。
像 Spark 这样的分布式系统的线性可扩展性只是一小部分是核心数量增加的结果。最重要的部分是分配磁盘/网络 IO 的机会。如果您有固定数量的工作人员并且不同时扩展存储,您将很快达到吞吐量受 IO 限制的地步。