数据流 - XML 来源 - Python - 如何?
Dataflow - XML Source - Python - How?
我正在尝试将 XML 文件作为我的数据流代码的来源。我看到 java 有内置的 XMLIo 但 Python 没有?我自己也在努力了解 ParDo/DoFn 它的初始步骤是什么。这是下面 XML 文件的示例。解析 .csv 时我的管道在下面,我理解,但我不理解如何从 XML 源开始。我是否需要手动创建 PCollection 并从那里开始?
我的目标是return将每个元素作为一个元组。键将是国家名称,后面的每个元素(在嵌套数组中)将是值。
<?xml version="1.0"?>
<data>
<country name="Liechtenstein">
<rank>1</rank>
<year>2008</year>
<gdppc>141100</gdppc>
<neighbor name="Austria" direction="E"/>
<neighbor name="Switzerland" direction="W"/>
</country>
<country name="Singapore">
<rank>4</rank>
<year>2011</year>
<gdppc>59900</gdppc>
<neighbor name="Malaysia" direction="N"/>
</country>
<country name="Panama">
<rank>68</rank>
<year>2011</year>
<gdppc>13600</gdppc>
<neighbor name="Costa Rica" direction="W"/>
<neighbor name="Colombia" direction="E"/>
</country>
</data>
def run():
argv = [
'--project={0}'.format(PROJECT),
'--staging_location=gs://{0}/'.format(BUCKET),
'--temp_location=gs://{0}/'.format(BUCKET),
'--runner=DataflowRunner'
#'--runner=DirectRunner'
]
p = beam.Pipeline(argv=argv)
(p
| 'ReadFromGCS' >> beam.io.textio.ReadFromText('gs://{0}/example.csv'.format(BUCKET))
-[SNIP]-
下面的代码将收集每个国家的信息。
输出是一个元组列表。
元组中的第一个元素是国家名称,第二个元素是其他国家属性的列表。
import xml.etree.ElementTree as ET
xml = '''<?xml version="1.0"?>
<data>
<country name="Liechtenstein">
<rank>1</rank>
<year>2008</year>
<gdppc>141100</gdppc>
<neighbor name="Austria" direction="E"/>
<neighbor name="Switzerland" direction="W"/>
</country>
<country name="Singapore">
<rank>4</rank>
<year>2011</year>
<gdppc>59900</gdppc>
<neighbor name="Malaysia" direction="N"/>
</country>
<country name="Panama">
<rank>68</rank>
<year>2011</year>
<gdppc>13600</gdppc>
<neighbor name="Costa Rica" direction="W"/>
<neighbor name="Colombia" direction="E"/>
</country>
</data>'''
result = []
root = ET.fromstring(xml)
for country in root.findall('.//country'):
result.append((country.attrib['name'],[x.text if x.text else x.attrib for x in list(country)]))
print(result)
输出
[('Liechtenstein', ['1', '2008', '141100', {'name': 'Austria', 'direction': 'E'}, {'name': 'Switzerland','direction': 'W'}]), ('Singapore', ['4', '2011', '59900', {'name': 'Malaysia', 'direction': 'N'}]), ('Panama', ['68', '2011', '13600', {'name': 'Costa Rica', 'direction': 'W'}, {'name': 'Colombia', 'direction': 'E'}])]
我正在尝试将 XML 文件作为我的数据流代码的来源。我看到 java 有内置的 XMLIo 但 Python 没有?我自己也在努力了解 ParDo/DoFn 它的初始步骤是什么。这是下面 XML 文件的示例。解析 .csv 时我的管道在下面,我理解,但我不理解如何从 XML 源开始。我是否需要手动创建 PCollection 并从那里开始?
我的目标是return将每个元素作为一个元组。键将是国家名称,后面的每个元素(在嵌套数组中)将是值。
<?xml version="1.0"?>
<data>
<country name="Liechtenstein">
<rank>1</rank>
<year>2008</year>
<gdppc>141100</gdppc>
<neighbor name="Austria" direction="E"/>
<neighbor name="Switzerland" direction="W"/>
</country>
<country name="Singapore">
<rank>4</rank>
<year>2011</year>
<gdppc>59900</gdppc>
<neighbor name="Malaysia" direction="N"/>
</country>
<country name="Panama">
<rank>68</rank>
<year>2011</year>
<gdppc>13600</gdppc>
<neighbor name="Costa Rica" direction="W"/>
<neighbor name="Colombia" direction="E"/>
</country>
</data>
def run():
argv = [
'--project={0}'.format(PROJECT),
'--staging_location=gs://{0}/'.format(BUCKET),
'--temp_location=gs://{0}/'.format(BUCKET),
'--runner=DataflowRunner'
#'--runner=DirectRunner'
]
p = beam.Pipeline(argv=argv)
(p
| 'ReadFromGCS' >> beam.io.textio.ReadFromText('gs://{0}/example.csv'.format(BUCKET))
-[SNIP]-
下面的代码将收集每个国家的信息。
输出是一个元组列表。
元组中的第一个元素是国家名称,第二个元素是其他国家属性的列表。
import xml.etree.ElementTree as ET
xml = '''<?xml version="1.0"?>
<data>
<country name="Liechtenstein">
<rank>1</rank>
<year>2008</year>
<gdppc>141100</gdppc>
<neighbor name="Austria" direction="E"/>
<neighbor name="Switzerland" direction="W"/>
</country>
<country name="Singapore">
<rank>4</rank>
<year>2011</year>
<gdppc>59900</gdppc>
<neighbor name="Malaysia" direction="N"/>
</country>
<country name="Panama">
<rank>68</rank>
<year>2011</year>
<gdppc>13600</gdppc>
<neighbor name="Costa Rica" direction="W"/>
<neighbor name="Colombia" direction="E"/>
</country>
</data>'''
result = []
root = ET.fromstring(xml)
for country in root.findall('.//country'):
result.append((country.attrib['name'],[x.text if x.text else x.attrib for x in list(country)]))
print(result)
输出
[('Liechtenstein', ['1', '2008', '141100', {'name': 'Austria', 'direction': 'E'}, {'name': 'Switzerland','direction': 'W'}]), ('Singapore', ['4', '2011', '59900', {'name': 'Malaysia', 'direction': 'N'}]), ('Panama', ['68', '2011', '13600', {'name': 'Costa Rica', 'direction': 'W'}, {'name': 'Colombia', 'direction': 'E'}])]