如何将 csv 转换为 apache beam 数据流中的字典
How to convert csv into a dictionary in apache beam dataflow
我想读取一个 csv 文件并使用 apache beam 数据流将其写入 BigQuery。为此,我需要以字典的形式将数据呈现给 BigQuery。我怎样才能使用 apache beam 转换数据才能做到这一点?
我的输入 csv 文件有两列,我想在 BigQuery 中创建后续的两列 table。我知道如何在 BigQuery 中创建数据,这很简单,我不知道如何将 csv 转换为字典。下面的代码不正确,但应该让我了解我正在尝试做什么。
# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
beam.io.BigQuerySink(
output_table,
schema='month:INTEGER, tornado_count:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
编辑:从版本 2.12.0 开始,Beam 带有新的 fileio
转换,允许您从 CSV 中读取而无需重新实现源。您可以这样做:
def get_csv_reader(readable_file):
# You can return whichever kind of reader you want here
# a DictReader, or a normal csv.reader.
if sys.version_info >= (3, 0):
return csv.reader(io.TextIOWrapper(readable_file.open()))
else:
return csv.reader(readable_file.open())
with Pipeline(...) as p:
content_pc = (p
| beam.io.fileio.MatchFiles("/my/file/name")
| beam.io.fileio.ReadMatches()
| beam.Reshuffle() # Useful if you expect many matches
| beam.FlatMap(get_csv_reader))
我最近为 Apache Beam 写了一个测试。你可以看看the Github repository。
旧答案 依赖于重新实现源代码。这不再是这样做的主要推荐方式:)
我们的想法是拥有一个 returns 解析 CSV 行的来源。您可以通过子 classing FileBasedSource
class 来包括 CSV 解析来做到这一点。特别是,read_records
函数看起来像这样:
class MyCsvFileSource(apache_beam.io.filebasedsource.FileBasedSource):
def read_records(self, file_name, range_tracker):
self._file = self.open_file(file_name)
reader = csv.reader(self._file)
for rec in reader:
yield rec
作为对 Pablo post 的补充,我想分享一下我自己对他的样本所做的一点改动。 (为你+1!)
已更改:
reader = csv.reader(self._file)
到 reader = csv.DictReader(self._file)
csv.DictReader
使用 CSV 文件的第一行作为字典键。其他行用于用它的值填充每行的字典。它会根据列顺序自动将正确的值放入正确的键中。
一个小细节是 Dict 中的每个值都存储为字符串。如果您使用例如,这可能会与您的 BigQuery 架构冲突。某些字段为 INTEGER。所以你之后需要注意正确的转换。
我想读取一个 csv 文件并使用 apache beam 数据流将其写入 BigQuery。为此,我需要以字典的形式将数据呈现给 BigQuery。我怎样才能使用 apache beam 转换数据才能做到这一点?
我的输入 csv 文件有两列,我想在 BigQuery 中创建后续的两列 table。我知道如何在 BigQuery 中创建数据,这很简单,我不知道如何将 csv 转换为字典。下面的代码不正确,但应该让我了解我正在尝试做什么。
# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectPipelineRunner')
# Create a PCollection with names and write it to a file.
(p
| 'read solar data' >> beam.Read(beam.io.TextFileSource('./sensor1_121116.csv'))
# How do you do this??
| 'convert to dictionary' >> beam.Map(lambda (k, v): {'luminosity': k, 'datetime': v})
| 'save' >> beam.Write(
beam.io.BigQuerySink(
output_table,
schema='month:INTEGER, tornado_count:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
编辑:从版本 2.12.0 开始,Beam 带有新的 fileio
转换,允许您从 CSV 中读取而无需重新实现源。您可以这样做:
def get_csv_reader(readable_file):
# You can return whichever kind of reader you want here
# a DictReader, or a normal csv.reader.
if sys.version_info >= (3, 0):
return csv.reader(io.TextIOWrapper(readable_file.open()))
else:
return csv.reader(readable_file.open())
with Pipeline(...) as p:
content_pc = (p
| beam.io.fileio.MatchFiles("/my/file/name")
| beam.io.fileio.ReadMatches()
| beam.Reshuffle() # Useful if you expect many matches
| beam.FlatMap(get_csv_reader))
我最近为 Apache Beam 写了一个测试。你可以看看the Github repository。
旧答案 依赖于重新实现源代码。这不再是这样做的主要推荐方式:)
我们的想法是拥有一个 returns 解析 CSV 行的来源。您可以通过子 classing FileBasedSource
class 来包括 CSV 解析来做到这一点。特别是,read_records
函数看起来像这样:
class MyCsvFileSource(apache_beam.io.filebasedsource.FileBasedSource):
def read_records(self, file_name, range_tracker):
self._file = self.open_file(file_name)
reader = csv.reader(self._file)
for rec in reader:
yield rec
作为对 Pablo post 的补充,我想分享一下我自己对他的样本所做的一点改动。 (为你+1!)
已更改:
reader = csv.reader(self._file)
到 reader = csv.DictReader(self._file)
csv.DictReader
使用 CSV 文件的第一行作为字典键。其他行用于用它的值填充每行的字典。它会根据列顺序自动将正确的值放入正确的键中。
一个小细节是 Dict 中的每个值都存储为字符串。如果您使用例如,这可能会与您的 BigQuery 架构冲突。某些字段为 INTEGER。所以你之后需要注意正确的转换。