如何在 PySpark 中读取 Avro 文件
How to read Avro file in PySpark
我正在使用 python 编写 spark 作业。但是,我需要读入一大堆 avro 文件。
This 是我在 Spark 的示例文件夹中找到的最接近的解决方案。但是,您需要使用 spark-submit 提交此 python 脚本。在spark-submit的命令行中,你可以指定driver-class,这样的话,你所有的avrokey,avrovalueclass都会被定位。
avro_rdd = sc.newAPIHadoopFile(
path,
"org.apache.avro.mapreduce.AvroKeyInputFormat",
"org.apache.avro.mapred.AvroKey",
"org.apache.hadoop.io.NullWritable",
keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
conf=conf)
在我的例子中,我需要 运行 Python 脚本中的所有内容,我尝试创建一个环境变量来包含 jar 文件,手指交叉 Python 将添加jar 到路径但显然不是,它给了我意外的 class 错误。
os.environ['SPARK_SUBMIT_CLASSPATH'] = "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar"
任何人都可以帮助我如何在一个 python 脚本中读取 avro 文件?
Spark >= 2.4.0
您可以使用 built-in Avro support。 API 向后兼容 spark-avro
包,并添加了一些内容(最值得注意的是 from_avro
/ to_avro
函数)。
请注意,该模块未与标准 Spark 二进制文件捆绑在一起,必须使用 spark.jars.packages
或等效机制包含在内。
另见
Spark < 2.4.0
您可以使用 spark-avro
库。首先让我们创建一个示例数据集:
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
schema_string ='''{"namespace": "example.avro",
"type": "record",
"name": "KeyValue",
"fields": [
{"name": "key", "type": "string"},
{"name": "value", "type": ["int", "null"]}
]
}'''
schema = avro.schema.parse(schema_string)
with open("kv.avro", "w") as f, DataFileWriter(f, DatumWriter(), schema) as wrt:
wrt.append({"key": "foo", "value": -1})
wrt.append({"key": "bar", "value": 1})
使用spark-csv
阅读就这么简单:
df = sqlContext.read.format("com.databricks.spark.avro").load("kv.avro")
df.show()
## +---+-----+
## |key|value|
## +---+-----+
## |foo| -1|
## |bar| 1|
## +---+-----+
前一种解决方案需要安装第三方 Java 依赖项,这不是大多数 Python 开发人员所满意的。但是,如果您只想使用给定模式解析 Avro 文件,那么您实际上并不需要外部库。你可以只读取二进制文件并用你最喜欢的 python Avro 包解析它们。
例如,这是您可以使用 fastavro
:
加载 Avro 文件的方法
from io import BytesIO
import fastavro
schema = {
...
}
rdd = sc.binaryFiles("/path/to/dataset/*.avro")\
.flatMap(lambda args: fastavro.reader(BytesIO(args[1]), reader_schema=schema))
print(rdd.collect())
对于 Spark < 2.4.0,PySpark 可以通过使用 JAR "com.databricks.spark.avro" 和 python 的 "subprocess" 模块
解决方法如下:
avsc_location = hdfs://user/test/test.avsc
avro_location = hdfs://user/test/test.avro
#use subprocess module
import subproccess as SP
load_avsc_file = SP.Popen(["hdfs", "dfs", "-cat", avsc_location], stdout=SP.PIPE, stderr=SP.PIPE)
(avsc_file_output, avsc_file_error) = load_avsc_file.communicate()
avro_df = spark.read.format("com.databricks.spark.avro").option("avroSchema", avsc_file_output).load(avro_location)
我正在使用 python 编写 spark 作业。但是,我需要读入一大堆 avro 文件。
This 是我在 Spark 的示例文件夹中找到的最接近的解决方案。但是,您需要使用 spark-submit 提交此 python 脚本。在spark-submit的命令行中,你可以指定driver-class,这样的话,你所有的avrokey,avrovalueclass都会被定位。
avro_rdd = sc.newAPIHadoopFile(
path,
"org.apache.avro.mapreduce.AvroKeyInputFormat",
"org.apache.avro.mapred.AvroKey",
"org.apache.hadoop.io.NullWritable",
keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
conf=conf)
在我的例子中,我需要 运行 Python 脚本中的所有内容,我尝试创建一个环境变量来包含 jar 文件,手指交叉 Python 将添加jar 到路径但显然不是,它给了我意外的 class 错误。
os.environ['SPARK_SUBMIT_CLASSPATH'] = "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar"
任何人都可以帮助我如何在一个 python 脚本中读取 avro 文件?
Spark >= 2.4.0
您可以使用 built-in Avro support。 API 向后兼容 spark-avro
包,并添加了一些内容(最值得注意的是 from_avro
/ to_avro
函数)。
请注意,该模块未与标准 Spark 二进制文件捆绑在一起,必须使用 spark.jars.packages
或等效机制包含在内。
另见
Spark < 2.4.0
您可以使用 spark-avro
库。首先让我们创建一个示例数据集:
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
schema_string ='''{"namespace": "example.avro",
"type": "record",
"name": "KeyValue",
"fields": [
{"name": "key", "type": "string"},
{"name": "value", "type": ["int", "null"]}
]
}'''
schema = avro.schema.parse(schema_string)
with open("kv.avro", "w") as f, DataFileWriter(f, DatumWriter(), schema) as wrt:
wrt.append({"key": "foo", "value": -1})
wrt.append({"key": "bar", "value": 1})
使用spark-csv
阅读就这么简单:
df = sqlContext.read.format("com.databricks.spark.avro").load("kv.avro")
df.show()
## +---+-----+
## |key|value|
## +---+-----+
## |foo| -1|
## |bar| 1|
## +---+-----+
前一种解决方案需要安装第三方 Java 依赖项,这不是大多数 Python 开发人员所满意的。但是,如果您只想使用给定模式解析 Avro 文件,那么您实际上并不需要外部库。你可以只读取二进制文件并用你最喜欢的 python Avro 包解析它们。
例如,这是您可以使用 fastavro
:
from io import BytesIO
import fastavro
schema = {
...
}
rdd = sc.binaryFiles("/path/to/dataset/*.avro")\
.flatMap(lambda args: fastavro.reader(BytesIO(args[1]), reader_schema=schema))
print(rdd.collect())
对于 Spark < 2.4.0,PySpark 可以通过使用 JAR "com.databricks.spark.avro" 和 python 的 "subprocess" 模块
解决方法如下:
avsc_location = hdfs://user/test/test.avsc
avro_location = hdfs://user/test/test.avro
#use subprocess module
import subproccess as SP
load_avsc_file = SP.Popen(["hdfs", "dfs", "-cat", avsc_location], stdout=SP.PIPE, stderr=SP.PIPE)
(avsc_file_output, avsc_file_error) = load_avsc_file.communicate()
avro_df = spark.read.format("com.databricks.spark.avro").option("avroSchema", avsc_file_output).load(avro_location)