使用 Apache Beam 的 ReadFromText 解析包含“\n”的行
Parsing lines containing "\n" in with Apache Beam's `ReadFromText`
我有一个 .csv
文件,我试图在管道中使用 apache_beam.io.ReadFromText()
读取它(beam
是 apache_beam
的别名):
reader = beam.io.ReadFromText(csv_path, skip_header_lines=1)
我有一行 reader class 如下所示:
class RowReader(beam.DoFn):
def process(self, row):
row_delimited = row.split(",")
text_1 = str(row_delimited[1]).encode()
text_2 = str(row_delimited[4]).encode()
image_1 = tf.io.read_file(row_delimited[-3]).numpy()
image_2 = tf.io.read_file(row_delimited[-2]).numpy()
label = row_delimited[-1]
yield {"text_1": text_1, "image_1": image_1,
"text_2": text_2, "image_2": image_2,
"label": label}
csv 文件中的其中一行有如下条目:
但是beam.io.ReadFromText()
是在\n
的基础上分裂的。但问题在于该行就是这样:
1357240852939218946,"#IndiaFightsCorona:
而 ,
之后的实际第一个元素是:
#IndiaFightsCorona:\n\nNearly 4.5 million beneficiaries vaccinated against #COVID19 in 19 days.\n\nIndia is the fastest country to cross landmark of vaccinating 4 million beneficiaries in merely 18 days.\n\n#StaySafe #IndiaWillWin #Unite2FightCorona
我试过将 strip_trailing_newlines=False
传递给 ReadFromText()
但它没有帮助。如何让 beam
忽略 \n
拆分?
有什么解决方法吗?
您似乎想使用 skip_trailing_newlines
标志 according to the docs:
reader = beam.io.ReadFromText(csv_path, skip_header_lines=1, skip_trailing_newlines=False)
现在可以了吗?
apache_beam.io.ReadFromText
假定所有换行符都描述记录。如果您的 csv 文件包含跨越行的记录,则有几个选项。
(1) 创建文件名的 PCollection(例如使用 MatchFiles or just globbling and using beam.Create
) followed by a DoFn
that opens the file (possibly using the calls from filesystems 支持 GCS、HDFS 等)并使用 Python 的 CSV reader 生成行。 (与 ReadFromText 相比的缺点是,这会强制整个文件在单个 worker 中读取。)
或 (2) 使用 Beam 的 Dataframe APIs 中的 apache_beam.dataframe.io.read_csv
。这将提供一个延迟数据帧,您可以像使用 Pandas 数据帧一样使用它,或者将其转换为具有 apache_beam.dataframe.convert.to_pcollection
.
的 PCollection
这是我所做的。定义 RowReader
如下:
class RowReader(beam.DoFn):
def process(self, row):
text_1 = str(row[1]).encode()
text_2 = str(row[4]).encode()
image_1 = tf.io.read_file(row[7]).numpy()
image_2 = tf.io.read_file(row[8]).numpy()
label = int(row[9])
yield {"text_1": text_1, "image_1": image_1,
"text_2": text_2, "image_2": image_2,
"label": label}
然后定义管道如下:
with beam.Pipeline(args.runner, options=options) as pipe:
df = (
pipe
| read_csv(csv_path)
)
pc = to_pcollection(df)
_ = (
pc
| 'Decode from CSV' >> beam.ParDo(RowReader())
...
)
read_csv
导入为 from apache_beam.dataframe.io import read_csv
,to_pcollection
导入为 from apache_beam.dataframe.convert import to_pcollection
。
我有一个 .csv
文件,我试图在管道中使用 apache_beam.io.ReadFromText()
读取它(beam
是 apache_beam
的别名):
reader = beam.io.ReadFromText(csv_path, skip_header_lines=1)
我有一行 reader class 如下所示:
class RowReader(beam.DoFn):
def process(self, row):
row_delimited = row.split(",")
text_1 = str(row_delimited[1]).encode()
text_2 = str(row_delimited[4]).encode()
image_1 = tf.io.read_file(row_delimited[-3]).numpy()
image_2 = tf.io.read_file(row_delimited[-2]).numpy()
label = row_delimited[-1]
yield {"text_1": text_1, "image_1": image_1,
"text_2": text_2, "image_2": image_2,
"label": label}
csv 文件中的其中一行有如下条目:
但是beam.io.ReadFromText()
是在\n
的基础上分裂的。但问题在于该行就是这样:
1357240852939218946,"#IndiaFightsCorona:
而 ,
之后的实际第一个元素是:
#IndiaFightsCorona:\n\nNearly 4.5 million beneficiaries vaccinated against #COVID19 in 19 days.\n\nIndia is the fastest country to cross landmark of vaccinating 4 million beneficiaries in merely 18 days.\n\n#StaySafe #IndiaWillWin #Unite2FightCorona
我试过将 strip_trailing_newlines=False
传递给 ReadFromText()
但它没有帮助。如何让 beam
忽略 \n
拆分?
有什么解决方法吗?
您似乎想使用 skip_trailing_newlines
标志 according to the docs:
reader = beam.io.ReadFromText(csv_path, skip_header_lines=1, skip_trailing_newlines=False)
现在可以了吗?
apache_beam.io.ReadFromText
假定所有换行符都描述记录。如果您的 csv 文件包含跨越行的记录,则有几个选项。
(1) 创建文件名的 PCollection(例如使用 MatchFiles or just globbling and using beam.Create
) followed by a DoFn
that opens the file (possibly using the calls from filesystems 支持 GCS、HDFS 等)并使用 Python 的 CSV reader 生成行。 (与 ReadFromText 相比的缺点是,这会强制整个文件在单个 worker 中读取。)
或 (2) 使用 Beam 的 Dataframe APIs 中的 apache_beam.dataframe.io.read_csv
。这将提供一个延迟数据帧,您可以像使用 Pandas 数据帧一样使用它,或者将其转换为具有 apache_beam.dataframe.convert.to_pcollection
.
这是我所做的。定义 RowReader
如下:
class RowReader(beam.DoFn):
def process(self, row):
text_1 = str(row[1]).encode()
text_2 = str(row[4]).encode()
image_1 = tf.io.read_file(row[7]).numpy()
image_2 = tf.io.read_file(row[8]).numpy()
label = int(row[9])
yield {"text_1": text_1, "image_1": image_1,
"text_2": text_2, "image_2": image_2,
"label": label}
然后定义管道如下:
with beam.Pipeline(args.runner, options=options) as pipe:
df = (
pipe
| read_csv(csv_path)
)
pc = to_pcollection(df)
_ = (
pc
| 'Decode from CSV' >> beam.ParDo(RowReader())
...
)
read_csv
导入为 from apache_beam.dataframe.io import read_csv
,to_pcollection
导入为 from apache_beam.dataframe.convert import to_pcollection
。