在数据流管道中动态设置 bigquery table id
Dynamically set bigquery table id in dataflow pipeline
我有数据流管道,它在 Python 中,这就是它正在做的事情:
阅读来自 PubSub 的消息。消息是压缩的协议缓冲区。 PubSub 上接收的一条消息包含多种类型的消息。请参阅下面的协议父消息规范:
message BatchEntryPoint {
/**
* EntryPoint
*
* Description: Encapsulation message
*/
message EntryPoint {
// Proto Message
google.protobuf.Any proto = 1;
// Timestamp
google.protobuf.Timestamp timestamp = 4;
}
// Array of EntryPoint messages
repeated EntryPoint entrypoints = 1;
}
所以,为了更好地解释,我有几个 protobuf 消息。每条消息都必须打包在 EntryPoint 消息的 proto 字段中,由于 MQTT 的限制,我们一次发送多条消息,这就是为什么我们在 BatchEntryPoint 上使用指向 EntryPoint 消息的重复字段。
- 正在解析收到的消息。
这里没什么特别的,只是解压缩和反序列化我们刚刚从 PubSub 读取的消息。获取 'humain readable' 数据。
- BatchEntryPoint 上的 For Loop 以评估每个 EntryPoint 消息。
由于 BatchEntryPoint 上的每条消息可以有不同的类型,我们需要对它们进行不同的处理
- 已解析的消息数据
执行不同的过程来获取我需要的所有信息并将其格式化为 BigQuery 可读格式
- 将数据写入 bigQuery
这是我的 'trouble' 开始的地方,所以我的代码可以工作,但在我看来它很脏而且很难维护。
有两点需要注意。
每个消息的类型都可以发送到 3 个不同的数据集,一个研发数据集,一个开发数据集和一个生产数据集。
假设我有一条名为 System.
它可以去:
- 我的项目:rd_dataset.system
- 我的项目:dev_dataset.system
- 我的项目:prod_dataset.system
这就是我现在正在做的事情:
console_records | 'Write to Console BQ' >> beam.io.WriteToBigQuery(
lambda e: 'my-project:rd_dataset.table1' if dataset_is_rd_table1(e) else (
'my-project:dev_dataset.table1' if dataset_is_dev_table1(e) else (
'my-project:prod_dataset.table1' if dataset_is_prod_table1(e) else (
'my-project:rd_dataset.table2' if dataset_is_rd_table2(e) else (
'my-project:dev_dataset.table2' if dataset_is_dev_table2(e) else (
...) else 0
我有超过 30 种不同类型的消息,用于将数据插入大查询的行超过 90 行。
下面是 dataset_is_..._tableX 方法的样子:
def dataset_is_rd_messagestype(element) -> bool:
""" check if env is rd for message's type message """
valid: bool = False
is_type = check_element_type(element, 'MessagesType')
if is_type:
valid = dataset_is_rd(element)
return valid
check_element_type 检查消息的类型是否正确(例如:系统)。
dataset_is_rd 看起来像这样:
def dataset_is_rd(element) -> bool:
""" Check if dataset should be RD from registry id """
if element['device_registry_id'] == 'rd':
del element['device_registry_id']
del element['bq_type']
return True
return False
作为键的元素指示我们必须在哪个数据集上发送消息。
所以这是按预期工作的,但我希望我可以编写更简洁的代码,并可能减少在添加或删除消息类型时要更改的代码量。
有什么想法吗?
如何使用 TaggedOutput.
你能不能这样写:
def dataset_type(element) -> bool:
""" Check if dataset should be RD from registry id """
dev_registry = element['device_registry_id']
del element['device_registry_id']
del element['bq_type']
table_type = get_element_type(element, 'MessagesType')
return 'my-project:%s_dataset.table%d' % (dev_registry, table_type)
然后将其用作传递给 BQ 的 table
lambda?
所以我设法通过动态创建 table 名称来创建代码以将数据插入动态 table。
这并不完美,因为我必须修改传递给方法的元素,但是我对结果仍然非常满意,它清理了我的数百行代码。如果我有一个新的 table,与之前在管道中的 6 行相比,添加它需要数组中的一行。
这是我的解决方案:
def batch_pipeline(pipeline):
console_message = (
pipeline
| 'Get console\'s message from pub/sub' >> beam.io.ReadFromPubSub(
subscription='sub1',
with_attributes=True)
)
common_message = (
pipeline
| 'Get common\'s message from pub/sub' >> beam.io.ReadFromPubSub(
subscription='sub2',
with_attributes=True)
)
jetson_message = (
pipeline
| 'Get jetson\'s message from pub/sub' >> beam.io.ReadFromPubSub(
subscription='sub3',
with_attributes=True)
)
message = (console_message, common_message, jetson_message) | beam.Flatten()
clear_message = message | beam.ParDo(GetClearMessage())
console_bytes = clear_message | beam.ParDo(SetBytesData())
console_bytes | 'Write to big query back up table' >> beam.io.WriteToBigQuery(
lambda e: write_to_backup(e)
)
records = clear_message | beam.ParDo(GetProtoData())
gps_records = clear_message | 'Get GPS Data' >> beam.ParDo(GetProtoData())
parsed_gps = gps_records | 'Parse GPS Data' >> beam.ParDo(ParseGps())
if parsed_gps:
parsed_gps | 'Write to big query gps table' >> beam.io.WriteToBigQuery(
lambda e: write_gps(e)
)
records | 'Write to big query table' >> beam.io.WriteToBigQuery(
lambda e: write_to_bq(e)
)
所以管道正在从 3 个不同的 pub sub 读取数据,提取数据并写入大查询。
WriteToBigQuery 使用的元素结构如下所示:
obj = {
'data': data_to_write_on_bq,
'registry_id': data_needed_to_craft_table_name,
'gcloud_id': data_to_write_on_bq,
'proto_type': data_needed_to_craft_table_name
}
然后我在 WriteToBigQuery 的 lambda 上使用的方法之一如下所示:
def write_to_bq(e):
logging.info(e)
element = copy(e)
registry = element['registry_id']
logging.info(registry)
dataset = set_dataset(registry) # set dataset name, knowing the registry, this is to set the environment (dev/prod/rd/...)
proto_type = element['proto_type']
logging.info('Proto Type %s', proto_type)
table_name = reduce(lambda x, y: x + ('_' if y.isupper() else '') + y, proto_type).lower()
full_table_name = f'my_project:{dataset}.{table_name}'
logging.info(full_table_name)
del e['registry_id']
del e['proto_type']
return full_table_name
就是这样,经过 3 天的麻烦!!
我有数据流管道,它在 Python 中,这就是它正在做的事情:
阅读来自 PubSub 的消息。消息是压缩的协议缓冲区。 PubSub 上接收的一条消息包含多种类型的消息。请参阅下面的协议父消息规范:
message BatchEntryPoint { /** * EntryPoint * * Description: Encapsulation message */ message EntryPoint { // Proto Message google.protobuf.Any proto = 1; // Timestamp google.protobuf.Timestamp timestamp = 4; } // Array of EntryPoint messages repeated EntryPoint entrypoints = 1; }
所以,为了更好地解释,我有几个 protobuf 消息。每条消息都必须打包在 EntryPoint 消息的 proto 字段中,由于 MQTT 的限制,我们一次发送多条消息,这就是为什么我们在 BatchEntryPoint 上使用指向 EntryPoint 消息的重复字段。
- 正在解析收到的消息。
这里没什么特别的,只是解压缩和反序列化我们刚刚从 PubSub 读取的消息。获取 'humain readable' 数据。
- BatchEntryPoint 上的 For Loop 以评估每个 EntryPoint 消息。
由于 BatchEntryPoint 上的每条消息可以有不同的类型,我们需要对它们进行不同的处理
- 已解析的消息数据
执行不同的过程来获取我需要的所有信息并将其格式化为 BigQuery 可读格式
- 将数据写入 bigQuery
这是我的 'trouble' 开始的地方,所以我的代码可以工作,但在我看来它很脏而且很难维护。
有两点需要注意。
每个消息的类型都可以发送到 3 个不同的数据集,一个研发数据集,一个开发数据集和一个生产数据集。
假设我有一条名为 System.
它可以去:
- 我的项目:rd_dataset.system
- 我的项目:dev_dataset.system
- 我的项目:prod_dataset.system
这就是我现在正在做的事情:
console_records | 'Write to Console BQ' >> beam.io.WriteToBigQuery(
lambda e: 'my-project:rd_dataset.table1' if dataset_is_rd_table1(e) else (
'my-project:dev_dataset.table1' if dataset_is_dev_table1(e) else (
'my-project:prod_dataset.table1' if dataset_is_prod_table1(e) else (
'my-project:rd_dataset.table2' if dataset_is_rd_table2(e) else (
'my-project:dev_dataset.table2' if dataset_is_dev_table2(e) else (
...) else 0
我有超过 30 种不同类型的消息,用于将数据插入大查询的行超过 90 行。
下面是 dataset_is_..._tableX 方法的样子:
def dataset_is_rd_messagestype(element) -> bool:
""" check if env is rd for message's type message """
valid: bool = False
is_type = check_element_type(element, 'MessagesType')
if is_type:
valid = dataset_is_rd(element)
return valid
check_element_type 检查消息的类型是否正确(例如:系统)。
dataset_is_rd 看起来像这样:
def dataset_is_rd(element) -> bool:
""" Check if dataset should be RD from registry id """
if element['device_registry_id'] == 'rd':
del element['device_registry_id']
del element['bq_type']
return True
return False
作为键的元素指示我们必须在哪个数据集上发送消息。
所以这是按预期工作的,但我希望我可以编写更简洁的代码,并可能减少在添加或删除消息类型时要更改的代码量。
有什么想法吗?
如何使用 TaggedOutput.
你能不能这样写:
def dataset_type(element) -> bool:
""" Check if dataset should be RD from registry id """
dev_registry = element['device_registry_id']
del element['device_registry_id']
del element['bq_type']
table_type = get_element_type(element, 'MessagesType')
return 'my-project:%s_dataset.table%d' % (dev_registry, table_type)
然后将其用作传递给 BQ 的 table
lambda?
所以我设法通过动态创建 table 名称来创建代码以将数据插入动态 table。
这并不完美,因为我必须修改传递给方法的元素,但是我对结果仍然非常满意,它清理了我的数百行代码。如果我有一个新的 table,与之前在管道中的 6 行相比,添加它需要数组中的一行。
这是我的解决方案:
def batch_pipeline(pipeline):
console_message = (
pipeline
| 'Get console\'s message from pub/sub' >> beam.io.ReadFromPubSub(
subscription='sub1',
with_attributes=True)
)
common_message = (
pipeline
| 'Get common\'s message from pub/sub' >> beam.io.ReadFromPubSub(
subscription='sub2',
with_attributes=True)
)
jetson_message = (
pipeline
| 'Get jetson\'s message from pub/sub' >> beam.io.ReadFromPubSub(
subscription='sub3',
with_attributes=True)
)
message = (console_message, common_message, jetson_message) | beam.Flatten()
clear_message = message | beam.ParDo(GetClearMessage())
console_bytes = clear_message | beam.ParDo(SetBytesData())
console_bytes | 'Write to big query back up table' >> beam.io.WriteToBigQuery(
lambda e: write_to_backup(e)
)
records = clear_message | beam.ParDo(GetProtoData())
gps_records = clear_message | 'Get GPS Data' >> beam.ParDo(GetProtoData())
parsed_gps = gps_records | 'Parse GPS Data' >> beam.ParDo(ParseGps())
if parsed_gps:
parsed_gps | 'Write to big query gps table' >> beam.io.WriteToBigQuery(
lambda e: write_gps(e)
)
records | 'Write to big query table' >> beam.io.WriteToBigQuery(
lambda e: write_to_bq(e)
)
所以管道正在从 3 个不同的 pub sub 读取数据,提取数据并写入大查询。
WriteToBigQuery 使用的元素结构如下所示:
obj = {
'data': data_to_write_on_bq,
'registry_id': data_needed_to_craft_table_name,
'gcloud_id': data_to_write_on_bq,
'proto_type': data_needed_to_craft_table_name
}
然后我在 WriteToBigQuery 的 lambda 上使用的方法之一如下所示:
def write_to_bq(e):
logging.info(e)
element = copy(e)
registry = element['registry_id']
logging.info(registry)
dataset = set_dataset(registry) # set dataset name, knowing the registry, this is to set the environment (dev/prod/rd/...)
proto_type = element['proto_type']
logging.info('Proto Type %s', proto_type)
table_name = reduce(lambda x, y: x + ('_' if y.isupper() else '') + y, proto_type).lower()
full_table_name = f'my_project:{dataset}.{table_name}'
logging.info(full_table_name)
del e['registry_id']
del e['proto_type']
return full_table_name
就是这样,经过 3 天的麻烦!!