BigQuery 不接受来自 protobuf 的二进制数据
BigQuery does not accept binary data from protobuf
我有一个数据流管道将数据从 pub sub 解析到大查询。
数据为 proto3 格式。
我从 pubsub 接收到的数据是使用来自 protobuf 的 'SerializeToString()' 方法编码的。
然后我反序列化它并将解析后的数据插入到 bigquery 中,它工作得很好。但是,有人要求我在收到 probotobuf 时存储二进制数据,以防插入时出现问题。
为此,我创建了一个简单的 bigquery table,只有一个字段 'data',接受 BYTES。
所以我在我的管道中添加了一个步骤,它只是从 PubSub 消息中获取数据并且 return 它:
class GetBytes(beam.DoFn):
def process(self, element):
obj: Dict = {
'data': element.data
}
logging.info(f'data bytes: {obj}')
logging.info(f'data type: {type(obj["data"])}')
return [obj]
这是我用来插入 BQ 的管道中的行:
bytes_status = (status | 'Get Bytes Result' >> beam.ParDo(GetBytes()))
bytes_status | 'Write to BQ BackUp' >> beam.io.WriteToBigQuery('my_project:my_dataset.my_table')
logs 似乎得到了正确的数据:
2020-09-29 11:16:40.094 CESTdata bytes: {'data': b'\n\x04\x08\x01\x10\x02\n\x04\x08\x02\x10\x02\n\x02\x08\x03\n\x04\x08\x04\x10\x02\n\x04\x08\x05\x10\x02\n\x04\x08\x06\x10\x02\n\x02\x08\x07\n\x04\x08\x08\x10\x01\n\x02\x08\t\n\x04\x08\n\x10\x01\n\x04\x08\x0b\x10\x02\n\x02\x08\x0c\n\x04\x08\r\x10\x02\n\x04\x08\x0e\x10\x02\n\x04\x08\x0f\x10\x02\n\x04\x08\x10\x10\x02\n\x04\x08\x11\x10\x01\n\x04\x08\x12\x10\x01\n\x04\x08\x01\x10\x02\n\x02\x08\x02\n\x04\x08\x03\x10\x01\n\x02\x08\x04\n\x04\x08\x05\x10\x02\n\x04\x08\x06\x10\x01\n\x04\x08\x07\x10\x02\n\x02\x08\x08\n\x04\x08\t\x10\x01\n\x04\x08\n\x10\x02\n\x04\x08\x0b\x10\x01\n\x02\x08\x0c\n\x04\x08\r\x10\x02\n\x04\x08\x0e\x10\x02\n\x04\x08\x0f\x10\x02\n\x04\x08\x10\x10\x02\n\x04\x08\x11\x10\x02\n\x04\x08\x12\x10\x02\x10\xb4\x95\x99\xc9\xcd.'}
但我不断收到错误消息:
UnicodeDecodeError: 'utf-8 [while running 'generatedPtransform-297']' codec can't decode byte 0x89 in position 101: invalid start byte
(可能错误与之前的日志不符,但一直都是这种信息)
我尝试从 BigQuery UI 插入我的字节数据,一切顺利...
知道哪里出了问题吗?
谢谢:)
BigQuery 要求 bytes
值以这种方式编写时进行 base64 编码。您可以在 https://beam.apache.org/releases/pydoc/2.24.0/apache_beam.io.gcp.bigquery.html#additional-parameters-for-bigquery-tables
找到一些文档和链接以了解更多详细信息
我有一个数据流管道将数据从 pub sub 解析到大查询。 数据为 proto3 格式。
我从 pubsub 接收到的数据是使用来自 protobuf 的 'SerializeToString()' 方法编码的。
然后我反序列化它并将解析后的数据插入到 bigquery 中,它工作得很好。但是,有人要求我在收到 probotobuf 时存储二进制数据,以防插入时出现问题。
为此,我创建了一个简单的 bigquery table,只有一个字段 'data',接受 BYTES。
所以我在我的管道中添加了一个步骤,它只是从 PubSub 消息中获取数据并且 return 它:
class GetBytes(beam.DoFn):
def process(self, element):
obj: Dict = {
'data': element.data
}
logging.info(f'data bytes: {obj}')
logging.info(f'data type: {type(obj["data"])}')
return [obj]
这是我用来插入 BQ 的管道中的行:
bytes_status = (status | 'Get Bytes Result' >> beam.ParDo(GetBytes()))
bytes_status | 'Write to BQ BackUp' >> beam.io.WriteToBigQuery('my_project:my_dataset.my_table')
logs 似乎得到了正确的数据:
2020-09-29 11:16:40.094 CESTdata bytes: {'data': b'\n\x04\x08\x01\x10\x02\n\x04\x08\x02\x10\x02\n\x02\x08\x03\n\x04\x08\x04\x10\x02\n\x04\x08\x05\x10\x02\n\x04\x08\x06\x10\x02\n\x02\x08\x07\n\x04\x08\x08\x10\x01\n\x02\x08\t\n\x04\x08\n\x10\x01\n\x04\x08\x0b\x10\x02\n\x02\x08\x0c\n\x04\x08\r\x10\x02\n\x04\x08\x0e\x10\x02\n\x04\x08\x0f\x10\x02\n\x04\x08\x10\x10\x02\n\x04\x08\x11\x10\x01\n\x04\x08\x12\x10\x01\n\x04\x08\x01\x10\x02\n\x02\x08\x02\n\x04\x08\x03\x10\x01\n\x02\x08\x04\n\x04\x08\x05\x10\x02\n\x04\x08\x06\x10\x01\n\x04\x08\x07\x10\x02\n\x02\x08\x08\n\x04\x08\t\x10\x01\n\x04\x08\n\x10\x02\n\x04\x08\x0b\x10\x01\n\x02\x08\x0c\n\x04\x08\r\x10\x02\n\x04\x08\x0e\x10\x02\n\x04\x08\x0f\x10\x02\n\x04\x08\x10\x10\x02\n\x04\x08\x11\x10\x02\n\x04\x08\x12\x10\x02\x10\xb4\x95\x99\xc9\xcd.'}
但我不断收到错误消息:
UnicodeDecodeError: 'utf-8 [while running 'generatedPtransform-297']' codec can't decode byte 0x89 in position 101: invalid start byte
(可能错误与之前的日志不符,但一直都是这种信息)
我尝试从 BigQuery UI 插入我的字节数据,一切顺利...
知道哪里出了问题吗?
谢谢:)
BigQuery 要求 bytes
值以这种方式编写时进行 base64 编码。您可以在 https://beam.apache.org/releases/pydoc/2.24.0/apache_beam.io.gcp.bigquery.html#additional-parameters-for-bigquery-tables