无法序列化 com.google.api.services.bigquery.Bigquery$Tables
Unable to serialize com.google.api.services.bigquery.Bigquery$Tables
我正在使用 Bigquery.Tables 发出请求:将已初始化的 tableRequest 作为构造参数传入 DoFn,以便在 DoFn 内部获取 BigQuery 表的架构,如下所示:
/**
 * Looks up the column metadata of two BigQuery tables and emits it together with the
 * configured mapping columns.
 *
 * <p>For every input element three lists are output, in this order: the name/description
 * pairs of the destination table's fields, the name/description pairs of the second
 * ({@code recATableName}) table's fields, and the comma-separated {@code mapCols} value split
 * into its parts.
 */
private static class FetchSchema extends DoFn<String,List<String>>{
    // NOTE(review): this field is the reason the pipeline fails with
    // "unable to serialize ... Bigquery$Tables": the client is captured as DoFn state but is
    // not serializable. It should be created on the worker (for example in a @StartBundle
    // method) rather than passed in through the constructor.
    Bigquery.Tables tableRequest;
    ValueProvider<String> DestTableName;
    ValueProvider<String> mapCols;
    ValueProvider<String> recATableName;

    /**
     * @param tableReq client used to issue the {@code tables.get} requests
     * @param table destination table reference, {@code project:dataset.table}
     * @param mCols comma-separated list of mapping columns
     * @param recATab second table reference, {@code project:dataset.table}
     */
    public FetchSchema(Bigquery.Tables tableReq,ValueProvider<String> table,ValueProvider<String> mCols,ValueProvider<String> recATab){
        this.tableRequest = tableReq;
        this.DestTableName = table;
        this.mapCols = mCols;
        this.recATableName = recATab;
    }

    /**
     * Fetches the schema of the given table and flattens it into a list.
     *
     * @param tableString table reference in the form {@code project:dataset.table}
     * @return for each field of the table, its name followed by its description
     * @throws IOException if the request to BigQuery fails
     * @throws IllegalArgumentException if {@code tableString} lacks the ':' or '.' separator
     */
    private List<String> getTableParams(String tableString) throws IOException{
        int colon = tableString.indexOf(':');
        int dot = tableString.indexOf('.', colon + 1);
        if (colon < 0 || dot < 0) {
            throw new IllegalArgumentException(
                "Expected a table reference of the form project:dataset.table but got: " + tableString);
        }
        String projectId = tableString.substring(0, colon);
        String datasetId = tableString.substring(colon + 1, dot);
        // Skip the '.' itself; without the + 1 the table id would be ".table".
        String tableId = tableString.substring(dot + 1);

        Table table = tableRequest.get(projectId, datasetId, tableId).execute();

        List<String> tableParamsList = new ArrayList<>();
        for (TableFieldSchema field : table.getSchema().getFields()) {
            tableParamsList.add(field.getName());
            // NOTE(review): presumably null when a column has no description - confirm the
            // output coder tolerates null elements.
            tableParamsList.add(field.getDescription());
        }
        return tableParamsList;
    }

    /**
     * Outputs the destination table's columns, the second table's columns and the mapping
     * columns, as three separate lists.
     */
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException{
        List<String> mapColsList = Arrays.asList(mapCols.get().split(","));
        c.output(getTableParams(DestTableName.get()));
        c.output(getTableParams(recATableName.get()));
        c.output(mapColsList);
    }
}
但是我得到这个错误:
An exception occured while executing the Java class. null: InvocationTargetException: unable to serialize org.apache.beam.examples.flatFileTest$FetchSchema@6510b00e: com.google.api.services.bigquery.Bigquery$Tables
请问该如何解决?
在本地计算机上创建的 BigQuery 客户端无法在 Dataflow 用来执行管道的各个工作器(worker)上使用。相反,您应该在 DoFn 的 @StartBundle 方法中创建 Bigquery.Tables 客户端。该方法可以接收一个 StartBundleContext 参数,通过它可以调用 getPipelineOptions()。
注意:理想情况下,应当在 @Setup 方法中创建客户端,这样它可以在多个 bundle 之间复用,但在该方法中似乎无法获取管道选项。
我正在使用 Bigquery.Tables 发出请求:将已初始化的 tableRequest 作为构造参数传入 DoFn,以便在 DoFn 内部获取 BigQuery 表的架构,如下所示:
/**
 * Looks up the column metadata of two BigQuery tables and emits it together with the
 * configured mapping columns.
 *
 * <p>For every input element three lists are output, in this order: the name/description
 * pairs of the destination table's fields, the name/description pairs of the second
 * ({@code recATableName}) table's fields, and the comma-separated {@code mapCols} value split
 * into its parts.
 */
