从 Flask 应用访问 Spark

Access to Spark from Flask app

我编写了一个简单的 Flask 应用程序来将一些数据传递给 Spark。该脚本在 IPython Notebook 中有效,但当我尝试在其自己的服务器中 运行 时无效。我不认为脚本中的 Spark 上下文是 运行ning。如何让 Spark 在以下示例中工作?

from flask import Flask, request
from pyspark import SparkConf, SparkContext

app = Flask(__name__)

conf = SparkConf()
conf.setMaster("local")
conf.setAppName("SparkContext1")
conf.set("spark.executor.memory", "1g")
sc = SparkContext(conf=conf)

@app.route('/accessFunction', methods=['POST'])
def toyFunction():
    posted_data = sc.parallelize([request.get_data()])
    return str(posted_data.collect()[0])

if __name__ == '__main_':
    app.run(port=8080)

在 IPython Notebook 中我没有定义 SparkContext 因为它是自动配置的。我不记得我是怎么做到的,我关注了一些博客。

在 Linux 服务器上,我已将 .py 设置为始终 运行ning 并按照 this guide.[=16= 的第 5 步安装了最新的 Spark ]

编辑:

根据 davidism 的建议,我现在转而求助于越来越复杂的简单程序来定位错误。

首先,我仅使用以下答案中的脚本创建了 .py(在适当调整链接后):

import sys
try:
    sys.path.append("your/spark/home/python")
    from pyspark import context
    print ("Successfully imported Spark Modules")
except ImportError as e:
    print ("Can not import Spark Modules", e)

这个returns"Successfully imported Spark Modules"。但是,我将下一个 .py 文件设为 returns 异常:

from pyspark import SparkContext
sc = SparkContext('local')
rdd = sc.parallelize([0])
print rdd.count()

这个returns异常:

"Java gateway process exited before sending the driver its port number"

四处寻找类似的问题,我发现 this page but when I run this code nothing happens, no print on the console and no error messages. Similarly, this 也没有帮助,我得到了与上面相同的 Java 网关异常。我还安装了 anaconda,因为我听说这可能有助于联合 python 和 java,但还是没有成功...

关于下一步尝试的任何建议?我很茫然。

修改您的 .py 文件,如链接指南 'Using IPython Notebook with Spark' 部分第二点中所示。 Insted sys.path.insert 使用 sys.path.append。尝试插入此代码段:

import sys
try:
    sys.path.append("your/spark/home/python")
    from pyspark import context
    print ("Successfully imported Spark Modules")
except ImportError as e:
    print ("Can not import Spark Modules", e)

好的,所以我要回答我自己的问题,希望外面的人不会遭受同样的挫折!事实证明这是缺少代码和错误设置的结合。

正在编辑代码: 我确实需要通过在代码的序言中附加以下内容来初始化 Spark 上下文:

from pyspark import SparkContext
sc = SparkContext('local')

所以完整的代码是:

from pyspark import SparkContext
sc = SparkContext('local')

from flask import Flask, request
app = Flask(__name__)

@app.route('/whateverYouWant', methods=['POST'])  #can set first param to '/'

def toyFunction():
    posted_data = sc.parallelize([request.get_data()])
    return str(posted_data.collect()[0])

if __name__ == '__main_':
    app.run(port=8080)    #note set to 8080!

正在编辑设置: 文件 (yourrfilename.py) 必须位于正确的目录中,即必须将其保存到文件夹 /home/ubuntu/spark-1.5.0-bin-hadoop2.6.

然后在目录中发出以下命令:

./bin/spark-submit yourfilename.py

在 10.0.0.XX:8080/accessFunction/ 启动服务。

注意端口必须设置为8080或8081:Spark默认只允许webUI分别为master和worker的这些端口

您可以使用 restful 服务或通过打开新终端并使用 cURL 命令发送 POST 请求来测试该服务:

curl --data "DATA YOU WANT TO POST" http://10.0.0.XX/8080/accessFunction/

我能够通过将 PySpark 和 py4j 的位置添加到我的 flaskapp.wsgi 文件中的路径来解决这个问题。完整内容如下:

import sys
sys.path.insert(0, '/var/www/html/flaskapp')
sys.path.insert(1, '/usr/local/spark-2.0.2-bin-hadoop2.7/python')
sys.path.insert(2, '/usr/local/spark-2.0.2-bin-hadoop2.7/python/lib/py4j-0.10.3-src.zip')

from flaskapp import app as application