Google Dataflow 在 BigQuery 中写入多行
Google Dataflow write multiple line in BigQuery
我有一个简单的流程,目的是在一个 BigQuery 中写两行 Table。
我使用 DynamicDestinations 因为在那之后我将写多个 Table,在那个例子中它是相同的 table...
问题是我的 BigQuery table 最后只有 1 行。
它堆栈跟踪我在第二个 insert
上看到以下错误
”
状态: {
代码:6
消息:"Already Exists: Job sampleprojet3:b9912b9b05794aec8f4292b2ae493612_eeb0082ade6f4a58a14753d1cc92ddbc_00001-0"
}
“
这是什么意思?
与此限制有关吗?
https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/550
我怎样才能完成这项工作?
我用 BeamSDK 2.0.0,我试过 2.1.0(结果相同)
我的启动方式:
mvn compile exec:java -Dexec.mainClass=fr.gireve.dataflow.LogsFlowBug -Dexec.args="--runner=DataflowRunner --inputDir=gs://sampleprojet3.appspot.com / --project=sampleprojet3 --stagingLocation=gs://dataflow-sampleprojet3/tmp" -Pdataflow-runner
Pipeline p = Pipeline.create(options);
final List<String> tableNameTableValue = Arrays.asList("table1:value1", "table1:value2", "table2:value1", "table2:value2");
p.apply(Create.of(tableNameTableValue)).setCoder(StringUtf8Coder.of())
.apply(BigQueryIO.<String>write()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.to(new DynamicDestinations<String, KV<String, String>>() {
@Override
public KV<String, String> getDestination(ValueInSingleWindow<String> element) {
final String[] split = element.getValue().split(":");
return KV.of(split[0], split[1]) ;
}
@Override
public Coder<KV<String, String>> getDestinationCoder() {
return KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
}
@Override
public TableDestination getTable(KV<String, String> row) {
String tableName = row.getKey();
String tableSpec = "sampleprojet3:testLoadJSON." + tableName;
return new TableDestination(tableSpec, "Table " + tableName);
}
@Override
public TableSchema getSchema(KV<String, String> row) {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("myColumn").setType("STRING"));
TableSchema ts = new TableSchema();
ts.setFields(fields);
return ts;
}
})
.withFormatFunction(new SerializableFunction<String, TableRow>() {
public TableRow apply(String row) {
TableRow tr = new TableRow();
tr.set("myColumn", row);
return tr;
}
}));
p.run().waitUntilFinish();
谢谢
DynamicDestinations
将每个元素与 destination 相关联 - 即元素应该去的地方。元素根据其目的地路由到 BigQuery tables:1 destination = 1 BigQuery table with a schema: destination should include just enough information to produce a TableDestination
和一个架构。具有相同目的地的元素转到相同的 table,具有不同目的地的元素转到不同的 table。
您的代码片段使用 DynamicDestinations
,目标类型包含 元素和 table,这是不必要的,当然也违反了上面的约束:具有不同目的地的元素最终会到达相同的 table:例如KV("table1", "value1")
和 KV("table1", "value2")
是不同的目的地,但您的 getTable
将它们映射到相同的 table table1
.
您需要从目标类型中删除该元素。这也将导致更简单的代码。作为旁注,我认为您不需要覆盖 getDestinationCoder()
- 它可以自动推断出来。
试试这个:
.to(new DynamicDestinations<String, String>() {
@Override
public String getDestination(ValueInSingleWindow<String> element) {
return element.getValue().split(":")[0];
}
@Override
public TableDestination getTable(String tableName) {
return new TableDestination(
"sampleprojet3:testLoadJSON." + tableName, "Table " + tableName);
}
@Override
public TableSchema getSchema(String tableName) {
List<TableFieldSchema> fields = Arrays.asList(
new TableFieldSchema().setName("myColumn").setType("STRING"));
return new TableSchema().setFields(fields);
}
})
我有一个简单的流程,目的是在一个 BigQuery 中写两行 Table。 我使用 DynamicDestinations 因为在那之后我将写多个 Table,在那个例子中它是相同的 table... 问题是我的 BigQuery table 最后只有 1 行。 它堆栈跟踪我在第二个 insert
上看到以下错误”
状态: {
代码:6
消息:"Already Exists: Job sampleprojet3:b9912b9b05794aec8f4292b2ae493612_eeb0082ade6f4a58a14753d1cc92ddbc_00001-0"
}
“
这是什么意思? 与此限制有关吗? https://github.com/GoogleCloudPlatform/DataflowJavaSDK/issues/550 我怎样才能完成这项工作?
我用 BeamSDK 2.0.0,我试过 2.1.0(结果相同)
我的启动方式:
mvn compile exec:java -Dexec.mainClass=fr.gireve.dataflow.LogsFlowBug -Dexec.args="--runner=DataflowRunner --inputDir=gs://sampleprojet3.appspot.com / --project=sampleprojet3 --stagingLocation=gs://dataflow-sampleprojet3/tmp" -Pdataflow-runner
Pipeline p = Pipeline.create(options);
final List<String> tableNameTableValue = Arrays.asList("table1:value1", "table1:value2", "table2:value1", "table2:value2");
p.apply(Create.of(tableNameTableValue)).setCoder(StringUtf8Coder.of())
.apply(BigQueryIO.<String>write()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.to(new DynamicDestinations<String, KV<String, String>>() {
@Override
public KV<String, String> getDestination(ValueInSingleWindow<String> element) {
final String[] split = element.getValue().split(":");
return KV.of(split[0], split[1]) ;
}
@Override
public Coder<KV<String, String>> getDestinationCoder() {
return KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
}
@Override
public TableDestination getTable(KV<String, String> row) {
String tableName = row.getKey();
String tableSpec = "sampleprojet3:testLoadJSON." + tableName;
return new TableDestination(tableSpec, "Table " + tableName);
}
@Override
public TableSchema getSchema(KV<String, String> row) {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("myColumn").setType("STRING"));
TableSchema ts = new TableSchema();
ts.setFields(fields);
return ts;
}
})
.withFormatFunction(new SerializableFunction<String, TableRow>() {
public TableRow apply(String row) {
TableRow tr = new TableRow();
tr.set("myColumn", row);
return tr;
}
}));
p.run().waitUntilFinish();
谢谢
DynamicDestinations
将每个元素与 destination 相关联 - 即元素应该去的地方。元素根据其目的地路由到 BigQuery tables:1 destination = 1 BigQuery table with a schema: destination should include just enough information to produce a TableDestination
和一个架构。具有相同目的地的元素转到相同的 table,具有不同目的地的元素转到不同的 table。
您的代码片段使用 DynamicDestinations
,目标类型包含 元素和 table,这是不必要的,当然也违反了上面的约束:具有不同目的地的元素最终会到达相同的 table:例如KV("table1", "value1")
和 KV("table1", "value2")
是不同的目的地,但您的 getTable
将它们映射到相同的 table table1
.
您需要从目标类型中删除该元素。这也将导致更简单的代码。作为旁注,我认为您不需要覆盖 getDestinationCoder()
- 它可以自动推断出来。
试试这个:
.to(new DynamicDestinations<String, String>() {
@Override
public String getDestination(ValueInSingleWindow<String> element) {
return element.getValue().split(":")[0];
}
@Override
public TableDestination getTable(String tableName) {
return new TableDestination(
"sampleprojet3:testLoadJSON." + tableName, "Table " + tableName);
}
@Override
public TableSchema getSchema(String tableName) {
List<TableFieldSchema> fields = Arrays.asList(
new TableFieldSchema().setName("myColumn").setType("STRING"));
return new TableSchema().setFields(fields);
}
})