如何将结果写入 Dataflow/Beam 中 gcs 中的 JSON 文件
How to write result to JSON files in gcs in Dataflow/Beam
我正在使用 Python Beam SDK 0.6.0。我想将我的输出写入 Google 云存储中的 JSON。最好的方法是什么?
我想我可以使用文本 IO 接收器中的 WriteToText
,但是我必须分别格式化每一行,对吗?如何将我的结果保存到包含对象列表的有效 JSON 文件中?
好的,作为参考,我通过在 beam SDK 中 WriteToText
使用的 _TextSink
上编写自己的接收器来解决这个问题。
不确定这是否是最好的方法,但目前效果很好。希望对其他人有帮助。
import os
import json
import apache_beam as beam
from apache_beam import coders
from apache_beam.io.iobase import Write
from apache_beam.transforms import PTransform
class _JsonSink(beam.io.FileSink):
"""A Dataflow sink for writing JSON files."""
def __init__(self,
file_path_prefix,
file_name_suffix='',
num_shards=0,
shard_name_template=None,
coder=coders.ToStringCoder(),
compression_type=beam.io.CompressionTypes.AUTO):
super(_JsonSink, self).__init__(
file_path_prefix,
file_name_suffix=file_name_suffix,
num_shards=num_shards,
shard_name_template=shard_name_template,
coder=coder,
mime_type='text/plain',
compression_type=compression_type)
self.last_rows = dict()
def open(self, temp_path):
""" Open file and initialize it w opening a list."""
file_handle = super(_JsonSink, self).open(temp_path)
file_handle.write('[\n')
return file_handle
def write_record(self, file_handle, value):
"""Writes a single encoded record converted to JSON and terminates the
line w a comma."""
if self.last_rows.get(file_handle, None) is not None:
file_handle.write(self.coder.encode(
json.dumps(self.last_rows[file_handle])))
file_handle.write(',\n')
self.last_rows[file_handle] = value
def close(self, file_handle):
"""Finalize the JSON list and close the file handle returned from
``open()``. Called after all records are written.
"""
if file_handle is not None:
# Write last row without a comma
file_handle.write(self.coder.encode(
json.dumps(self.last_rows[file_handle])))
# Close list and then the file
file_handle.write('\n]\n')
file_handle.close()
class WriteToJson(PTransform):
"""PTransform for writing to JSON files."""
def __init__(self,
file_path_prefix,
file_name_suffix='',
num_shards=0,
shard_name_template=None,
coder=coders.ToStringCoder(),
compression_type=beam.io.CompressionTypes.AUTO):
self._sink = _JsonSink(file_path_prefix, file_name_suffix, num_shards,
shard_name_template, coder, compression_type)
def expand(self, pcoll):
return pcoll | Write(self._sink)
使用接收器类似于使用文本接收器的方式:
pcol | WriteToJson('gs://path/to/file', file_name_suffix='.json')
让每个文件都包含一个包含一堆元素的列表是很困难的,因为您需要将一堆元素分组,然后将它们一起写入一个文件。我建议您使用不同的格式。
您可以考虑 JSON Lines 格式,其中文件中的每一行代表一个 JSON 元素。
将数据转换为 JSON 行应该很容易。以下转换应该可以解决问题:
class WriteToJsonLines(beam.PTransform):
def __init__(self, file_name):
self._file_name = file_name
def expand(self, pcoll):
return (pcoll
| 'format json' >> beam.Map(json.dumps)
| 'write to text' >> beam.WriteToText(self._file_name))
最后,如果您以后想要读取 JSON Lines 文件,您可以编写自己的 JsonLinesSource 或使用 beam_utils.
中的那个
虽然晚了一年,但我想添加另一种方法将结果写入 GCS 中的 json 文件。对于 apache beam 2.x 管道,此转换有效:
.withSuffix(".json")
例如:
result.apply("WriteToGCS", TextIO.write().to(bucket)
.withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)
.withSuffix(".json")
.withNumShards(chunks));
我正在使用 Python Beam SDK 0.6.0。我想将我的输出写入 Google 云存储中的 JSON。最好的方法是什么?
我想我可以使用文本 IO 接收器中的 WriteToText
,但是我必须分别格式化每一行,对吗?如何将我的结果保存到包含对象列表的有效 JSON 文件中?
好的,作为参考,我通过在 beam SDK 中 WriteToText
使用的 _TextSink
上编写自己的接收器来解决这个问题。
不确定这是否是最好的方法,但目前效果很好。希望对其他人有帮助。
import os
import json
import apache_beam as beam
from apache_beam import coders
from apache_beam.io.iobase import Write
from apache_beam.transforms import PTransform
class _JsonSink(beam.io.FileSink):
"""A Dataflow sink for writing JSON files."""
def __init__(self,
file_path_prefix,
file_name_suffix='',
num_shards=0,
shard_name_template=None,
coder=coders.ToStringCoder(),
compression_type=beam.io.CompressionTypes.AUTO):
super(_JsonSink, self).__init__(
file_path_prefix,
file_name_suffix=file_name_suffix,
num_shards=num_shards,
shard_name_template=shard_name_template,
coder=coder,
mime_type='text/plain',
compression_type=compression_type)
self.last_rows = dict()
def open(self, temp_path):
""" Open file and initialize it w opening a list."""
file_handle = super(_JsonSink, self).open(temp_path)
file_handle.write('[\n')
return file_handle
def write_record(self, file_handle, value):
"""Writes a single encoded record converted to JSON and terminates the
line w a comma."""
if self.last_rows.get(file_handle, None) is not None:
file_handle.write(self.coder.encode(
json.dumps(self.last_rows[file_handle])))
file_handle.write(',\n')
self.last_rows[file_handle] = value
def close(self, file_handle):
"""Finalize the JSON list and close the file handle returned from
``open()``. Called after all records are written.
"""
if file_handle is not None:
# Write last row without a comma
file_handle.write(self.coder.encode(
json.dumps(self.last_rows[file_handle])))
# Close list and then the file
file_handle.write('\n]\n')
file_handle.close()
class WriteToJson(PTransform):
"""PTransform for writing to JSON files."""
def __init__(self,
file_path_prefix,
file_name_suffix='',
num_shards=0,
shard_name_template=None,
coder=coders.ToStringCoder(),
compression_type=beam.io.CompressionTypes.AUTO):
self._sink = _JsonSink(file_path_prefix, file_name_suffix, num_shards,
shard_name_template, coder, compression_type)
def expand(self, pcoll):
return pcoll | Write(self._sink)
使用接收器类似于使用文本接收器的方式:
pcol | WriteToJson('gs://path/to/file', file_name_suffix='.json')
让每个文件都包含一个包含一堆元素的列表是很困难的,因为您需要将一堆元素分组,然后将它们一起写入一个文件。我建议您使用不同的格式。
您可以考虑 JSON Lines 格式,其中文件中的每一行代表一个 JSON 元素。
将数据转换为 JSON 行应该很容易。以下转换应该可以解决问题:
class WriteToJsonLines(beam.PTransform):
def __init__(self, file_name):
self._file_name = file_name
def expand(self, pcoll):
return (pcoll
| 'format json' >> beam.Map(json.dumps)
| 'write to text' >> beam.WriteToText(self._file_name))
最后,如果您以后想要读取 JSON Lines 文件,您可以编写自己的 JsonLinesSource 或使用 beam_utils.
中的那个虽然晚了一年,但我想添加另一种方法将结果写入 GCS 中的 json 文件。对于 apache beam 2.x 管道,此转换有效:
.withSuffix(".json")
例如:
result.apply("WriteToGCS", TextIO.write().to(bucket)
.withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP)
.withSuffix(".json")
.withNumShards(chunks));