如何 match/extract 来自 pyspark 文件的多行模式
How to match/extract multi-line pattern from file in pysark
我有一个巨大的 rdf 三元组(主题谓词对象)文件,如下图所示。它提取粗体项目的目标并具有以下输出
Item_Id | quantityAmount | quantityUnit | rank
-----------------------------------------------
Q31 24954 Meter BestRank
Q25 582 Kilometer NormalRank
我想提取遵循以下模式的行
主题被赋予一个指针(<Q31> <prop/P1082> <Pointer_Q31-87RF> .
)
指针有排名 (<Pointer_Q31-87RF> <rank> <BestRank>
)
和 valuePointer (<Pointer_Q31-87RF> <prop/Pointer_value/P1082> <value/cebcf9>
)
valuePointer依次指向它的Amount(<value/cebcf9> <quantityAmount> "24954"
)和Unit(<value/cebcf9> <quantityUnit> <Meter>
)
正常的方法是逐行读取文件并提取上述模式中的每一个(使用 sc.textFile('inFile').flatMap(lambda x: extractFunc(x)) 和然后通过不同的连接将它们组合起来,这样就可以提供上面的 table。
有没有更好的方法来解决这个问题?我在下面包含了文件示例。
<Q31> <prop/P1082> <Pointer_Q31-87RF> .
<Pointer_Q31-87RF> <rank> <BestRank> .
<Pointer_Q31-87RF> <prop/Pointer_P1082> "+24954"^^<2001/XMLSchema#decimal> .
<Pointer_Q31-87RF> <prop/Pointer_value/P1082> <value/cebcf9> .
<value/cebcf9> <syntax-ns#type> <QuantityValue> .
<value/cebcf9> <quantityAmount> 24954
<value/cebcf9> <quantityUnit> <Meter> .
<Q25> <prop/P1082> <Pointer_Q25-8E6C> .
<Pointer_Q25-8E6C> <rank> <NormalRank> .
<Pointer_Q25-8E6C> <prop/Pointer_P1082> "+24954”
<Pointer_Q25-8E6C> <prop/Pointer_value/P1082> <value/cebcf9> .
<value/cebcf9> <syntax-ns#type> <QuantityValue> .
<value/cebcf9> <quantityAmount> "582" .
<value/cebcf9> <quantityUnit> <Kilometer> .
如果可以使用\n<Q
作为分隔符来创建RDD元素,那么解析数据块就变成了一个纯粹的python任务。下面我创建了一个函数(基于您的示例)来使用正则表达式解析块文本并将 cols 信息检索到 Row 对象中(您可能必须调整正则表达式以反映实际数据模式,即区分大小写、额外的空格等) :
- 对于每个 RDD 元素,按'\n'(行模式)分割
- 然后对于每一行,按
> <
分成一个列表 y
- 我们可以通过检查 y[1] 和 y[2] 找到
rank
, quantityUnit
, quantityAmount
通过检查 y[1] 和 Item_id
通过检查 y[0].
通过迭代所有必填字段创建 Row 对象,将缺少字段的值设置为 None
from pyspark.sql import Row
import re
# skipped the code to initialize SparkSession
# field names to retrieve
cols = ['Item_Id', 'quantityAmount', 'quantityUnit', 'rank']
def parse_rdd_element(x, cols):
try:
row = {}
for e in x.split('\n'):
y = e.split('> <')
if len(y) < 2:
continue
if y[1] in ['rank', 'quantityUnit']:
row[y[1]] = y[2].split(">")[0]
else:
m = re.match(r'^quantityAmount>\D*(\d+)', y[1])
if m:
row['quantityAmount'] = m.group(1)
continue
m = re.match('^(?:<Q)?(\d+)', y[0])
if m:
row['Item_Id'] = 'Q' + m.group(1)
# if row is not EMPTY, set None to missing field
return Row(**dict([ (k, row[k]) if k in row else (k, None) for k in cols])) if row else None
except:
return None
使用 newAPIHadoopFile() 以 \n<Q
作为分隔符设置 RDD:
rdd = spark.sparkContext.newAPIHadoopFile(
'/path/to/file',
'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
'org.apache.hadoop.io.LongWritable',
'org.apache.hadoop.io.Text',
conf={'textinputformat.record.delimiter': '\n<Q'}
)
使用map函数将RDD元素解析为Row对象
rdd.map(lambda x: parse_rdd_element(x[1], cols)).collect()
#[Row(Item_Id=u'Q31', quantityAmount=u'24954', quantityUnit=u'Meter', rank=u'BestRank'),
# Row(Item_Id=u'Q25', quantityAmount=u'582', quantityUnit=u'Kilometer', rank=u'NormalRank')]
将上面的RDD转换成dataframe
df = rdd.map(lambda x: parse_rdd_element(x[1], cols)).filter(bool).toDF()
df.show()
+-------+--------------+------------+----------+
|Item_Id|quantityAmount|quantityUnit| rank|
+-------+--------------+------------+----------+
| Q31| 24954| Meter| BestRank|
| Q25| 582| Kilometer|NormalRank|
+-------+--------------+------------+----------+
一些注意事项:
为了获得更好的性能,在将它们传递给 parse_rdd_element() 函数之前,使用 re.compile()
预编译所有正则表达式模式。
如果\n
和<Q
之间可能有spaces/tabs,多个块将被添加到同一个RDD元素中,只需将RDD元素拆分为\n\s+<Q
并将 map()
替换为 flatMap()
.
参考:
我有一个巨大的 rdf 三元组(主题谓词对象)文件,如下图所示。它提取粗体项目的目标并具有以下输出
Item_Id | quantityAmount | quantityUnit | rank
-----------------------------------------------
Q31 24954 Meter BestRank
Q25 582 Kilometer NormalRank
我想提取遵循以下模式的行
主题被赋予一个指针(
<Q31> <prop/P1082> <Pointer_Q31-87RF> .
)指针有排名 (
<Pointer_Q31-87RF> <rank> <BestRank>
)
和 valuePointer (<Pointer_Q31-87RF> <prop/Pointer_value/P1082> <value/cebcf9>
)valuePointer依次指向它的Amount(
<value/cebcf9> <quantityAmount> "24954"
)和Unit(<value/cebcf9> <quantityUnit> <Meter>
)
正常的方法是逐行读取文件并提取上述模式中的每一个(使用 sc.textFile('inFile').flatMap(lambda x: extractFunc(x)) 和然后通过不同的连接将它们组合起来,这样就可以提供上面的 table。 有没有更好的方法来解决这个问题?我在下面包含了文件示例。
<Q31> <prop/P1082> <Pointer_Q31-87RF> .
<Pointer_Q31-87RF> <rank> <BestRank> .
<Pointer_Q31-87RF> <prop/Pointer_P1082> "+24954"^^<2001/XMLSchema#decimal> .
<Pointer_Q31-87RF> <prop/Pointer_value/P1082> <value/cebcf9> .
<value/cebcf9> <syntax-ns#type> <QuantityValue> .
<value/cebcf9> <quantityAmount> 24954
<value/cebcf9> <quantityUnit> <Meter> .
<Q25> <prop/P1082> <Pointer_Q25-8E6C> .
<Pointer_Q25-8E6C> <rank> <NormalRank> .
<Pointer_Q25-8E6C> <prop/Pointer_P1082> "+24954”
<Pointer_Q25-8E6C> <prop/Pointer_value/P1082> <value/cebcf9> .
<value/cebcf9> <syntax-ns#type> <QuantityValue> .
<value/cebcf9> <quantityAmount> "582" .
<value/cebcf9> <quantityUnit> <Kilometer> .
如果可以使用\n<Q
作为分隔符来创建RDD元素,那么解析数据块就变成了一个纯粹的python任务。下面我创建了一个函数(基于您的示例)来使用正则表达式解析块文本并将 cols 信息检索到 Row 对象中(您可能必须调整正则表达式以反映实际数据模式,即区分大小写、额外的空格等) :
- 对于每个 RDD 元素,按'\n'(行模式)分割
- 然后对于每一行,按
> <
分成一个列表y
- 我们可以通过检查 y[1] 和 y[2] 找到
rank
,quantityUnit
,quantityAmount
通过检查 y[1] 和Item_id
通过检查 y[0]. 通过迭代所有必填字段创建 Row 对象,将缺少字段的值设置为 None
from pyspark.sql import Row import re # skipped the code to initialize SparkSession # field names to retrieve cols = ['Item_Id', 'quantityAmount', 'quantityUnit', 'rank'] def parse_rdd_element(x, cols): try: row = {} for e in x.split('\n'): y = e.split('> <') if len(y) < 2: continue if y[1] in ['rank', 'quantityUnit']: row[y[1]] = y[2].split(">")[0] else: m = re.match(r'^quantityAmount>\D*(\d+)', y[1]) if m: row['quantityAmount'] = m.group(1) continue m = re.match('^(?:<Q)?(\d+)', y[0]) if m: row['Item_Id'] = 'Q' + m.group(1) # if row is not EMPTY, set None to missing field return Row(**dict([ (k, row[k]) if k in row else (k, None) for k in cols])) if row else None except: return None
使用 newAPIHadoopFile() 以 \n<Q
作为分隔符设置 RDD:
rdd = spark.sparkContext.newAPIHadoopFile(
'/path/to/file',
'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
'org.apache.hadoop.io.LongWritable',
'org.apache.hadoop.io.Text',
conf={'textinputformat.record.delimiter': '\n<Q'}
)
使用map函数将RDD元素解析为Row对象
rdd.map(lambda x: parse_rdd_element(x[1], cols)).collect()
#[Row(Item_Id=u'Q31', quantityAmount=u'24954', quantityUnit=u'Meter', rank=u'BestRank'),
# Row(Item_Id=u'Q25', quantityAmount=u'582', quantityUnit=u'Kilometer', rank=u'NormalRank')]
将上面的RDD转换成dataframe
df = rdd.map(lambda x: parse_rdd_element(x[1], cols)).filter(bool).toDF()
df.show()
+-------+--------------+------------+----------+
|Item_Id|quantityAmount|quantityUnit| rank|
+-------+--------------+------------+----------+
| Q31| 24954| Meter| BestRank|
| Q25| 582| Kilometer|NormalRank|
+-------+--------------+------------+----------+
一些注意事项:
为了获得更好的性能,在将它们传递给 parse_rdd_element() 函数之前,使用
re.compile()
预编译所有正则表达式模式。如果
\n
和<Q
之间可能有spaces/tabs,多个块将被添加到同一个RDD元素中,只需将RDD元素拆分为\n\s+<Q
并将map()
替换为flatMap()
.
参考: