使用 Apache Beam 的 ReadFromText 解析包含“\n”的行

Parsing lines containing "\n" in with Apache Beam's `ReadFromText`

我有一个 .csv 文件,我试图在管道中使用 apache_beam.io.ReadFromText() 读取它(beamapache_beam 的别名):

reader = beam.io.ReadFromText(csv_path, skip_header_lines=1)

我有一行 reader class 如下所示:

class RowReader(beam.DoFn):
    def process(self, row):
        row_delimited = row.split(",")
        text_1 = str(row_delimited[1]).encode()
        text_2 = str(row_delimited[4]).encode()
        image_1 = tf.io.read_file(row_delimited[-3]).numpy()
        image_2 = tf.io.read_file(row_delimited[-2]).numpy()
        label = row_delimited[-1]
        
        yield {"text_1": text_1, "image_1": image_1,
                "text_2": text_2, "image_2": image_2,
                "label": label} 

csv 文件中的其中一行有如下条目:

但是beam.io.ReadFromText()是在\n的基础上分裂的。但问题在于该行就是这样:

1357240852939218946,"#IndiaFightsCorona:

, 之后的实际第一个元素是:

#IndiaFightsCorona:\n\nNearly 4.5 million beneficiaries vaccinated against #COVID19 in 19 days.\n\nIndia is the fastest country to cross landmark of vaccinating 4 million beneficiaries in merely 18 days.\n\n#StaySafe #IndiaWillWin #Unite2FightCorona

我试过将 strip_trailing_newlines=False 传递给 ReadFromText() 但它没有帮助。如何让 beam 忽略 \n 拆分?

有什么解决方法吗?

您似乎想使用 skip_trailing_newlines 标志 according to the docs:

reader = beam.io.ReadFromText(csv_path, skip_header_lines=1, skip_trailing_newlines=False)

现在可以了吗?

apache_beam.io.ReadFromText 假定所有换行符都描述记录。如果您的 csv 文件包含跨越行的记录,则有几个选项。

(1) 创建文件名的 PCollection(例如使用 MatchFiles or just globbling and using beam.Create) followed by a DoFn that opens the file (possibly using the calls from filesystems 支持 GCS、HDFS 等)并使用 Python 的 CSV reader 生成行。 (与 ReadFromText 相比的缺点是,这会强制整个文件在单个 worker 中读取。)

或 (2) 使用 Beam 的 Dataframe APIs 中的 apache_beam.dataframe.io.read_csv。这将提供一个延迟数据帧,您可以像使用 Pandas 数据帧一样使用它,或者将其转换为具有 apache_beam.dataframe.convert.to_pcollection.

的 PCollection

这是我所做的。定义 RowReader 如下:

class RowReader(beam.DoFn):
    def process(self, row):
        text_1 = str(row[1]).encode()
        text_2 = str(row[4]).encode()
        image_1 = tf.io.read_file(row[7]).numpy()
        image_2 = tf.io.read_file(row[8]).numpy()
        label = int(row[9])

        yield {"text_1": text_1, "image_1": image_1,
                "text_2": text_2, "image_2": image_2,
                "label": label} 

然后定义管道如下:

with beam.Pipeline(args.runner, options=options) as pipe:
  df = (
        pipe 
        | read_csv(csv_path)
      )
  pc = to_pcollection(df)
  _ = (
        pc
        | 'Decode from CSV' >> beam.ParDo(RowReader())
        ...
      )

read_csv 导入为 from apache_beam.dataframe.io import read_csvto_pcollection 导入为 from apache_beam.dataframe.convert import to_pcollection