此 JVM 中只有一个 SparkContext 可能是 运行 - Flask

Only one SparkContext may be running in this JVM - Flask

任何人都可以提供有关为什么这个简单的 Flask 应用程序抱怨 Only one SparkContext may be running in this JVM 的指导。显然,我并没有尝试加载多个上下文。 代码:

import flask
from pyspark  import SparkContext
from operator import itemgetter

app = flask.Flask(__name__)

@app.route('/')
def homepage():
    return 'Example: /dt/140'

@app.route('/dt/<int:delaythreshold>')
def dt(delaythreshold):
    global flights_rdd

    flights_dict =                                         \
        flights_rdd                                        \
        .filter( lambda (day, delay): delay >= threshold ) \
        .countByValue()
    sorted_flight_tuples = \
        sorted( flights_dict.items(), key=itemgetter(1), reverse=True )

    return render_template('delays.html', tuples=sorted_flight_tuples[:5])

if __name__ == '__main__':
    global flights_rdd

    sc = SparkContext()
    flights_rdd =                                     \
        sc.textFile('/tmp/flights.csv', 4)            \
          .map( lambda s: s.split(',') )              \
          .map( lambda l: ( l[0][:4], int(lst[1]) ) ) \
          .cache()

    app.config['DEBUG'] = True
    app.run(host='0.0.0.0')

提前致谢。

您可能不应该创建 "global" 资源,例如 __main__ 部分中的 SparkContext。

特别是,如果您 运行 您的应用程序处于调试模式,模块会在启动时立即重新加载第二次 - 因此尝试创建第二个 SparkContext。 (在创建 SparkContext 之前将 print 'creating sparkcontext' 添加到您的 __main__ 部分 - 您会看到它两次)。

查看 flask 文档以获取有关如何缓存全局资源的建议。 在 http://flask.pocoo.org/docs/0.10/appcontext/#context-usage 之后,您可以例如按如下方式检索 SparkContext:

from flask import g

def get_flights():
    flights_rdd = getattr(g, '_flights_rdd', None)
    if flights_rdd is None:
        # create flights_rdd on the fly
        sc = g._sc = SparkContext()
        flights_rdd =                                     \
            sc.textFile('/tmp/flights.csv', 4)            \
              .map( lambda s: s.split(',') )              \
              .map( lambda l: ( l[0][:4], int(lst[1]) ) ) \
              .cache()
        g._flights_rdd = flights_rdd
    return flights_rdd

@app.teardown_appcontext
def teardown_sparkcontext(exception):
    sc = getattr(g, '_sc', None)
    if sc is not None:
        sc.close()

然后使用 flights_rdd = get_flights() 代替 global flights_rdd