Error in DynamicDestinations: Apache Beam
I am getting an error when executing the following code:
tableRows2.apply(BigQueryIO.writeTableRows()
    .to(new DynamicDestinations<TableRow, TableRow>() {
        private static final long serialVersionUID = 1L;
        @Override
        public TableDestination getTable(TableRow dest) {
            List<TableRow> list = sideInput(bqDataView);
            String table = list.get(0).get("table").toString();
            String tableSpec = "ProjectId:DatasetId." + table;
            String tableDescription = "";
            return new TableDestination(tableSpec, tableDescription);
        }
        public String getSideInputs(PCollectionView<List<TableRow>> bqDataView) {
            String str = bqDataView.toString();
            return str;
        }
        @Override
        public TableSchema getSchema(TableRow destination) {
            List<TableRow> list = sideInput(bqDataView);
            String[] schemas = list.get(0).get("schema").toString().split(",");
            List<TableFieldSchema> fields = new ArrayList<>();
            for (int i = 0; i < schemas.length; i++) {
                fields.add(new TableFieldSchema().setName(schemas[i].split(":")[0]).setType(schemas[i].split(":")[1]));
            }
            TableSchema schema = new TableSchema().setFields(fields);
            return schema;
        }
        @Override
        public TableRow getDestination(ValueInSingleWindow<TableRow> element) {
            return null;
        }
    }.getSideInputs(bqDataView)).withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
The error I get is:
(7dc1af5b557d4d6b): java.lang.IllegalArgumentException: Table reference is not in [project_id]:[dataset_id].[table_id] format: SimplePCollectionView{tag=Tag<org.apache.beam.sdk.values.PCollectionViews$SimplePCollectionView.<init>:403#5f2ef1f005ae0b4>}
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.parseTableSpec(BigQueryHelpers.java:102)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$TableSpecToTableRef.apply(BigQueryHelpers.java:286)
at org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers$TableSpecToTableRef.apply(BigQueryHelpers.java:282)
at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131)
at org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$ConstantTableDestinations.getDestination(DynamicDestinationsHelpers.java:64)
at org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers$ConstantTableDestinations.getDestination(DynamicDestinationsHelpers.java:41)
at org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite.processElement(PrepareWrite.java:58)
Is this the correct way to implement DynamicDestinations in Apache Beam?
Also, does the getSideInputs() method always need to return a String?
What should the getDestination() method contain?
Thanks.
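I think you are passing the side input to DynamicDestinations incorrectly. Specifically, because you call getSideInputs inside the call to to(...), what actually reaches to(...) is the result of getSideInputs, not the DynamicDestinations object. It is essentially the following: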
tableRows2.apply(BigQueryIO.writeTableRows()
.to(bqDataView.toString())
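The effect can be reproduced in plain Java without Beam. The sketch below is only an analogy: `Destinations` and the two `to(...)` overloads are hypothetical stand-ins, not the Beam classes. It shows that invoking a method on an anonymous object inside the argument list hands the method's return value to the outer call, so the `String` overload is selected.

```java
class ChainedCallDemo {
    // Hypothetical stand-in for DynamicDestinations; this is not the Beam class.
    abstract static class Destinations {
        abstract String table();
    }

    // Hypothetical stand-ins for two overloads of to(...): one takes a
    // table spec string, the other takes a destinations object.
    static String to(String tableSpec) {
        return "to(String) received: " + tableSpec;
    }

    static String to(Destinations destinations) {
        return "to(Destinations) received table: " + destinations.table();
    }

    // Mirrors the question: the extra method is invoked on the anonymous
    // object, so its String result is what reaches to(...).
    static String chained(Object view) {
        return to(new Destinations() {
            @Override
            String table() {
                return "project:dataset.table";
            }

            String getSideInputs(Object v) {
                return v.toString();
            }
        }.getSideInputs(view));
    }

    // Mirrors the fix: the object itself is passed to to(...).
    static String direct() {
        return to(new Destinations() {
            @Override
            String table() {
                return "project:dataset.table";
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(chained("SimplePCollectionView{...}"));
        System.out.println(direct());
    }
}
```

In the chained case the table name defined in the anonymous class is never consulted, which matches the stack trace: the view's string form is parsed as a table spec and rejected.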
Looking at the interface of DynamicDestinations and the tests for BigQueryIO, it seems you should be doing something more like the following:
final PCollectionView<List<TableRow>> bqDataView = /* ... */;
tableRows2.apply(BigQueryIO.writeTableRows()
    .to(new DynamicDestinations<TableRow, TableRow>() {
        // ...
        // Note this method needs to be overridden and use
        // the same signature as in DynamicDestinations. Also,
        // it should not be invoked as part of the apply.
        @Override
        public List<PCollectionView<?>> getSideInputs() {
            return ImmutableList.of(bqDataView);
        }
        // ...
    }).withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
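Why the signature matters can be seen in a small plain-Java sketch. The classes below are hypothetical stand-ins rather than Beam code, and the assumption is that the framework only ever calls the no-argument getSideInputs() declared on the base type. A method with a different parameter list is an overload, so the base default is still what the caller sees.

```java
import java.util.Collections;
import java.util.List;

class SideInputOverrideDemo {
    // Hypothetical stand-in for the base class: by default it declares
    // no side inputs.
    static class Base {
        List<Object> getSideInputs() {
            return Collections.emptyList();
        }
    }

    // Different parameter list: this is an overload, not an override, so
    // the no-argument method still returns the empty default.
    static class Overloaded extends Base {
        String getSideInputs(Object view) {
            return view.toString();
        }
    }

    // Same signature plus @Override: the no-argument method now reports
    // the view.
    static class Overridden extends Base {
        private final Object view;

        Overridden(Object view) {
            this.view = view;
        }

        @Override
        List<Object> getSideInputs() {
            return Collections.singletonList(view);
        }
    }

    // Plays the role of the framework, which only knows the base type.
    static int declaredSideInputs(Base destinations) {
        return destinations.getSideInputs().size();
    }

    public static void main(String[] args) {
        System.out.println(declaredSideInputs(new Overloaded()));
        System.out.println(declaredSideInputs(new Overridden("view")));
    }
}
```

Adding @Override to the method in the question would have turned the mismatch into a compile error, since no method with that parameter list exists in the parent class.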
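Also, rather than using DynamicDestinations<TableRow, TableRow>, the second type should be something you can group by to identify the table being written to. In your case it looks like everything goes to the same actual destination, so you could just use a String with a fixed value: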
new DynamicDestinations<TableRow, String>() {
    @Override
    public TableDestination getTable(String unusedDest) {
        // ...
    }
    @Override
    public TableSchema getSchema(String unusedDest) {
        // ...
    }
    @Override
    public String getDestination(ValueInSingleWindow<TableRow> element) {
        return "destination";
    }
}
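As a rough illustration of what "something you can group by" means, here is a plain-Java analogy that does not use Beam at all. The helper `groupByDestination` and the sample rows are hypothetical. Rows are grouped by whatever key the function returns, so a constant key places every row in one group, while a per-row key would split them across several destinations.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class DestinationKeyDemo {
    // Groups rows by the destination key that keyFn assigns to each row.
    // This plays the role of getDestination followed by a group-by-key.
    static Map<String, List<String>> groupByDestination(
            List<String> rows, Function<String, String> keyFn) {
        return rows.stream().collect(Collectors.groupingBy(keyFn));
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("a:1", "b:2", "a:3");

        // Constant key, as in the answer: every row shares one destination.
        System.out.println(groupByDestination(rows, row -> "destination"));

        // Key derived from the row: one group per distinct prefix.
        System.out.println(
                groupByDestination(rows, row -> row.substring(0, row.indexOf(':'))));
    }
}
```

Returning null as the key, as the question's getDestination does, gives nothing to group on, which is one reason a fixed String is the simpler choice when there is only one table.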