我可以在 Apache Beam 2.0+ 中使用 setWorkerCacheMb 吗?

Can I use setWorkerCacheMb in Apache Beam 2.0+?

我的 Dataflow 作业(使用 Java SDK 2.1.0)非常慢,仅处理 50GB 就需要一天多的时间。我只是从 BigQuery (50GB) 中提取了整个 table,并从 GCS (100+MB) 中加入了一个 csv 文件。

https://cloud.google.com/dataflow/model/group-by-key
我使用 sideInputs 执行连接(上面文档中的后一种方式),虽然我认为使用 CoGroupByKey 更有效,但我不确定这是我的工作超慢的唯一原因。

我用谷歌搜索了一下,默认情况下,sideinputs 的缓存设置为 100MB,我假设我的缓存略微超过了这个限制,然后每个工作人员不断地重新读取 sideinputs。为了改进它,我想我可以使用 setWorkerCacheMb 方法来增加缓存大小。

但是看起来 DataflowPipelineOptions 没有这个方法并且 DataflowWorkerHarnessOptions 被隐藏了。在 -Dexec.args 中传递 --workerCacheMb=200 会导致

An exception occured while executing the Java class.
null: InvocationTargetException:
Class interface com.xxx.yyy.zzz$MyOptions missing a property
named 'workerCacheMb'. -> [Help 1]

如何使用此选项?谢谢。

我的管道:

MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

Pipeline p = Pipeline.create(options);

PCollection<TableRow> rows = p.apply("Read from BigQuery",
        BigQueryIO.read().from("project:MYDATA.events"));

// Read account file
PCollection<String> accounts = p.apply("Read from account file",
        TextIO.read().from("gs://my-bucket/accounts.csv")
                .withCompressionType(CompressionType.GZIP));
PCollection<TableRow> accountRows = accounts.apply("Convert to TableRow",
        ParDo.of(new DoFn<String, TableRow>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                String line = c.element();
                CSVParser csvParser = new CSVParser();
                String[] fields = csvParser.parseLine(line);

                TableRow row = new TableRow();
                row = row.set("account_id", fields[0]).set("account_uid", fields[1]);
                c.output(row);
            }
        }));
PCollection<KV<String, TableRow>> kvAccounts = accountRows.apply("Populate account_uid:accounts KV",
        ParDo.of(new DoFn<TableRow, KV<String, TableRow>>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = c.element();
                String uid = (String) row.get("account_uid");
                c.output(KV.of(uid, row));
            }
        }));
final PCollectionView<Map<String, TableRow>> uidAccountView = kvAccounts.apply(View.<String, TableRow>asMap());

// Add account_id from account_uid to event data
PCollection<TableRow> rowsWithAccountID = rows.apply("Join account_id",
        ParDo.of(new DoFn<TableRow, TableRow>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = c.element();

                if (row.containsKey("account_uid") && row.get("account_uid") != null) {
                    String uid = (String) row.get("account_uid");
                    TableRow accRow = (TableRow) c.sideInput(uidAccountView).get(uid);
                    if (accRow == null) {
                        LOG.warn("accRow null, {}", row.toPrettyString());
                    } else {
                        row = row.set("account_id", accRow.get("account_id"));
                    }
                }
                c.output(row);
            }
        }).withSideInputs(uidAccountView));

// Insert into BigQuery
WriteResult result = rowsWithAccountID.apply(BigQueryIO.writeTableRows()
        .to(new TableRefPartition(StaticValueProvider.of("MYDATA"), StaticValueProvider.of("dev"),
                StaticValueProvider.of("deadletter_bucket")))
        .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
            private static final long serialVersionUID = 1L;

            @Override
            public TableRow apply(TableRow row) {
                return row;
            }
        }).withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));

p.run();

历史上我的系统有两个用户标识符,新的 (account_id) 和旧的 (account_uid)。现在我需要向存储在 BigQuery 中的事件数据追溯添加新的 account_id,因为旧数据只有旧的 account_uid。帐户 table(在 account_uid 和 account_id 之间有关系)已经转换为 csv 并存储在 GCS 中。

最后一个函数TableRefPartition只是根据每个事件时间戳将数据存储到BQ对应的分区中。工作仍然是 运行 (2017-10-30_22_45_59-18169851018279768913) 和瓶颈看起来加入 account_id 部分。 那部分吞吐量 (xxx elements/s) 根据图表上下波动。根据图表,sideInputs 的估计大小为 106MB。

如果切换到 CoGroupByKey 可以显着提高性能,我会这样做。我只是懒惰,认为使用 sideInputs 更容易处理没有帐户信息的事件数据。

您可以采取一些措施来提高代码的性能:

  • 您的辅助输入是 Map<String, TableRow>,但您只使用了 TableRow - accRow.get("account_id") 中的一个字段。将它设为 Map<String, String> 怎么样,让值成为 account_id 本身?这可能比笨重的 TableRow 对象更有效。
  • 您可以将辅助输入的值提取到 DoFn 中的延迟初始化成员变量中,以避免重复调用 .sideInput()

也就是说,这种表现是出乎意料的,我们正在调查是否还有其他问题。

尝试以下方法之一:

1) 使用一些代码设置选项:

options.as(DataflowWorkerHarnessOptions.class).setWorkerCacheMb(500);

2) 让您的应用程序在 PipelineOptionsFactory

中注册 DataflowWorkerHarnessOptions

3) 有自己的选择 class 扩展 DataflowWorkerHarnessOptions