Spark: running spark-submit with the correct number of executors
I have set up a basic 3-node EMR cluster and I run spark-submit with --executor-memory set to 1G and no other configuration.
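In other words, the submit command was essentially nothing more than the following (the script file name here is just a placeholder for illustration):

spark-submit --executor-memory 1G benchmark.py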
The script itself is a basic benchmarking job:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
import time
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
# sample data in lineitem table:
# 3|1284483|34508|3|27|39620.34|0.06|0.07|A|F|1994-01-16|1993-11-22|1994-01-23|DELIVER IN PERSON|SHIP|nal foxes wake. |
def mapper(lines):
    x = lines.split("|")
    return Row(rownum=int(x[0]),
               l_orderkey=int(x[0]),
               l_partkey=int(x[1]),
               l_suppkey=int(x[2]),
               l_linenumber=int(x[3]),
               l_quantity=int(x[4]),
               l_extendedprice=float(x[5]),
               l_discount=float(x[6]),
               l_tax=float(x[7]),
               l_returnflag=x[8],
               l_linestatus=x[9],
               l_shipdate=x[10],
               l_commitdate=x[11],
               l_receiptdate=x[12],
               l_shipinstruct=x[13],
               l_shipment=x[14],
               l_comment=x[15],
               )
# ORDERKEY
# PARTKEY
# SUPPKEY
# LINENUMBER
# QUANTITY
# EXTENDEDPRICE
# DISCOUNT
# TAX
# RETURNFLAG
# LINESTATUS
# SHIPDATE
# COMMITDATE
# RECEIPTDATE
# SHIPINSTRUCT
# SHIPMODE
# COMMENT
rdd = sc.textFile("s3://sampletpchdata/10gb/lineitem.tbl.*")
# kick off an initial count
print rdd.count()
sample = rdd.map(mapper)
schemaSample = sqlContext.createDataFrame( sample )
schemaSample.registerTempTable("lineitem")
# run TPCH query 1
results = sqlContext.sql("""
SELECT
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= date_sub(cast('1998-12-01' as date), '60')
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus
""")
# kick off a final count of the results
print results.count()
While this was running, I looked at the output of the Spark API's executors endpoint and got this:
[ {
"id" : "driver",
"hostPort" : "10.232.13.130:47656",
"rddBlocks" : 0,
"memoryUsed" : 0,
"diskUsed" : 0,
"activeTasks" : 0,
"failedTasks" : 0,
"completedTasks" : 0,
"totalTasks" : 0,
"totalDuration" : 0,
"totalInputBytes" : 0,
"totalShuffleRead" : 0,
"totalShuffleWrite" : 0,
"maxMemory" : 7975010304,
"executorLogs" : { }
}, {
"id" : "1",
"hostPort" : "ip-10-232-13-123.us-west-1.compute.internal:58544",
"rddBlocks" : 0,
"memoryUsed" : 0,
"diskUsed" : 0,
"activeTasks" : 0,
"failedTasks" : 0,
"completedTasks" : 641,
"totalTasks" : 641,
"totalDuration" : 4998902,
"totalInputBytes" : 3490792,
"totalShuffleRead" : 0,
"totalShuffleWrite" : 395870,
"maxMemory" : 7790985216,
"executorLogs" : {
"stdout" : "http://somenode:8042/node/containerlogs/container_1456781958356_0004_01_000009/hadoop/stdout?start=-4096",
"stderr" : "http://somenode:8042/node/containerlogs/container_1456781958356_0004_01_000009/hadoop/stderr?start=-4096"
}
} ]
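For reference, that output comes from the executors endpoint of Spark's REST API; assuming the driver UI is reachable on its default port, something along these lines returns it while the application is running (host and application id are placeholders):

curl http://<driver-host>:4040/api/v1/applications/<app-id>/executors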
Unless I'm misreading this, it looks like there is only 1 driver and 1 executor in my 3-node cluster. What's going on here? Shouldn't there be more executors than that, and if so, how do I make that happen?
You also have to use --num-executors to choose the number of executors your code will run on.
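For example, a sketch of the submit command with an explicit executor count (the executor count, memory, and script name are illustrative and should be sized to your nodes):

spark-submit --num-executors 2 --executor-memory 1G benchmark.py

Equivalently, you can set the spark.executor.instances property via --conf or in spark-defaults.conf. If neither is set and dynamic allocation is not enabled, Spark on YARN falls back to a small default number of executors rather than using the whole cluster.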