价值提供者问题

ValueProvider Issue

我正在尝试获取从云函数传递到数据流模板的 属性 的值。我收到错误是因为传递的值是包装器,并且在编译期间使用 .get() 方法失败。有这个错误 An exception occurred while executing the Java class. null: InvocationTargetException: Not called from a runtime context.

public interface MyOptions extends DataflowPipelineOptions {
...
@Description("schema of csv file")
ValueProvider<String> getHeader();
void setHeader(ValueProvider<String> header);
...
}

public static void main(String[] args) throws IOException {
...
    List<String> sideInputColumns = Arrays.asList(options.getHeader().get().split(","));
...
    //ultimately use the getHeaders as side inputs
    PCollection<String> input = p.apply(Create.of(sideInputColumns));
    final PCollectionView<List<String>> finalColumnView = input.apply(View.asList());
}

如何从 ValueProvider 类型中提取值?

a ValueProvider 的值在管道构造期间不可用。因此,您需要组织管道,使其始终具有相同的结构,并序列化 ValueProvider。在运行时,管道中的各个转换可以检查值以确定如何操作。

根据您的示例,您可能需要执行如下操作。它创建一个元素,然后使用在运行时计算的 DoFn 来扩展 headers:

public static class HeaderDoFn extends DoFn<String, String> {
  private final ValueProvider<String> header;
  public HeaderDoFn(ValueProvider<String> header) {
    this.header = header;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Ignore input element -- there should be exactly one
    for (String column : this.header().get().split(",")) {
      c.output(column);
    }
  }
}

public static void main(String[] args) throws IOException {
  PCollection<String> input = p
    .apply(Create.of("one")) // create a single element
    .apply(ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
      }
    });

  // Note that the order of this list is not guaranteed. 
  final PCollectionView<List<String>> finalColumnView = 
    input.apply(View.asList());        
}

另一种选择是使用 NestedValueProvider 从选项创建 ValueProvider<List<String>>,并将该 ValueProvider<List<String>> 传递给必要的 DoFn,而不是使用辅助输入。