Write files to GCS using Dataflow

I have a Dataflow batch job that preprocesses data: it reads the input from Google Cloud Storage (GCS), processes it, and uploads the results back to GCS.

But after the data was processed, this is what I found in GCS:

result-001.csv

result-002.csv

result-003.csv

The data was split up and stored like this. Can't I combine these files into one?

# -*- coding: utf-8 -*-
import apache_beam as beam

def preprocessing(line):
    # Each input line looks like "label,pixel0,pixel1,...,pixel783".
    fields = line.split(",")
    # One-hot encode the label over the 10 digit classes; the digit itself,
    # not a 1, marks the hot position, e.g. label "3" becomes
    # "[0,0,0,3,0,0,0,0,0,0]".
    label_list_str = "["
    for i in range(10):
        label_list_str += fields[0] if fields[0] == str(i) else "0"
        if i != 9:
            label_list_str += ","
    label_list_str += "],"
    # Append the pixel values unchanged, comma-separated.
    label_list_str += ",".join(fields[1:])
    yield label_list_str
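
# For example (hypothetical five-column row; the real Kaggle MNIST rows
# have 784 pixel columns after the label):
#   next(preprocessing("3,10,20,30,40"))
#   -> "[0,0,0,3,0,0,0,0,0,0],10,20,30,40"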


def run(project, bucket, dataset):
    argv = [
        "--project={0}".format(project),
        "--job_name=kaggle-dataflow",
        "--save_main_session",
        "--region=asia-northeast1",
        "--staging_location=gs://{0}/kaggle-bucket-v1/".format(bucket),
        "--temp_location=gs://{0}/kaggle-bucket-v1/".format(bucket),
        "--max_num_workers=8",
        "--worker_region=asia-northeast3",
        "--worker_disk_type=compute.googleapis.com/projects//zones//diskTypes/pd-ssd",
        "--autoscaling_algorithm=THROUGHPUT_BASED",
        "--runner=DataflowRunner",
    ]

    result_output = "gs://kaggle-bucket-v1/result/result.csv"
    filename = "gs://{}/train.csv".format(bucket)

    pipeline = beam.Pipeline(argv=argv)
    ptransform = (pipeline
                  | "Read from GCS" >> beam.io.ReadFromText(filename)
                  | "Kaggle data pre-processing" >> beam.FlatMap(preprocessing)
                  )

    (ptransform
     | "events:out" >> beam.io.WriteToText(result_output)
     )

    pipeline.run()

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Run pipeline on the cloud")
    parser.add_argument("--project", dest="project", help="Unique project ID", required=True)
    parser.add_argument("--bucket", dest="bucket", help="Bucket where your data were ingested", required=True)
    parser.add_argument("--dataset", dest="dataset", help="BigQuery dataset")

    args = vars(parser.parse_args())

    print("Correcting timestamps and writing to BigQuery dataset {}".format(args["dataset"]))

    run(project=args["project"], bucket=args["bucket"], dataset=args["dataset"])
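
For reference, the script is launched along these lines (the script filename here is a placeholder):

python kaggle_dataflow.py --project <PROJECT_ID> --bucket <BUCKET_NAME>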

Thanks for reading :)

The beam.io.WriteToText transform automatically shards the output into multiple files as it writes, for the best performance. If you only need one file, you can explicitly pass the parameter num_shards=1:

num_shards (int) – The number of files (shards) used for output. If not set, the service will decide on the optimal number of shards. Constraining the number of shards is likely to reduce the performance of a pipeline. Setting this value is not recommended unless you require a specific number of output files.
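
If that performance caveat matters at your data volume, a possible alternative (using the google-cloud-storage client, not Beam itself; the helper below is a hypothetical sketch) is to keep the default sharding and merge the shards into one object after the job finishes, since GCS can compose up to 32 objects server-side:

from google.cloud import storage

def merge_shards(bucket_name, prefix, destination):
    # Compose every shard object under `prefix` into a single object.
    # GCS compose accepts at most 32 source objects per call.
    client = storage.Client()
    bucket = client.bucket(bucket_name)
    shards = sorted(bucket.list_blobs(prefix=prefix), key=lambda b: b.name)
    assert 0 < len(shards) <= 32, "compose() takes 1-32 sources per call"
    bucket.blob(destination).compose(shards)

# e.g. merge_shards("kaggle-bucket-v1", "result/result.csv-", "result/result.csv")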

If you do want WriteToText itself to produce the single file, your write step should look like this:

(ptransform
 | "events:out" >> beam.io.WriteToText(result_output, num_shards=1)
 )
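
One follow-up note on naming (this comes from WriteToText's default shard template, not from anything shown above): even with num_shards=1 the single output file is still named with the shard suffix, e.g. result.csv-00000-of-00001. If you need the object to be called exactly result.csv, you can also pass an empty shard_name_template:

(ptransform
 | "events:out" >> beam.io.WriteToText(result_output, num_shards=1, shard_name_template="")
 )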