使用 sqlcontext 进行并行查询
parallel query to spark with sqlcontext
我们正在尝试 运行 EMR 中 spark 的 ETL。 S3 中有大约 20 亿个事件作为 gzipped json 行。它们总共大约有 30 个文件。我正在使用 pyspark。
这是代码,
def value_to_list(columns):
def value_map(values):
data = []
for val in values:
d = val.asDict()
data.append([d[column] for column in columns])
return data
return value_map
def main():
sc = SparkContext()
sql_context = SQLContenxt(sc)
all_events = SQLContenxt(sc).read.json("s3n://...", schema=StructType(fields), timestampFormat="yyyy-MM-dd HH:mm:ss")
all_events.registerTempTable('allevents')
for event_type in event_types:
process_event(sc, event_type, "allevents")
def process_event(sparkcontext, etype, tablename):
query = "select app_id, source, type, {time_cols}, count(*) as total " \
"from {table} where type = '{event_type}' " \
"group by app_id, source, type, {time_cols}"
time_cols_spec = [('hour', 'day', 'month', 'year'),
('day', 'month', 'year'),
('month', 'year'),
('year')]
for time_cols in time_cols_spec:
final_query = query.format(time_cols=", ".join(time_cols),
table=tablename,
event_type=etype)
dataframe = sql_context.sql(final_query)
dataframe.rdd.groupBy(lambda r: r['app_id'])\
.mapValues(value_to_list(['source'] + time_cols))\
.saveAsTextFile("s3n://...")
因此,我们有大约 30 种类型的事件,对于每种事件,我将按小时、天、月和年的 4 种组合进行汇总。所以每个有 4 个查询。我们总共有大约 2000M 个事件。
我正在 运行与
一起使用
- AWS EMR (5.0.3)
- 阿帕奇火花 2.0.1
- 1个主人,2个工人
- 每台机器是
m3.2xlarge
- 总内存为90GB
问题是,最后的保存时间很长。上次查询2次组合1个事件用了14个小时:(
我知道我不会并行进行。循环是顺序的。并且有 2 个循环。但我希望 rdd,groupBy
、mapValues
到 运行 并行。当我查看事件时间线时,我看到 saveAsTextFile
占用了 99% 的时间。可能是因为 spark 懒惰地执行。
我需要使这个过程并行且快速。我怎样才能做到这一点?
您可以应用 4 项主要优化:
您正在对未针对进行查询进行优化的普通 json 文件执行聚合。将它们重写为 parquet,按事件类型重新分区并存储在 S3 上 - 它们将花费更少 space,并且您的应用程序将获得不错的速度提升。
提高并行度。在如此强大的 VM 上不需要驱动程序(master),而是生成一个较小的实例(例如m3.medium
)并将所有 3 个大实例用于 worker。
用Dataframes替换RDD API调用:.rdd.groupBy().mapValues()
可以替换为.groupBy(dataframe.app_id).agg(collect_list())
然后一些映射。
您可以对(小时、天、月、年)数据集的原始数据执行查询,然后使用此聚合来计算给定事件的所有剩余查询。
我们正在尝试 运行 EMR 中 spark 的 ETL。 S3 中有大约 20 亿个事件作为 gzipped json 行。它们总共大约有 30 个文件。我正在使用 pyspark。
这是代码,
def value_to_list(columns):
def value_map(values):
data = []
for val in values:
d = val.asDict()
data.append([d[column] for column in columns])
return data
return value_map
def main():
sc = SparkContext()
sql_context = SQLContenxt(sc)
all_events = SQLContenxt(sc).read.json("s3n://...", schema=StructType(fields), timestampFormat="yyyy-MM-dd HH:mm:ss")
all_events.registerTempTable('allevents')
for event_type in event_types:
process_event(sc, event_type, "allevents")
def process_event(sparkcontext, etype, tablename):
query = "select app_id, source, type, {time_cols}, count(*) as total " \
"from {table} where type = '{event_type}' " \
"group by app_id, source, type, {time_cols}"
time_cols_spec = [('hour', 'day', 'month', 'year'),
('day', 'month', 'year'),
('month', 'year'),
('year')]
for time_cols in time_cols_spec:
final_query = query.format(time_cols=", ".join(time_cols),
table=tablename,
event_type=etype)
dataframe = sql_context.sql(final_query)
dataframe.rdd.groupBy(lambda r: r['app_id'])\
.mapValues(value_to_list(['source'] + time_cols))\
.saveAsTextFile("s3n://...")
因此,我们有大约 30 种类型的事件,对于每种事件,我将按小时、天、月和年的 4 种组合进行汇总。所以每个有 4 个查询。我们总共有大约 2000M 个事件。
我正在 运行与
一起使用- AWS EMR (5.0.3)
- 阿帕奇火花 2.0.1
- 1个主人,2个工人
- 每台机器是
m3.2xlarge
- 总内存为90GB
问题是,最后的保存时间很长。上次查询2次组合1个事件用了14个小时:(
我知道我不会并行进行。循环是顺序的。并且有 2 个循环。但我希望 rdd,groupBy
、mapValues
到 运行 并行。当我查看事件时间线时,我看到 saveAsTextFile
占用了 99% 的时间。可能是因为 spark 懒惰地执行。
我需要使这个过程并行且快速。我怎样才能做到这一点?
您可以应用 4 项主要优化:
您正在对未针对进行查询进行优化的普通 json 文件执行聚合。将它们重写为 parquet,按事件类型重新分区并存储在 S3 上 - 它们将花费更少 space,并且您的应用程序将获得不错的速度提升。
提高并行度。在如此强大的 VM 上不需要驱动程序(master),而是生成一个较小的实例(例如
m3.medium
)并将所有 3 个大实例用于 worker。用Dataframes替换RDD API调用:
.rdd.groupBy().mapValues()
可以替换为.groupBy(dataframe.app_id).agg(collect_list())
然后一些映射。您可以对(小时、天、月、年)数据集的原始数据执行查询,然后使用此聚合来计算给定事件的所有剩余查询。