apache-beam 从 GCS 桶的多个文件夹中读取多个文件并加载它 biquery python

apache-beam reading multiple files from multiple folders of GCS buckets and load it biquery python

我想每小时设置一个管道来解析 GCS 存储桶不同文件夹中的 2000 个原始 protobuf 格式文件,并将数据加载到大查询中。到目前为止,我能够成功解析原型数据。

我知道读取一个文件夹中所有文件的通配符方法,但我现在不想要它,因为我有来自不同文件夹的数据,我想 运行 像并行一样更快,而不是在顺序方式

喜欢下面

for x,filename enumerate(file_separted_comma):
    --read data from prto
    --load data to bigquery 

现在我想知道以下方法是否是解析 apache beam 中不同文件夹中的多个文件并将数据加载到大查询中的最佳或推荐方法。

还有一件事,从 proto 解析后的每条记录,我将其变成 JSON 记录以加载到大查询中,我不知道这也是将数据加载到大查询的好方法查询而不是直接加载反序列化(已解析)的原型数据。

我正在从 Hadoop 作业转移到数据流,以通过设置此管道来降低成本。

我是 apache-beam 的新手,不知道什么是缺点和优点,因此有人可以看一下代码并在这里帮助我制定更好的生产方法

import time
import sys
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import csv
import base64
import rtbtracker_log_pb2
from google.protobuf import timestamp_pb2
from google.protobuf.json_format import MessageToDict
from google.protobuf.json_format import MessageToJson
import io
from apache_beam.io.filesystems import FileSystems


def get_deserialized_log(serialized_log):
    log = rtbtracker_log_pb2.RtbTrackerLogProto()
    log.ParseFromString(serialized_log)
    return log


def print_row(message):
    message=message[3]
    message = message.replace('_', '/');
    message = message.replace('*', '=');
    message = message.replace('-', '+');
    #finalbunary=base64.b64decode(message.decode('UTF-8'))
    finalbunary=base64.b64decode(message)
    msg=get_deserialized_log(finalbunary)

    jsonObj = MessageToDict(msg)
    #jsonObj = MessageToJson(msg)
    return jsonObj

def parse_file(element):
  for line in csv.reader([element], quotechar='"', delimiter='\t', quoting=csv.QUOTE_ALL, skipinitialspace=True):
    return line



def run():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", dest="input", required=False)
    parser.add_argument("--output", dest="output", required=False)
    app_args, pipeline_args = parser. parse_known_args()

    with beam.Pipeline(options=PipelineOptions()) as p:
        input_list=app_args.input
        file_list = input_list.split(",")
        res_list = ["/home/file_{}-00000-of-00001.json".format(i) for i in range(len(file_list))]

        for i,file in enumerate(file_list):
            onesec=p | "Read Text {}".format(i) >> beam.io.textio.ReadFromText(file)
            parsingProtoFile=onesec | 'Parse file{}'.format(i) >> beam.Map(parse_file)
            printFileConetent=parsingProtoFile | 'Print output {}'.format(i) >>beam.Map(print_row)
        
            #i want to load to bigquery here
            ##LOAD DATA TO BIGQUERY

            #secondsec=printFileConetent | "Write TExt {}".format(i) >> ##beam.io.WriteToText("/home/file_{}".format(i),file_name_suffix=".json", 
###num_shards=1 , 
##append_trailing_newlines = True)
        

if __name__ == '__main__':
    run()

运行本地代码如下

python3 another_main.py --input=tracker_one.gz,tracker_two.gz

我没有提及输出路径,因为我不想将数据保存到 gcs,因为我会将其加载到 bigquery

和下面的 运行ning 在数据流中 运行ner

python3 final_beam_v1.py --input gs://bucket/folder/2020/12/23/00/00/fileread.gz --output gs://bucket/beamoutput_four/ --runner DataflowRunner --project PROJECT --staging_location gs://bucket/staging_four --temp_location gs://bucket/temp_four --region us-east1 --setup_file ./setup.py --job_name testing

注意到对于同一个作业名称中的单个输入文件,两个作业将 运行ning 并且不知道为什么会发生这种情况以及相同

的 PFA 屏幕截图

这种读取文件的方法是可以的(只要输入的文件数量不是太大)。但是,如果您可以将要读取的文件集表示为通配符表达式(可以匹配多个文件夹),则性能可能会更好,并且 Dataflow 将并行读取与该模式匹配的所有文件。

要写入 BigQuery,最好使用 built-in BigQuery sink. The default behavior is to create temp files in JSON format and then load those into BigQuery, but you can also use Avro instead, which can be more efficient. You can also combine all of your inputs into one PCollection using Flatten,这样您的管道中只需要一个 BigQuery 接收器。