从多行记录创建火花数据结构
creating spark data structure from multiline record
我正在尝试将 retrosheet 事件文件读入 spark。事件文件的结构是这样的。
id,TEX201403310
version,2
info,visteam,PHI
info,hometeam,TEX
info,site,ARL02
info,date,2014/03/31
info,number,0
info,starttime,1:07PM
info,daynight,day
info,usedh,true
info,umphome,joycj901
info,attendance,49031
start,reveb001,"Ben Revere",0,1,8
start,rollj001,"Jimmy Rollins",0,2,6
start,utlec001,"Chase Utley",0,3,4
start,howar001,"Ryan Howard",0,4,3
start,byrdm001,"Marlon Byrd",0,5,9
id,TEX201404010
version,2
info,visteam,PHI
info,hometeam,TEX
正如您所见,对于每场比赛,事件都会循环往复。
我已将文件读入 RDD,然后通过第二个 for 循环为每次迭代添加一个键,这似乎有效。但我希望得到一些反馈,看看是否有一种清洁方法可以使用 spark 方法来做到这一点。
logFile = '2014TEX.EVA'
event_data = (sc
.textFile(logfile)
.collect())
idKey = 0
newevent_list = []
for line in event_dataFile:
if line.startswith('id'):
idKey += 1
newevent_list.append((idKey,line))
else:
newevent_list.append((idKey,line))
event_data = sc.parallelize(newevent_list)
PySpark since version 1.1 supports Hadoop Input Formats。您可以使用 textinputformat.record.delimiter
选项来使用自定义格式分隔符,如下所示
from operator import itemgetter
retrosheet = sc.newAPIHadoopFile(
'/path/to/retrosheet/file',
'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
'org.apache.hadoop.io.LongWritable',
'org.apache.hadoop.io.Text',
conf={'textinputformat.record.delimiter': '\nid,'}
)
(retrosheet
.filter(itemgetter(1))
.values()
.filter(lambda x: x)
.map(lambda v: (
v if v.startswith('id') else 'id,{0}'.format(v)).splitlines()))
从 Spark 2.4 开始,您还可以使用 text
reader
将数据读入 DataFrame
spark.read.option("lineSep", '\nid,').text('/path/to/retrosheet/file')
我正在尝试将 retrosheet 事件文件读入 spark。事件文件的结构是这样的。
id,TEX201403310
version,2
info,visteam,PHI
info,hometeam,TEX
info,site,ARL02
info,date,2014/03/31
info,number,0
info,starttime,1:07PM
info,daynight,day
info,usedh,true
info,umphome,joycj901
info,attendance,49031
start,reveb001,"Ben Revere",0,1,8
start,rollj001,"Jimmy Rollins",0,2,6
start,utlec001,"Chase Utley",0,3,4
start,howar001,"Ryan Howard",0,4,3
start,byrdm001,"Marlon Byrd",0,5,9
id,TEX201404010
version,2
info,visteam,PHI
info,hometeam,TEX
正如您所见,对于每场比赛,事件都会循环往复。
我已将文件读入 RDD,然后通过第二个 for 循环为每次迭代添加一个键,这似乎有效。但我希望得到一些反馈,看看是否有一种清洁方法可以使用 spark 方法来做到这一点。
logFile = '2014TEX.EVA'
event_data = (sc
.textFile(logfile)
.collect())
idKey = 0
newevent_list = []
for line in event_dataFile:
if line.startswith('id'):
idKey += 1
newevent_list.append((idKey,line))
else:
newevent_list.append((idKey,line))
event_data = sc.parallelize(newevent_list)
PySpark since version 1.1 supports Hadoop Input Formats。您可以使用 textinputformat.record.delimiter
选项来使用自定义格式分隔符,如下所示
from operator import itemgetter
retrosheet = sc.newAPIHadoopFile(
'/path/to/retrosheet/file',
'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
'org.apache.hadoop.io.LongWritable',
'org.apache.hadoop.io.Text',
conf={'textinputformat.record.delimiter': '\nid,'}
)
(retrosheet
.filter(itemgetter(1))
.values()
.filter(lambda x: x)
.map(lambda v: (
v if v.startswith('id') else 'id,{0}'.format(v)).splitlines()))
从 Spark 2.4 开始,您还可以使用 text
reader
DataFrame
spark.read.option("lineSep", '\nid,').text('/path/to/retrosheet/file')