PySpark:使用 newAPIHadoopFile 从多行记录文本文件中读取、映射和减少
PySpark: read, map and reduce from multiline record textfile with newAPIHadoopFile
我正在尝试解决一个类似于 的问题。我的原始数据是一个文本文件,其中包含多个传感器的值(观测值)。每个观察都带有时间戳,但传感器名称只给出一次,而不是在每一行中给出。但是一个文件中有多个传感器。
Time MHist::852-YF-007
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0
Time MHist::852-YF-008
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0
因此,我想将 Hadoop 配置为在提供传感器信息的那些行拆分文件。然后从这些行中读取传感器名称(例如 852-YF-007 和 852-YF-008),并使用 MapReduce 相应地读取每个传感器的值。
我在 Python(Jupyter Notebook)中做了这个:
sheet = sc.newAPIHadoopFile(
'/user/me/sample.txt',
'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
'org.apache.hadoop.io.LongWritable',
'org.apache.hadoop.io.Text',
conf={'textinputformat.record.delimiter': 'Time\tMHist'}
)
sf = sheet.filter(lambda (k, v): v)
sf.map(lambda (k, v): v).splitlines())
sf.take(50)
输出是这样的:
[[u'::852-YF-007\t',
u'2016-05-10 00:00:00\t0',
u'2016-05-09 23:59:00\t0',
u'2016-05-09 23:58:00\t0',
u'2016-05-09 23:57:00\t0',
u'2016-05-09 23:56:00\t0',
u'2016-05-09 23:55:00\t0',
u'2016-05-09 23:54:00\t0',
u'2016-05-09 23:53:00\t0',
u'2016-05-09 23:52:00\t0',
u'2016-05-09 23:51:00\t0',
u'2016-05-09 23:50:00\t0',
u'2016-05-09 23:49:00\t0',
u'2016-05-09 23:48:00\t0',
u'2016-05-09 23:47:00\t0',
u'2016-05-09 23:46:00\t0',
u'2016-05-09 23:45:00\t0',
u'2016-05-09 23:44:00\t0',
u'2016-05-09 23:43:00\t0',
u'2016-05-09 23:42:00\t0'],
[u'::852-YF-008\t',
u'2016-05-10 00:00:00\t0',
u'2016-05-09 23:59:00\t0',
u'2016-05-09 23:58:00\t0',
u'2016-05-09 23:57:00\t0',
u'2016-05-09 23:56:00\t0',
u'2016-05-09 23:55:00\t0',
u'2016-05-09 23:54:00\t0',
u'2016-05-09 23:53:00\t0',
u'2016-05-09 23:52:00\t0',
u'2016-05-09 23:51:00\t0',
u'2016-05-09 23:50:00\t0',
u'2016-05-09 23:49:00\t0',
u'2016-05-09 23:48:00\t0',
u'2016-05-09 23:47:00\t0',
u'2016-05-09 23:46:00\t0',
u'2016-05-09 23:45:00\t0',
u'2016-05-09 23:44:00\t0',
u'2016-05-09 23:43:00\t0',
u'2016-05-09 23:42:00\t0']]
我的问题是,如何进一步处理它以提取传感器名称并获得该传感器的值线。有点喜欢这个
852-YF-007 --> array of sensor_lines
852-YF-008 --> array of sensor_lines
随后,这些行本身将被拆分为时间戳和值。但我更感兴趣的是从行中拆分传感器名称。
我个人会:
用::
扩展分隔符
sheet = sc.newAPIHadoopFile(
path,
'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
'org.apache.hadoop.io.LongWritable',
'org.apache.hadoop.io.Text',
conf={'textinputformat.record.delimiter': 'Time\tMHist::'}
)
放下钥匙:
values = sheet.values()
过滤掉空条目
non_empty = values.filter(lambda x: x)
拆分:
grouped_lines = non_empty.map(str.splitlines)
单独的键和值:
from operator import itemgetter
pairs = grouped_lines.map(itemgetter(0, slice(1, None)))
最后拆分值:
pairs.flatMapValues(lambda xs: [x.split("\t") for x in xs])
当然,所有这些都可以通过一个函数完成:
import dateutil.parser
def process(pair):
_, content = pair
clean = [x.strip() for x in content.strip().splitlines()]
if not clean:
return []
k, vs = clean[0], clean[1:]
for v in vs:
try:
ds, x = v.split("\t")
yield k, (dateutil.parser.parse(ds), float(x)) # or int(x)
except ValueError:
pass
sheet.flatMap(process)
我正在尝试解决一个类似于
Time MHist::852-YF-007
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0
Time MHist::852-YF-008
2016-05-10 00:00:00 0
2016-05-09 23:59:00 0
2016-05-09 23:58:00 0
2016-05-09 23:57:00 0
2016-05-09 23:56:00 0
2016-05-09 23:55:00 0
2016-05-09 23:54:00 0
2016-05-09 23:53:00 0
2016-05-09 23:52:00 0
2016-05-09 23:51:00 0
2016-05-09 23:50:00 0
2016-05-09 23:49:00 0
2016-05-09 23:48:00 0
2016-05-09 23:47:00 0
2016-05-09 23:46:00 0
2016-05-09 23:45:00 0
2016-05-09 23:44:00 0
2016-05-09 23:43:00 0
2016-05-09 23:42:00 0
因此,我想将 Hadoop 配置为在提供传感器信息的那些行拆分文件。然后从这些行中读取传感器名称(例如 852-YF-007 和 852-YF-008),并使用 MapReduce 相应地读取每个传感器的值。
我在 Python(Jupyter Notebook)中做了这个:
sheet = sc.newAPIHadoopFile(
'/user/me/sample.txt',
'org.apache.hadoop.mapreduce.lib.input.TextInputFormat',
'org.apache.hadoop.io.LongWritable',
'org.apache.hadoop.io.Text',
conf={'textinputformat.record.delimiter': 'Time\tMHist'}
)
sf = sheet.filter(lambda (k, v): v)
sf.map(lambda (k, v): v).splitlines())
sf.take(50)
输出是这样的:
[[u'::852-YF-007\t',
u'2016-05-10 00:00:00\t0',
u'2016-05-09 23:59:00\t0',
u'2016-05-09 23:58:00\t0',
u'2016-05-09 23:57:00\t0',
u'2016-05-09 23:56:00\t0',
u'2016-05-09 23:55:00\t0',
u'2016-05-09 23:54:00\t0',
u'2016-05-09 23:53:00\t0',
u'2016-05-09 23:52:00\t0',
u'2016-05-09 23:51:00\t0',
u'2016-05-09 23:50:00\t0',
u'2016-05-09 23:49:00\t0',
u'2016-05-09 23:48:00\t0',
u'2016-05-09 23:47:00\t0',
u'2016-05-09 23:46:00\t0',
u'2016-05-09 23:45:00\t0',
u'2016-05-09 23:44:00\t0',
u'2016-05-09 23:43:00\t0',
u'2016-05-09 23:42:00\t0'],
[u'::852-YF-008\t',
u'2016-05-10 00:00:00\t0',
u'2016-05-09 23:59:00\t0',
u'2016-05-09 23:58:00\t0',
u'2016-05-09 23:57:00\t0',
u'2016-05-09 23:56:00\t0',
u'2016-05-09 23:55:00\t0',
u'2016-05-09 23:54:00\t0',
u'2016-05-09 23:53:00\t0',
u'2016-05-09 23:52:00\t0',
u'2016-05-09 23:51:00\t0',
u'2016-05-09 23:50:00\t0',
u'2016-05-09 23:49:00\t0',
u'2016-05-09 23:48:00\t0',
u'2016-05-09 23:47:00\t0',
u'2016-05-09 23:46:00\t0',
u'2016-05-09 23:45:00\t0',
u'2016-05-09 23:44:00\t0',
u'2016-05-09 23:43:00\t0',
u'2016-05-09 23:42:00\t0']]
我的问题是,如何进一步处理它以提取传感器名称并获得该传感器的值线。有点喜欢这个
852-YF-007 --> array of sensor_lines
852-YF-008 --> array of sensor_lines
随后,这些行本身将被拆分为时间戳和值。但我更感兴趣的是从行中拆分传感器名称。
我个人会:
用
扩展分隔符::
sheet = sc.newAPIHadoopFile( path, 'org.apache.hadoop.mapreduce.lib.input.TextInputFormat', 'org.apache.hadoop.io.LongWritable', 'org.apache.hadoop.io.Text', conf={'textinputformat.record.delimiter': 'Time\tMHist::'} )
放下钥匙:
values = sheet.values()
过滤掉空条目
non_empty = values.filter(lambda x: x)
拆分:
grouped_lines = non_empty.map(str.splitlines)
单独的键和值:
from operator import itemgetter pairs = grouped_lines.map(itemgetter(0, slice(1, None)))
最后拆分值:
pairs.flatMapValues(lambda xs: [x.split("\t") for x in xs])
当然,所有这些都可以通过一个函数完成:
import dateutil.parser
def process(pair):
_, content = pair
clean = [x.strip() for x in content.strip().splitlines()]
if not clean:
return []
k, vs = clean[0], clean[1:]
for v in vs:
try:
ds, x = v.split("\t")
yield k, (dateutil.parser.parse(ds), float(x)) # or int(x)
except ValueError:
pass
sheet.flatMap(process)