获取 CSV 到 Spark 数据框
Get CSV to Spark dataframe
我在 Spark 上使用 python 并希望将 csv 放入数据框中。
Spark SQL 的 documentation 奇怪地没有提供 CSV 作为来源的解释。
我找到了 Spark-CSV,但是我对文档的两个部分有疑问:
"This package can be added to Spark using the --jars command line option. For example, to include it when starting the spark shell: $ bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3"
每次启动 pyspark 或 spark-submit 时我真的需要添加这个参数吗?显得很不雅观。有没有办法在 python 中导入而不是每次都重新下载?
df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv")
即使我做了上面的,这也不行。 "source" 参数在这行代码中代表什么?我如何简单地在 linux 上加载本地文件,比如“/Spark_Hadoop/spark-1.3.1-bin-cdh4/cars.csv”?
将csv文件读入RDD,然后从原始RDD生成RowRDD。
创建由与步骤 1 中创建的 RDD 中的行结构匹配的 StructType 表示的模式。
通过SQLContext提供的createDataFrame方法将schema应用于Rows的RDD。
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))
# The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)
from pyspark.sql.types import StringType
from pyspark import SQLContext
sqlContext = SQLContext(sc)
Employee_rdd = sc.textFile("\..\Employee.csv")
.map(lambda line: line.split(","))
Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name'])
Employee_df.show()
我运行遇到了类似的问题。解决方案是添加一个名为 "PYSPARK_SUBMIT_ARGS" 的环境变量,并将其值设置为“--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell”。这适用于 Spark 的 Python 交互式 shell.
确保您的 spark-csv 版本与安装的 Scala 版本匹配。对于 Scala 2.11,它是 spark-csv_2.11,对于 Scala 2.10 或 2.10.5,它是 spark-csv_2.10.
希望有用。
如果你不介意额外的包依赖,你可以使用Pandas来解析CSV文件。它可以很好地处理内部逗号。
依赖关系:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
一次将整个文件读入 Spark DataFrame:
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv('file.csv') # assuming the file contains a header
# If no header:
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2'])
s_df = sql_sc.createDataFrame(pandas_df)
或者,更注重数据,您可以将数据分块到 Spark RDD,然后 DF:
chunk_100k = pd.read_csv('file.csv', chunksize=100000)
for chunky in chunk_100k:
Spark_temp_rdd = sc.parallelize(chunky.values.tolist())
try:
Spark_full_rdd += Spark_temp_rdd
except NameError:
Spark_full_rdd = Spark_temp_rdd
del Spark_temp_rdd
Spark_DF = Spark_full_rdd.toDF(['column 1','column 2'])
Spark 2.0之后,推荐使用Spark Session:
from pyspark.sql import SparkSession
from pyspark.sql import Row
# Create a SparkSession
spark = SparkSession \
.builder \
.appName("basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
def mapper(line):
fields = line.split(',')
return Row(ID=int(fields[0]), field1=str(fields[1].encode("utf-8")), field2=int(fields[2]), field3=int(fields[3]))
lines = spark.sparkContext.textFile("file.csv")
df = lines.map(mapper)
# Infer the schema, and register the DataFrame as a table.
schemaDf = spark.createDataFrame(df).cache()
schemaDf.createOrReplaceTempView("tablename")
随着 Spark 的更新版本(我相信是 1.4),这变得容易多了。表达式 sqlContext.read
给你一个 DataFrameReader
实例,使用 .csv()
方法:
df = sqlContext.read.csv("/path/to/your.csv")
请注意,您还可以通过将关键字参数 header=True
添加到 .csv()
调用来指示 csv 文件具有 header。还有一些其他选项可用,并在上面的 link 中进行了描述。
基于 Aravind 的回答,但更短,例如:
lines = sc.textFile("/path/to/file").map(lambda x: x.split(","))
df = lines.toDF(["year", "month", "day", "count"])
对于Pyspark,假设csv文件的第一行包含一个header
spark = SparkSession.builder.appName('chosenName').getOrCreate()
df=spark.read.csv('fileNameWithPath', mode="DROPMALFORMED",inferSchema=True, header = True)
使用当前实现 (spark 2.X) 你不需要添加 packages 参数,你可以使用内置的 csv 实现
此外,作为已接受的答案,您不需要创建一个 rdd 然后强制执行具有 1 个潜在问题的模式
当您读取 csv 时,它会将所有字段标记为字符串,并且当您使用整数列强制执行架构时,您将遇到异常。
执行上述操作的更好方法是
spark.read.format("csv").schema(schema).option("header", "true").load(input_path).show()
我在 Spark 上使用 python 并希望将 csv 放入数据框中。
Spark SQL 的 documentation 奇怪地没有提供 CSV 作为来源的解释。
我找到了 Spark-CSV,但是我对文档的两个部分有疑问:
"This package can be added to Spark using the --jars command line option. For example, to include it when starting the spark shell: $ bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3"
每次启动 pyspark 或 spark-submit 时我真的需要添加这个参数吗?显得很不雅观。有没有办法在 python 中导入而不是每次都重新下载?df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv")
即使我做了上面的,这也不行。 "source" 参数在这行代码中代表什么?我如何简单地在 linux 上加载本地文件,比如“/Spark_Hadoop/spark-1.3.1-bin-cdh4/cars.csv”?
将csv文件读入RDD,然后从原始RDD生成RowRDD。
创建由与步骤 1 中创建的 RDD 中的行结构匹配的 StructType 表示的模式。
通过SQLContext提供的createDataFrame方法将schema应用于Rows的RDD。
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))
# The schema is encoded in a string.
schemaString = "name age"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)
from pyspark.sql.types import StringType
from pyspark import SQLContext
sqlContext = SQLContext(sc)
Employee_rdd = sc.textFile("\..\Employee.csv")
.map(lambda line: line.split(","))
Employee_df = Employee_rdd.toDF(['Employee_ID','Employee_name'])
Employee_df.show()
我运行遇到了类似的问题。解决方案是添加一个名为 "PYSPARK_SUBMIT_ARGS" 的环境变量,并将其值设置为“--packages com.databricks:spark-csv_2.10:1.4.0 pyspark-shell”。这适用于 Spark 的 Python 交互式 shell.
确保您的 spark-csv 版本与安装的 Scala 版本匹配。对于 Scala 2.11,它是 spark-csv_2.11,对于 Scala 2.10 或 2.10.5,它是 spark-csv_2.10.
希望有用。
如果你不介意额外的包依赖,你可以使用Pandas来解析CSV文件。它可以很好地处理内部逗号。
依赖关系:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
一次将整个文件读入 Spark DataFrame:
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv('file.csv') # assuming the file contains a header
# If no header:
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2'])
s_df = sql_sc.createDataFrame(pandas_df)
或者,更注重数据,您可以将数据分块到 Spark RDD,然后 DF:
chunk_100k = pd.read_csv('file.csv', chunksize=100000)
for chunky in chunk_100k:
Spark_temp_rdd = sc.parallelize(chunky.values.tolist())
try:
Spark_full_rdd += Spark_temp_rdd
except NameError:
Spark_full_rdd = Spark_temp_rdd
del Spark_temp_rdd
Spark_DF = Spark_full_rdd.toDF(['column 1','column 2'])
Spark 2.0之后,推荐使用Spark Session:
from pyspark.sql import SparkSession
from pyspark.sql import Row
# Create a SparkSession
spark = SparkSession \
.builder \
.appName("basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
def mapper(line):
fields = line.split(',')
return Row(ID=int(fields[0]), field1=str(fields[1].encode("utf-8")), field2=int(fields[2]), field3=int(fields[3]))
lines = spark.sparkContext.textFile("file.csv")
df = lines.map(mapper)
# Infer the schema, and register the DataFrame as a table.
schemaDf = spark.createDataFrame(df).cache()
schemaDf.createOrReplaceTempView("tablename")
随着 Spark 的更新版本(我相信是 1.4),这变得容易多了。表达式 sqlContext.read
给你一个 DataFrameReader
实例,使用 .csv()
方法:
df = sqlContext.read.csv("/path/to/your.csv")
请注意,您还可以通过将关键字参数 header=True
添加到 .csv()
调用来指示 csv 文件具有 header。还有一些其他选项可用,并在上面的 link 中进行了描述。
基于 Aravind 的回答,但更短,例如:
lines = sc.textFile("/path/to/file").map(lambda x: x.split(","))
df = lines.toDF(["year", "month", "day", "count"])
对于Pyspark,假设csv文件的第一行包含一个header
spark = SparkSession.builder.appName('chosenName').getOrCreate()
df=spark.read.csv('fileNameWithPath', mode="DROPMALFORMED",inferSchema=True, header = True)
使用当前实现 (spark 2.X) 你不需要添加 packages 参数,你可以使用内置的 csv 实现
此外,作为已接受的答案,您不需要创建一个 rdd 然后强制执行具有 1 个潜在问题的模式
当您读取 csv 时,它会将所有字段标记为字符串,并且当您使用整数列强制执行架构时,您将遇到异常。
执行上述操作的更好方法是
spark.read.format("csv").schema(schema).option("header", "true").load(input_path).show()