private static class FetchSchema extends DoFn<String,List<String>>{
    // NOTE(review): this field is the reason the pipeline fails with
    // "unable to serialize ... Bigquery$Tables": the client is captured as DoFn state but is
    // not serializable. It should be created on the worker (for example in a @StartBundle
    // method) rather than passed in through the constructor.
    Bigquery.Tables tableRequest;
    ValueProvider<String> DestTableName;
    ValueProvider<String> mapCols;
    ValueProvider<String> recATableName;

    /**
     * @param tableReq client used to issue the {@code tables.get} requests
     * @param table destination table reference, {@code project:dataset.table}
     * @param mCols comma-separated list of mapping columns
     * @param recATab second table reference, {@code project:dataset.table}
     */
    public FetchSchema(Bigquery.Tables tableReq,ValueProvider<String> table,ValueProvider<String> mCols,ValueProvider<String> recATab){
        this.tableRequest = tableReq;
        this.DestTableName = table;
        this.mapCols = mCols;
        this.recATableName = recATab;
    }

    /**
     * Fetches the schema of the given table and flattens it into a list.
     *
     * @param tableString table reference in the form {@code project:dataset.table}
     * @return for each field of the table, its name followed by its description
     * @throws IOException if the request to BigQuery fails
     * @throws IllegalArgumentException if {@code tableString} lacks the ':' or '.' separator
     */
    private List<String> getTableParams(String tableString) throws IOException{
        int colon = tableString.indexOf(':');
        int dot = tableString.indexOf('.', colon + 1);
        if (colon < 0 || dot < 0) {
            throw new IllegalArgumentException(
                "Expected a table reference of the form project:dataset.table but got: " + tableString);
        }
        String projectId = tableString.substring(0, colon);
        String datasetId = tableString.substring(colon + 1, dot);
        // Skip the '.' itself; without the + 1 the table id would be ".table".
        String tableId = tableString.substring(dot + 1);

        Table table = tableRequest.get(projectId, datasetId, tableId).execute();

        List<String> tableParamsList = new ArrayList<>();
        for (TableFieldSchema field : table.getSchema().getFields()) {
            tableParamsList.add(field.getName());
            // NOTE(review): presumably null when a column has no description - confirm the
            // output coder tolerates null elements.
            tableParamsList.add(field.getDescription());
        }
        return tableParamsList;
    }

    /**
     * Outputs the destination table's columns, the second table's columns and the mapping
     * columns, as three separate lists.
     */
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException{
        List<String> mapColsList = Arrays.asList(mapCols.get().split(","));
        c.output(getTableParams(DestTableName.get()));
        c.output(getTableParams(recATableName.get()));
        c.output(mapColsList);
    }
}
但是我得到这个错误:
An exception occured while executing the Java class. null: InvocationTargetException: unable to serialize org.apache.beam.examples.flatFileTest$FetchSchema@6510b00e: com.google.api.services.bigquery.Bigquery$Tables
请问该如何解决?
在本地计算机上创建的 BigQuery 客户端无法在 Dataflow 用来执行管道的各个工作器(worker)上使用。相反,您应该在 DoFn 的 @StartBundle 方法中创建 Bigquery.Tables 客户端。该方法可以接收一个 StartBundleContext 参数,通过它可以调用 getPipelineOptions()。
注意:理想情况下,应当在 @Setup 方法中创建客户端,这样它可以在多个 bundle 之间复用,但在该方法中似乎无法获取管道选项。