此 JVM 中只有一个 SparkContext 可能是 运行 - Flask
Only one SparkContext may be running in this JVM - Flask
任何人都可以提供有关为什么这个简单的 Flask 应用程序抱怨 Only one SparkContext may be running in this JVM
的指导。显然,我并没有尝试加载多个上下文。
代码:
import flask
from pyspark import SparkContext
from operator import itemgetter
app = flask.Flask(__name__)
@app.route('/')
def homepage():
return 'Example: /dt/140'
@app.route('/dt/<int:delaythreshold>')
def dt(delaythreshold):
global flights_rdd
flights_dict = \
flights_rdd \
.filter( lambda (day, delay): delay >= threshold ) \
.countByValue()
sorted_flight_tuples = \
sorted( flights_dict.items(), key=itemgetter(1), reverse=True )
return render_template('delays.html', tuples=sorted_flight_tuples[:5])
if __name__ == '__main__':
global flights_rdd
sc = SparkContext()
flights_rdd = \
sc.textFile('/tmp/flights.csv', 4) \
.map( lambda s: s.split(',') ) \
.map( lambda l: ( l[0][:4], int(lst[1]) ) ) \
.cache()
app.config['DEBUG'] = True
app.run(host='0.0.0.0')
提前致谢。
您可能不应该创建 "global" 资源,例如 __main__
部分中的 SparkContext。
特别是,如果您 运行 您的应用程序处于调试模式,模块会在启动时立即重新加载第二次 - 因此尝试创建第二个 SparkContext
。 (在创建 SparkContext
之前将 print 'creating sparkcontext'
添加到您的 __main__
部分 - 您会看到它两次)。
查看 flask 文档以获取有关如何缓存全局资源的建议。
在 http://flask.pocoo.org/docs/0.10/appcontext/#context-usage 之后,您可以例如按如下方式检索 SparkContext:
from flask import g
def get_flights():
flights_rdd = getattr(g, '_flights_rdd', None)
if flights_rdd is None:
# create flights_rdd on the fly
sc = g._sc = SparkContext()
flights_rdd = \
sc.textFile('/tmp/flights.csv', 4) \
.map( lambda s: s.split(',') ) \
.map( lambda l: ( l[0][:4], int(lst[1]) ) ) \
.cache()
g._flights_rdd = flights_rdd
return flights_rdd
@app.teardown_appcontext
def teardown_sparkcontext(exception):
sc = getattr(g, '_sc', None)
if sc is not None:
sc.close()
然后使用 flights_rdd = get_flights()
代替 global flights_rdd
。
任何人都可以提供有关为什么这个简单的 Flask 应用程序抱怨 Only one SparkContext may be running in this JVM
的指导。显然,我并没有尝试加载多个上下文。
代码:
import flask
from pyspark import SparkContext
from operator import itemgetter
app = flask.Flask(__name__)
@app.route('/')
def homepage():
return 'Example: /dt/140'
@app.route('/dt/<int:delaythreshold>')
def dt(delaythreshold):
global flights_rdd
flights_dict = \
flights_rdd \
.filter( lambda (day, delay): delay >= threshold ) \
.countByValue()
sorted_flight_tuples = \
sorted( flights_dict.items(), key=itemgetter(1), reverse=True )
return render_template('delays.html', tuples=sorted_flight_tuples[:5])
if __name__ == '__main__':
global flights_rdd
sc = SparkContext()
flights_rdd = \
sc.textFile('/tmp/flights.csv', 4) \
.map( lambda s: s.split(',') ) \
.map( lambda l: ( l[0][:4], int(lst[1]) ) ) \
.cache()
app.config['DEBUG'] = True
app.run(host='0.0.0.0')
提前致谢。
您可能不应该创建 "global" 资源,例如 __main__
部分中的 SparkContext。
特别是,如果您 运行 您的应用程序处于调试模式,模块会在启动时立即重新加载第二次 - 因此尝试创建第二个 SparkContext
。 (在创建 SparkContext
之前将 print 'creating sparkcontext'
添加到您的 __main__
部分 - 您会看到它两次)。
查看 flask 文档以获取有关如何缓存全局资源的建议。 在 http://flask.pocoo.org/docs/0.10/appcontext/#context-usage 之后,您可以例如按如下方式检索 SparkContext:
from flask import g
def get_flights():
flights_rdd = getattr(g, '_flights_rdd', None)
if flights_rdd is None:
# create flights_rdd on the fly
sc = g._sc = SparkContext()
flights_rdd = \
sc.textFile('/tmp/flights.csv', 4) \
.map( lambda s: s.split(',') ) \
.map( lambda l: ( l[0][:4], int(lst[1]) ) ) \
.cache()
g._flights_rdd = flights_rdd
return flights_rdd
@app.teardown_appcontext
def teardown_sparkcontext(exception):
sc = getattr(g, '_sc', None)
if sc is not None:
sc.close()
然后使用 flights_rdd = get_flights()
代替 global flights_rdd
。