使用 sqlcontext 进行并行查询

parallel query to spark with sqlcontext

我们正在尝试 运行 EMR 中 spark 的 ETL。 S3 中有大约 20 亿个事件作为 gzipped json 行。它们总共大约有 30 个文件。我正在使用 pyspark。

这是代码,

def value_to_list(columns):
    def value_map(values):
        data = []
        for val in values:
            d = val.asDict()
            data.append([d[column] for column in columns])
        return data

    return value_map


def main():
    sc = SparkContext()
    sql_context = SQLContenxt(sc)

    all_events = SQLContenxt(sc).read.json("s3n://...", schema=StructType(fields), timestampFormat="yyyy-MM-dd HH:mm:ss")
    all_events.registerTempTable('allevents')

    for event_type in event_types:
        process_event(sc, event_type, "allevents")


def process_event(sparkcontext, etype, tablename):
    query = "select app_id, source, type, {time_cols}, count(*) as total " \
            "from {table} where  type = '{event_type}' " \
            "group by app_id, source, type, {time_cols}"
    time_cols_spec = [('hour', 'day', 'month', 'year'),
                      ('day', 'month', 'year'),
                      ('month', 'year'),
                      ('year')]

    for time_cols in time_cols_spec:
        final_query = query.format(time_cols=", ".join(time_cols),
                                   table=tablename,
                                   event_type=etype)
        dataframe = sql_context.sql(final_query)

        dataframe.rdd.groupBy(lambda r: r['app_id'])\
            .mapValues(value_to_list(['source'] + time_cols))\
            .saveAsTextFile("s3n://...")

因此,我们有大约 30 种类型的事件,对于每种事件,我将按小时、天、月和年的 4 种组合进行汇总。所以每个有 4 个查询。我们总共有大约 2000M 个事件。

我正在 运行与

一起使用

问题是,最后的保存时间很长。上次查询2次组合1个事件用了14个小时:(

我知道我不会并行进行。循环是顺序的。并且有 2 个循环。但我希望 rdd,groupBymapValues 到 运行 并行。当我查看事件时间线时,我看到 saveAsTextFile 占用了 99% 的时间。可能是因为 spark 懒惰地执行。

我需要使这个过程并行且快速。我怎样才能做到这一点?

您可以应用 4 项主要优化:

  1. 您正在对未针对进行查询进行优化的普通 json 文件执行聚合。将它们重写为 parquet,按事件类型重新分区并存储在 S3 上 - 它们将花费更少 space,并且您的应用程序将获得不错的速度提升。

  2. 提高并行度。在如此强大的 VM 上不需要驱动程序(master),而是生成一个较小的实例(例如m3.medium)并将所有 3 个大实例用于 worker。

  3. 用Dataframes替换RDD API调用:.rdd.groupBy().mapValues()可以替换为.groupBy(dataframe.app_id).agg(collect_list())然后一些映射。

  4. 您可以对(小时、天、月、年)数据集的原始数据执行查询,然后使用此聚合来计算给定事件的所有剩余查询。