我可以在 Apache Beam 2.0+ 中使用 setWorkerCacheMb 吗?
Can I use setWorkerCacheMb in Apache Beam 2.0+?
我的 Dataflow 作业(使用 Java SDK 2.1.0)非常慢,仅处理 50GB 就需要一天多的时间。我只是从 BigQuery (50GB) 中提取了整个 table,并从 GCS (100+MB) 中加入了一个 csv 文件。
https://cloud.google.com/dataflow/model/group-by-key
我使用 sideInputs 执行连接(上面文档中的后一种方式),虽然我认为使用 CoGroupByKey 更有效,但我不确定这是我的工作超慢的唯一原因。
我用谷歌搜索了一下,默认情况下,sideinputs 的缓存设置为 100MB,我假设我的缓存略微超过了这个限制,然后每个工作人员不断地重新读取 sideinputs。为了改进它,我想我可以使用 setWorkerCacheMb
方法来增加缓存大小。
但是看起来 DataflowPipelineOptions
没有这个方法并且 DataflowWorkerHarnessOptions
被隐藏了。在 -Dexec.args
中传递 --workerCacheMb=200
会导致
An exception occured while executing the Java class.
null: InvocationTargetException:
Class interface com.xxx.yyy.zzz$MyOptions missing a property
named 'workerCacheMb'. -> [Help 1]
如何使用此选项?谢谢。
我的管道:
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<TableRow> rows = p.apply("Read from BigQuery",
BigQueryIO.read().from("project:MYDATA.events"));
// Read account file
PCollection<String> accounts = p.apply("Read from account file",
TextIO.read().from("gs://my-bucket/accounts.csv")
.withCompressionType(CompressionType.GZIP));
PCollection<TableRow> accountRows = accounts.apply("Convert to TableRow",
ParDo.of(new DoFn<String, TableRow>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String line = c.element();
CSVParser csvParser = new CSVParser();
String[] fields = csvParser.parseLine(line);
TableRow row = new TableRow();
row = row.set("account_id", fields[0]).set("account_uid", fields[1]);
c.output(row);
}
}));
PCollection<KV<String, TableRow>> kvAccounts = accountRows.apply("Populate account_uid:accounts KV",
ParDo.of(new DoFn<TableRow, KV<String, TableRow>>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TableRow row = c.element();
String uid = (String) row.get("account_uid");
c.output(KV.of(uid, row));
}
}));
final PCollectionView<Map<String, TableRow>> uidAccountView = kvAccounts.apply(View.<String, TableRow>asMap());
// Add account_id from account_uid to event data
PCollection<TableRow> rowsWithAccountID = rows.apply("Join account_id",
ParDo.of(new DoFn<TableRow, TableRow>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TableRow row = c.element();
if (row.containsKey("account_uid") && row.get("account_uid") != null) {
String uid = (String) row.get("account_uid");
TableRow accRow = (TableRow) c.sideInput(uidAccountView).get(uid);
if (accRow == null) {
LOG.warn("accRow null, {}", row.toPrettyString());
} else {
row = row.set("account_id", accRow.get("account_id"));
}
}
c.output(row);
}
}).withSideInputs(uidAccountView));
// Insert into BigQuery
WriteResult result = rowsWithAccountID.apply(BigQueryIO.writeTableRows()
.to(new TableRefPartition(StaticValueProvider.of("MYDATA"), StaticValueProvider.of("dev"),
StaticValueProvider.of("deadletter_bucket")))
.withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
private static final long serialVersionUID = 1L;
@Override
public TableRow apply(TableRow row) {
return row;
}
}).withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
p.run();
历史上我的系统有两个用户标识符,新的 (account_id) 和旧的 (account_uid)。现在我需要向存储在 BigQuery 中的事件数据追溯添加新的 account_id,因为旧数据只有旧的 account_uid。帐户 table(在 account_uid 和 account_id 之间有关系)已经转换为 csv 并存储在 GCS 中。
最后一个函数TableRefPartition
只是根据每个事件时间戳将数据存储到BQ对应的分区中。工作仍然是 运行 (2017-10-30_22_45_59-18169851018279768913) 和瓶颈看起来加入 account_id 部分。
那部分吞吐量 (xxx elements/s) 根据图表上下波动。根据图表,sideInputs 的估计大小为 106MB。
如果切换到 CoGroupByKey 可以显着提高性能,我会这样做。我只是懒惰,认为使用 sideInputs 更容易处理没有帐户信息的事件数据。
您可以采取一些措施来提高代码的性能:
- 您的辅助输入是
Map<String, TableRow>
,但您只使用了 TableRow
- accRow.get("account_id")
中的一个字段。将它设为 Map<String, String>
怎么样,让值成为 account_id
本身?这可能比笨重的 TableRow
对象更有效。
- 您可以将辅助输入的值提取到
DoFn
中的延迟初始化成员变量中,以避免重复调用 .sideInput()
。
也就是说,这种表现是出乎意料的,我们正在调查是否还有其他问题。
尝试以下方法之一:
1) 使用一些代码设置选项:
options.as(DataflowWorkerHarnessOptions.class).setWorkerCacheMb(500);
2) 让您的应用程序在 PipelineOptionsFactory
中注册 DataflowWorkerHarnessOptions
3) 有自己的选择 class 扩展 DataflowWorkerHarnessOptions
我的 Dataflow 作业(使用 Java SDK 2.1.0)非常慢,仅处理 50GB 就需要一天多的时间。我只是从 BigQuery (50GB) 中提取了整个 table,并从 GCS (100+MB) 中加入了一个 csv 文件。
https://cloud.google.com/dataflow/model/group-by-key
我使用 sideInputs 执行连接(上面文档中的后一种方式),虽然我认为使用 CoGroupByKey 更有效,但我不确定这是我的工作超慢的唯一原因。
我用谷歌搜索了一下,默认情况下,sideinputs 的缓存设置为 100MB,我假设我的缓存略微超过了这个限制,然后每个工作人员不断地重新读取 sideinputs。为了改进它,我想我可以使用 setWorkerCacheMb
方法来增加缓存大小。
但是看起来 DataflowPipelineOptions
没有这个方法并且 DataflowWorkerHarnessOptions
被隐藏了。在 -Dexec.args
中传递 --workerCacheMb=200
会导致
An exception occured while executing the Java class.
null: InvocationTargetException:
Class interface com.xxx.yyy.zzz$MyOptions missing a property
named 'workerCacheMb'. -> [Help 1]
如何使用此选项?谢谢。
我的管道:
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<TableRow> rows = p.apply("Read from BigQuery",
BigQueryIO.read().from("project:MYDATA.events"));
// Read account file
PCollection<String> accounts = p.apply("Read from account file",
TextIO.read().from("gs://my-bucket/accounts.csv")
.withCompressionType(CompressionType.GZIP));
PCollection<TableRow> accountRows = accounts.apply("Convert to TableRow",
ParDo.of(new DoFn<String, TableRow>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String line = c.element();
CSVParser csvParser = new CSVParser();
String[] fields = csvParser.parseLine(line);
TableRow row = new TableRow();
row = row.set("account_id", fields[0]).set("account_uid", fields[1]);
c.output(row);
}
}));
PCollection<KV<String, TableRow>> kvAccounts = accountRows.apply("Populate account_uid:accounts KV",
ParDo.of(new DoFn<TableRow, KV<String, TableRow>>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TableRow row = c.element();
String uid = (String) row.get("account_uid");
c.output(KV.of(uid, row));
}
}));
final PCollectionView<Map<String, TableRow>> uidAccountView = kvAccounts.apply(View.<String, TableRow>asMap());
// Add account_id from account_uid to event data
PCollection<TableRow> rowsWithAccountID = rows.apply("Join account_id",
ParDo.of(new DoFn<TableRow, TableRow>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TableRow row = c.element();
if (row.containsKey("account_uid") && row.get("account_uid") != null) {
String uid = (String) row.get("account_uid");
TableRow accRow = (TableRow) c.sideInput(uidAccountView).get(uid);
if (accRow == null) {
LOG.warn("accRow null, {}", row.toPrettyString());
} else {
row = row.set("account_id", accRow.get("account_id"));
}
}
c.output(row);
}
}).withSideInputs(uidAccountView));
// Insert into BigQuery
WriteResult result = rowsWithAccountID.apply(BigQueryIO.writeTableRows()
.to(new TableRefPartition(StaticValueProvider.of("MYDATA"), StaticValueProvider.of("dev"),
StaticValueProvider.of("deadletter_bucket")))
.withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
private static final long serialVersionUID = 1L;
@Override
public TableRow apply(TableRow row) {
return row;
}
}).withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND));
p.run();
历史上我的系统有两个用户标识符,新的 (account_id) 和旧的 (account_uid)。现在我需要向存储在 BigQuery 中的事件数据追溯添加新的 account_id,因为旧数据只有旧的 account_uid。帐户 table(在 account_uid 和 account_id 之间有关系)已经转换为 csv 并存储在 GCS 中。
最后一个函数TableRefPartition
只是根据每个事件时间戳将数据存储到BQ对应的分区中。工作仍然是 运行 (2017-10-30_22_45_59-18169851018279768913) 和瓶颈看起来加入 account_id 部分。
那部分吞吐量 (xxx elements/s) 根据图表上下波动。根据图表,sideInputs 的估计大小为 106MB。
如果切换到 CoGroupByKey 可以显着提高性能,我会这样做。我只是懒惰,认为使用 sideInputs 更容易处理没有帐户信息的事件数据。
您可以采取一些措施来提高代码的性能:
- 您的辅助输入是
Map<String, TableRow>
,但您只使用了TableRow
-accRow.get("account_id")
中的一个字段。将它设为Map<String, String>
怎么样,让值成为account_id
本身?这可能比笨重的TableRow
对象更有效。 - 您可以将辅助输入的值提取到
DoFn
中的延迟初始化成员变量中,以避免重复调用.sideInput()
。
也就是说,这种表现是出乎意料的,我们正在调查是否还有其他问题。
尝试以下方法之一:
1) 使用一些代码设置选项:
options.as(DataflowWorkerHarnessOptions.class).setWorkerCacheMb(500);
2) 让您的应用程序在 PipelineOptionsFactory
DataflowWorkerHarnessOptions
3) 有自己的选择 class 扩展 DataflowWorkerHarnessOptions