如何将文件传递给主节点?
How to pass files to the master node?
我已经在 python 中编写了代码来实现二进制分类,我想使用 Apache-Spark 在本地计算机中基于不同的数据文件并行化此分类过程。我已经完成了以下步骤:
我编写了包含 4 个 python 文件的整个项目:"run_classifer.py"(用于 运行 我的分类应用程序),"classifer.py" (用于二进制分类),"load_params.py"(用于加载分类学习参数)和"preprocessing.py"(用于预处理数据)。该项目还使用了依赖文件:"tokenizer.perl"(用于预处理部分)和"nonbreaking_prefixes/nonbreaking_prefix.en"(也用于预处理部分)。
我的脚本文件"run_classifer.py"的主要部分定义如下,
### Initialize the Spark
conf = SparkConf().setAppName("ruofan").setMaster("local")
sc = SparkContext(conf = conf,
pyFiles=['''All python files in my project as
well as "nonbreaking_prefix.en" and "tokenizer.perl"'''])
### Read data directory from S3 storage, and create RDD
datafile = sc.wholeTextFiles("s3n://bucket/data_dir")
### Sent the application on each of the slave node
datafile.foreach(lambda (path, content): classifier(path, content))
但是,当我 运行 我的脚本 "run_classifier.py" 时,似乎找不到文件 "nonbreaking_prefix.en"。以下是我得到的错误:
ERROR: No abbreviations files found in /tmp/spark-f035270e-e267-4d71-9bf1-8c42ca2097ee/userFiles-88093e1a-6096-4592-8a71-be5548a4f8ae/nonbreaking_prefixes
但是我居然把文件"nonbreaking_prefix.en"传给了master节点,报错我也不知道。如果有人帮助我解决问题,我将不胜感激。
您可以使用 sc.addFile
上传文件并使用 SparkFiles.get
:
获取工作人员的路径
from pyspark import SparkFiles
sc = (SparkContext(conf = conf,
pyFiles=["All", "Python", "Files", "in", "your", "project"])
# Assuming both files are in your working directory
sc.addFile("nonbreaking_prefix.en")
sc.addFile("tokenizer.perl")
def classifier(path, content):
# Get path for uploaded files
print SparkFiles.get("tokenizer.perl")
with open(SparkFiles.get("nonbreaking_prefix.en")) as fr:
lines = [line for line in fr]
我已经在 python 中编写了代码来实现二进制分类,我想使用 Apache-Spark 在本地计算机中基于不同的数据文件并行化此分类过程。我已经完成了以下步骤:
我编写了包含 4 个 python 文件的整个项目:"run_classifer.py"(用于 运行 我的分类应用程序),"classifer.py" (用于二进制分类),"load_params.py"(用于加载分类学习参数)和"preprocessing.py"(用于预处理数据)。该项目还使用了依赖文件:"tokenizer.perl"(用于预处理部分)和"nonbreaking_prefixes/nonbreaking_prefix.en"(也用于预处理部分)。
我的脚本文件"run_classifer.py"的主要部分定义如下,
### Initialize the Spark conf = SparkConf().setAppName("ruofan").setMaster("local") sc = SparkContext(conf = conf, pyFiles=['''All python files in my project as well as "nonbreaking_prefix.en" and "tokenizer.perl"''']) ### Read data directory from S3 storage, and create RDD datafile = sc.wholeTextFiles("s3n://bucket/data_dir") ### Sent the application on each of the slave node datafile.foreach(lambda (path, content): classifier(path, content))
但是,当我 运行 我的脚本 "run_classifier.py" 时,似乎找不到文件 "nonbreaking_prefix.en"。以下是我得到的错误:
ERROR: No abbreviations files found in /tmp/spark-f035270e-e267-4d71-9bf1-8c42ca2097ee/userFiles-88093e1a-6096-4592-8a71-be5548a4f8ae/nonbreaking_prefixes
但是我居然把文件"nonbreaking_prefix.en"传给了master节点,报错我也不知道。如果有人帮助我解决问题,我将不胜感激。
您可以使用 sc.addFile
上传文件并使用 SparkFiles.get
:
from pyspark import SparkFiles
sc = (SparkContext(conf = conf,
pyFiles=["All", "Python", "Files", "in", "your", "project"])
# Assuming both files are in your working directory
sc.addFile("nonbreaking_prefix.en")
sc.addFile("tokenizer.perl")
def classifier(path, content):
# Get path for uploaded files
print SparkFiles.get("tokenizer.perl")
with open(SparkFiles.get("nonbreaking_prefix.en")) as fr:
lines = [line for line in fr]