在数据流管道中动态设置 bigquery table id

Dynamically set bigquery table id in dataflow pipeline

我有数据流管道,它在 Python 中,这就是它正在做的事情:

  1. 阅读来自 PubSub 的消息。消息是压缩的协议缓冲区。 PubSub 上接收的一条消息包含多种类型的消息。请参阅下面的协议父消息规范:

    message BatchEntryPoint {
    
     /**
     * EntryPoint
     * 
     * Description: Encapsulation message
     */
    message EntryPoint {
    // Proto Message
    google.protobuf.Any proto = 1;
    
    // Timestamp
    google.protobuf.Timestamp timestamp = 4;
    }
    
    // Array of EntryPoint messages
    repeated EntryPoint entrypoints = 1;
    }
    

所以,为了更好地解释,我有几个 protobuf 消息。每条消息都必须打包在 EntryPoint 消息的 proto 字段中,由于 MQTT 的限制,我们一次发送多条消息,这就是为什么我们在 BatchEntryPoint 上使用指向 EntryPoint 消息的重复字段。

  1. 正在解析收到的消息。

这里没什么特别的,只是解压缩和反序列化我们刚刚从 PubSub 读取的消息。获取 'humain readable' 数据。

  1. BatchEntryPoint 上的 For Loop 以评估每个 EntryPoint 消息。

由于 BatchEntryPoint 上的每条消息可以有不同的类型,我们需要对它们进行不同的处理

  1. 已解析的消息数据

执行不同的过程来获取我需要的所有信息并将其格式化为 BigQuery 可读格式

  1. 将数据写入 bigQuery

这是我的 'trouble' 开始的地方,所以我的代码可以工作,但在我看来它很脏而且很难维护。 有两点需要注意。
每个消息的类型都可以发送到 3 个不同的数据集,一个研发数据集,一个开发数据集和一个生产数据集。 假设我有一条名为 System. 它可以去:

这就是我现在正在做的事情:

console_records | 'Write to Console BQ' >> beam.io.WriteToBigQuery(
    lambda e: 'my-project:rd_dataset.table1' if dataset_is_rd_table1(e) else (
        'my-project:dev_dataset.table1' if dataset_is_dev_table1(e) else (
        'my-project:prod_dataset.table1' if dataset_is_prod_table1(e) else (
        'my-project:rd_dataset.table2' if dataset_is_rd_table2(e) else (
        'my-project:dev_dataset.table2' if dataset_is_dev_table2(e) else (
        ...) else 0

我有超过 30 种不同类型的消息,用于将数据插入大查询的行超过 90 行。

下面是 dataset_is_..._tableX 方法的样子:

def dataset_is_rd_messagestype(element) -> bool:
""" check if env is rd for message's type message """
valid: bool = False
is_type = check_element_type(element, 'MessagesType')
if is_type:
    valid = dataset_is_rd(element)
return valid

check_element_type 检查消息的类型是否正确(例如:系统)。
dataset_is_rd 看起来像这样:

def dataset_is_rd(element) -> bool:
    """ Check if dataset should be RD from registry id """
    if element['device_registry_id'] == 'rd':
        del element['device_registry_id']
        del element['bq_type']
        return True
    return False

作为键的元素指示我们必须在哪个数据集上发送消息。

所以这是按预期工作的,但我希望我可以编写更简洁的代码,并可能减少在添加或删除消息类型时要更改的代码量。

有什么想法吗?

如何使用 TaggedOutput.

你能不能这样写:

def dataset_type(element) -> bool:
    """ Check if dataset should be RD from registry id """
    dev_registry = element['device_registry_id']
    del element['device_registry_id']
    del element['bq_type']
    table_type = get_element_type(element, 'MessagesType')
    return 'my-project:%s_dataset.table%d' % (dev_registry, table_type)

然后将其用作传递给 BQ 的 table lambda?

所以我设法通过动态创建 table 名称来创建代码以将数据插入动态 table。

这并不完美,因为我必须修改传递给方法的元素,但是我对结果仍然非常满意,它清理了我的数百行代码。如果我有一个新的 table,与之前在管道中的 6 行相比,添加它需要数组中的一行。

这是我的解决方案:

def batch_pipeline(pipeline):
    console_message = (
            pipeline
            | 'Get console\'s message from pub/sub' >> beam.io.ReadFromPubSub(
        subscription='sub1',
        with_attributes=True)
    )
    common_message = (
            pipeline
            | 'Get common\'s message from pub/sub' >> beam.io.ReadFromPubSub(
        subscription='sub2',
        with_attributes=True)
    )
    jetson_message = (
            pipeline
            | 'Get jetson\'s message from pub/sub' >> beam.io.ReadFromPubSub(
        subscription='sub3',
        with_attributes=True)
    )

 

message = (console_message, common_message, jetson_message) | beam.Flatten()
clear_message = message | beam.ParDo(GetClearMessage())
console_bytes = clear_message | beam.ParDo(SetBytesData())
console_bytes | 'Write to big query back up table' >> beam.io.WriteToBigQuery(
    lambda e: write_to_backup(e)
)
records = clear_message | beam.ParDo(GetProtoData())
gps_records = clear_message | 'Get GPS Data' >> beam.ParDo(GetProtoData())
parsed_gps = gps_records | 'Parse GPS Data' >> beam.ParDo(ParseGps())
if parsed_gps:
    parsed_gps | 'Write to big query gps table' >> beam.io.WriteToBigQuery(
        lambda e: write_gps(e)
    )
records | 'Write to big query table' >> beam.io.WriteToBigQuery(
    lambda e: write_to_bq(e)
)

所以管道正在从 3 个不同的 pub sub 读取数据,提取数据并写入大查询。

WriteToBigQuery 使用的元素结构如下所示:

  obj = {
        'data': data_to_write_on_bq,
        'registry_id': data_needed_to_craft_table_name,
        'gcloud_id': data_to_write_on_bq,
        'proto_type': data_needed_to_craft_table_name
  }

然后我在 WriteToBigQuery 的 lambda 上使用的方法之一如下所示:

def write_to_bq(e):
    logging.info(e)
    element = copy(e)
    registry = element['registry_id']
    logging.info(registry)
    dataset = set_dataset(registry) # set dataset name, knowing the registry, this is to set the environment (dev/prod/rd/...)
    proto_type = element['proto_type']
    logging.info('Proto Type %s', proto_type)
    table_name = reduce(lambda x, y: x + ('_' if y.isupper() else '') + y, proto_type).lower()
    full_table_name = f'my_project:{dataset}.{table_name}'
    logging.info(full_table_name)
    del e['registry_id']
    del e['proto_type']

    return full_table_name

就是这样,经过 3 天的麻烦!!