How to access SparkContext in a pyspark script
The following SO question, How to run script in Pyspark and drop into IPython shell when done?, explains how to launch a pyspark script:
%run -d myscript.py
But how do we access the existing Spark context?
Simply creating a new one does not work:
----> sc = SparkContext("local", 1)
ValueError: Cannot run multiple SparkContexts at once; existing
SparkContext(app=PySparkShell, master=local) created by <module> at
/Library/Python/2.7/site-packages/IPython/utils/py3compat.py:204
But trying to use an existing one... well, which existing one?
In [50]: for s in filter(lambda x: 'SparkContext' in repr(x[1]) and len(repr(x[1])) < 150, locals().iteritems()):
             print s
('SparkContext', <class 'pyspark.context.SparkContext'>)
That is, no variable holds a SparkContext instance; only the SparkContext class itself is in scope.
When you type pyspark at the terminal, Python automatically creates the Spark context sc for you.
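For example, inside the interactive pyspark shell (or a script run there with %run) you can use the pre-created sc directly; a minimal sketch, assuming the shell's default setup, with illustrative sample data:

rdd = sc.parallelize(["spark", "context", "spark"])   # sc is already defined by the shell
print(rdd.countByValue())                             # no new SparkContext is created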
Standalone Python script for wordcount: write a reusable Spark context using a contextmanager
"""SimpleApp.py"""
from contextlib import contextmanager
from pyspark import SparkContext
from pyspark import SparkConf
SPARK_MASTER='local'
SPARK_APP_NAME='Word Count'
SPARK_EXECUTOR_MEMORY='200m'
@contextmanager
def spark_manager():
conf = SparkConf().setMaster(SPARK_MASTER) \
.setAppName(SPARK_APP_NAME) \
.set("spark.executor.memory", SPARK_EXECUTOR_MEMORY)
spark_context = SparkContext(conf=conf)
try:
yield spark_context
finally:
spark_context.stop()
with spark_manager() as context:
File = "/home/ramisetty/sparkex/README.md" # Should be some file on your system
textFileRDD = context.textFile(File)
wordCounts = textFileRDD.flatMap(lambda line: line.split()).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
wordCounts.saveAsTextFile("output")
print "WordCount - Done"
Launch it with:
/bin/spark-submit SimpleApp.py
Include the following:
from pyspark.context import SparkContext
and then call the static method on SparkContext:
sc = SparkContext.getOrCreate()
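A minimal sketch of how this can be used in a script (the file name and sample data are illustrative, not from the answer): the same script runs under spark-submit and via %run inside an existing pyspark shell, because getOrCreate() returns the already-running context instead of raising the "Cannot run multiple SparkContexts" error.

# get_context.py (hypothetical name)
from pyspark.context import SparkContext

sc = SparkContext.getOrCreate()                        # reuses the shell's context if one exists
pairs = sc.parallelize(["a", "b", "a"]).map(lambda w: (w, 1))
print(pairs.reduceByKey(lambda a, b: a + b).collect())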
If you have already created a SparkSession:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("StreamKafka_Test") \
    .getOrCreate()
then you can access the "existing" SparkContext like this:
sc = spark.sparkContext
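Putting it together, a minimal self-contained sketch (the app name comes from the snippet above; the sample data is illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("StreamKafka_Test").getOrCreate()
sc = spark.sparkContext                       # the existing SparkContext behind the session
print(sc.parallelize([1, 2, 3]).count())      # -> 3
spark.stop()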