NotImplementedError apache beam python
I am using Apache Beam to write JSON to GCS, but I get the following error:
NotImplementedError: offset: 0, whence: 0, position: 50547, last: 50547 [while running 'Writing new data to gcs/write data gcs/Write/WriteImpl/WriteBundles/WriteBundles']
I don't know why this error occurs. The code is as follows:
import json

import apache_beam as beam


class WriteDataGCS(beam.PTransform):
    """
    To write data to GCS
    """
    def __init__(self, bucket):
        """
        Initiate the bucket as a class field
        :type bucket: string
        :param bucket: GCS path prefix to write the data to
        """
        self.bucket = bucket

    def expand(self, pcoll):
        """
        PTransform Method run when called on Class Name
        :type pcoll: PCollection
        :param pcoll: A pcollection
        """
        (pcoll | "print intermediate" >> beam.Map(print_row))
        return (pcoll | "write data gcs" >> beam.io.WriteToText(self.bucket, coder=JsonCoder(), file_name_suffix=".json"))


class JsonCoder:
    """
    This class represents dump and load operations performed on json
    """
    def encode(self, data):
        """
        Encodes the json data.
        :type data: string
        :param data: Data to be encoded
        """
        # logger.info("JSON DATA for encoding - {}".format(data))
        return json.dumps(data, default=str)

    def decode(self, data):
        """
        Decodes the json data.
        :type data: string
        :param data: Data to be decoded
        """
        # logger.info("JSON DATA for decoding - {}".format(data))
        return json.loads(data)
The coder parameter of WriteToText expects an apache_beam.coders.Coder instance. You can try making JsonCoder inherit from the base Coder class, but I think you can also convert the data to strings with a Map:
def expand(self, pcoll):
    """
    PTransform Method run when called on Class Name
    :type pcoll: PCollection
    :param pcoll: A pcollection
    """
    # print_row must return its element here; otherwise the next step receives None.
    return (pcoll
            | "print intermediate" >> beam.Map(print_row)
            # Serialize each element to a JSON string before writing.
            | "to_json" >> beam.Map(lambda x: json.dumps(x, default=str))
            | "write data gcs" >> beam.io.WriteToText(self.bucket, file_name_suffix=".json"))
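If you would rather keep passing a coder to WriteToText, the first suggestion above (inheriting from the base Coder class) could look roughly like the sketch below. I have not confirmed that this alone removes the NotImplementedError, so treat it as something to try. Two things here are my own assumptions and not from the question: encode returns UTF-8 bytes, and the try/except fallback to object exists only so the snippet can be run without Beam installed. The sample row is made up for illustration.

```python
import datetime
import json

try:
    # Real base class when Apache Beam is installed.
    from apache_beam.coders import Coder
except ImportError:
    # Assumption: fallback so this sketch runs without Beam; do not use it in a pipeline.
    Coder = object


class JsonCoder(Coder):
    """Encodes each element as one UTF-8 JSON document and decodes it back."""

    def encode(self, value):
        # default=str stringifies values json cannot serialize natively (e.g. dates).
        return json.dumps(value, default=str).encode("utf-8")

    def decode(self, encoded):
        return json.loads(encoded.decode("utf-8"))


# Hypothetical row, only to show the round trip.
row = {"id": 1, "created": datetime.date(2020, 1, 2)}
encoded = JsonCoder().encode(row)
decoded = JsonCoder().decode(encoded)
```

Note that the round trip is not lossless: anything handled by default=str, such as the date above, comes back as a plain string. In the pipeline you would still pass it as coder=JsonCoder() in the WriteToText call.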