Spark:如何将字节字符串写入 pyspark 中的 hdfs hadoop 以进行 spark-xml 转换?
Spark: How to write bytes string to hdfs hadoop in pyspark for spark-xml transformation?
在python中,字节字符串可以简单地保存到单个xml文件中:
with open('/home/user/file.xml' ,'wb') as f:
f.write(b'<Value>1</Value>')
当前输出:/home/user/file.xml(文件保存在本地文件中)
问题:如何将字符串保存到 pyspark 中 hdfs 上的 xml 文件:
预期输出:'hdfs://hostname:9000/file.xml'
背景:大量 xml 文件由第 3 方 Web API 提供。我在 pyspark 中构建 ETL 管道到 delta lake。数据由aiohttp异步提取,接下来我想在将spark数据帧保存到delta lake之前使用spark-xml进行转换(需要pyspark)。我正在寻找构建管道的最有效方法。
在 github 上向 spark-xml 开发人员提出了类似的问题。
https://github.com/databricks/spark-xml/issues/515
最新研究:
spark-xml 用作输入 xml 文件直接存储为磁盘或 spark 数据帧上的文本
所以我只能使用以下两个选项之一:
a) 一些 hdfs 客户端(pyarrow、hdfs、aiohdfs)将文件保存到 hdfs(hdfs 上的文本文件不是很有效的格式)
b) 将数据加载到 spark 数据帧以进行 spark-xml 转换(delta lake 的本机格式)
如果您有其他想法,请告诉我。
不要被 databricks spark-xml 文档误导,因为它们会导致使用未压缩的 XML 文件作为输入。这是非常低效的,直接将 XMLs 下载到 spark 数据帧要快得多。 Databricks xml-pyspark 版本不包含它,但有一个 workaround:
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string
def ext_from_xml(xml_column, schema, options={}):
java_column = _to_java_column(xml_column.cast('string'))
java_schema = spark._jsparkSession.parseDataType(schema.json())
scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
java_column, java_schema, scala_map)
return Column(jc)
def ext_schema_of_xml_df(df, options={}):
assert len(df.columns) == 1
scala_options = spark._jvm.PythonUtils.toScalaMap(options)
java_xml_module = getattr(getattr(
spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
return _parse_datatype_json_string(java_schema.json())
XML 秒下载到列表
xml = [('url',"""<Level_0 Id0="Id0_value_file1">
<Level_1 Id1_1 ="Id3_value" Id_2="Id2_value">
<Level_2_A>A</Level_2_A>
<Level_2>
<Level_3>
<Level_4>
<Date>2021-01-01</Date>
<Value>4_1</Value>
</Level_4>
<Level_4>
<Date>2021-01-02</Date>
<Value>4_2</Value>
</Level_4>
</Level_3>
</Level_2>
</Level_1>
</Level_0>"""),
('url',"""<Level_0 I"d0="Id0_value_file2">
<Level_1 Id1_1 ="Id3_value" Id_2="Id2_value">
<Level_2_A>A</Level_2_A>
<Level_2>
<Level_3>
<Level_4>
<Date>2021-01-01</Date>
<Value>4_1</Value>
</Level_4>
<Level_4>
<Date>2021-01-02</Date>
<Value>4_2</Value>
</Level_4>
</Level_3>
</Level_2>
</Level_1>
</Level_0>""")]
XML 字符串的 Spark 数据帧转换:
#create df with XML strings
rdd = sc.parallelize(xml)
df = spark.createDataFrame(rdd,"url string, content string")
# XML schema
payloadSchema = ext_schema_of_xml_df(df.select("content"))
# parse xml
parsed = df.withColumn("parsed", ext_from_xml(df.content, payloadSchema, {"rowTag":"Level_0"}))
# select required data
df2 = parsed.select(
'parsed._Id0',
F.explode_outer('parsed.Level_1.Level_2.Level_3.Level_4').alias('Level_4')
).select(
'`parsed._Id0`',
'Level_4.*'
)
解码字节:b'string'.decode('utf-8')
@mck 回答有关 XMLs 的更多信息:
在python中,字节字符串可以简单地保存到单个xml文件中:
with open('/home/user/file.xml' ,'wb') as f:
f.write(b'<Value>1</Value>')
当前输出:/home/user/file.xml(文件保存在本地文件中)
问题:如何将字符串保存到 pyspark 中 hdfs 上的 xml 文件:
预期输出:'hdfs://hostname:9000/file.xml'
背景:大量 xml 文件由第 3 方 Web API 提供。我在 pyspark 中构建 ETL 管道到 delta lake。数据由aiohttp异步提取,接下来我想在将spark数据帧保存到delta lake之前使用spark-xml进行转换(需要pyspark)。我正在寻找构建管道的最有效方法。
在 github 上向 spark-xml 开发人员提出了类似的问题。 https://github.com/databricks/spark-xml/issues/515
最新研究:
spark-xml 用作输入 xml 文件直接存储为磁盘或 spark 数据帧上的文本
所以我只能使用以下两个选项之一:
a) 一些 hdfs 客户端(pyarrow、hdfs、aiohdfs)将文件保存到 hdfs(hdfs 上的文本文件不是很有效的格式)
b) 将数据加载到 spark 数据帧以进行 spark-xml 转换(delta lake 的本机格式)
如果您有其他想法,请告诉我。
不要被 databricks spark-xml 文档误导,因为它们会导致使用未压缩的 XML 文件作为输入。这是非常低效的,直接将 XMLs 下载到 spark 数据帧要快得多。 Databricks xml-pyspark 版本不包含它,但有一个 workaround:
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.types import _parse_datatype_json_string
def ext_from_xml(xml_column, schema, options={}):
java_column = _to_java_column(xml_column.cast('string'))
java_schema = spark._jsparkSession.parseDataType(schema.json())
scala_map = spark._jvm.org.apache.spark.api.python.PythonUtils.toScalaMap(options)
jc = spark._jvm.com.databricks.spark.xml.functions.from_xml(
java_column, java_schema, scala_map)
return Column(jc)
def ext_schema_of_xml_df(df, options={}):
assert len(df.columns) == 1
scala_options = spark._jvm.PythonUtils.toScalaMap(options)
java_xml_module = getattr(getattr(
spark._jvm.com.databricks.spark.xml, "package$"), "MODULE$")
java_schema = java_xml_module.schema_of_xml_df(df._jdf, scala_options)
return _parse_datatype_json_string(java_schema.json())
XML 秒下载到列表
xml = [('url',"""<Level_0 Id0="Id0_value_file1">
<Level_1 Id1_1 ="Id3_value" Id_2="Id2_value">
<Level_2_A>A</Level_2_A>
<Level_2>
<Level_3>
<Level_4>
<Date>2021-01-01</Date>
<Value>4_1</Value>
</Level_4>
<Level_4>
<Date>2021-01-02</Date>
<Value>4_2</Value>
</Level_4>
</Level_3>
</Level_2>
</Level_1>
</Level_0>"""),
('url',"""<Level_0 I"d0="Id0_value_file2">
<Level_1 Id1_1 ="Id3_value" Id_2="Id2_value">
<Level_2_A>A</Level_2_A>
<Level_2>
<Level_3>
<Level_4>
<Date>2021-01-01</Date>
<Value>4_1</Value>
</Level_4>
<Level_4>
<Date>2021-01-02</Date>
<Value>4_2</Value>
</Level_4>
</Level_3>
</Level_2>
</Level_1>
</Level_0>""")]
XML 字符串的 Spark 数据帧转换:
#create df with XML strings
rdd = sc.parallelize(xml)
df = spark.createDataFrame(rdd,"url string, content string")
# XML schema
payloadSchema = ext_schema_of_xml_df(df.select("content"))
# parse xml
parsed = df.withColumn("parsed", ext_from_xml(df.content, payloadSchema, {"rowTag":"Level_0"}))
# select required data
df2 = parsed.select(
'parsed._Id0',
F.explode_outer('parsed.Level_1.Level_2.Level_3.Level_4').alias('Level_4')
).select(
'`parsed._Id0`',
'Level_4.*'
)
解码字节:b'string'.decode('utf-8')
@mck 回答有关 XMLs 的更多信息: