价值提供者问题
ValueProvider Issue
我正在尝试获取从云函数传递到数据流模板的 属性 的值。我收到错误是因为传递的值是包装器,并且在编译期间使用 .get()
方法失败。有这个错误
An exception occurred while executing the Java class. null: InvocationTargetException: Not called from a runtime context.
public interface MyOptions extends DataflowPipelineOptions {
...
@Description("schema of csv file")
ValueProvider<String> getHeader();
void setHeader(ValueProvider<String> header);
...
}
public static void main(String[] args) throws IOException {
...
List<String> sideInputColumns = Arrays.asList(options.getHeader().get().split(","));
...
//ultimately use the getHeaders as side inputs
PCollection<String> input = p.apply(Create.of(sideInputColumns));
final PCollectionView<List<String>> finalColumnView = input.apply(View.asList());
}
如何从 ValueProvider 类型中提取值?
a ValueProvider
的值在管道构造期间不可用。因此,您需要组织管道,使其始终具有相同的结构,并序列化 ValueProvider
。在运行时,管道中的各个转换可以检查值以确定如何操作。
根据您的示例,您可能需要执行如下操作。它创建一个元素,然后使用在运行时计算的 DoFn
来扩展 headers:
public static class HeaderDoFn extends DoFn<String, String> {
private final ValueProvider<String> header;
public HeaderDoFn(ValueProvider<String> header) {
this.header = header;
}
@ProcessElement
public void processElement(ProcessContext c) {
// Ignore input element -- there should be exactly one
for (String column : this.header().get().split(",")) {
c.output(column);
}
}
}
public static void main(String[] args) throws IOException {
PCollection<String> input = p
.apply(Create.of("one")) // create a single element
.apply(ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
}
});
// Note that the order of this list is not guaranteed.
final PCollectionView<List<String>> finalColumnView =
input.apply(View.asList());
}
另一种选择是使用 NestedValueProvider 从选项创建 ValueProvider<List<String>>
,并将该 ValueProvider<List<String>>
传递给必要的 DoFn
,而不是使用辅助输入。
我正在尝试获取从云函数传递到数据流模板的 属性 的值。我收到错误是因为传递的值是包装器,并且在编译期间使用 .get()
方法失败。有这个错误
An exception occurred while executing the Java class. null: InvocationTargetException: Not called from a runtime context.
public interface MyOptions extends DataflowPipelineOptions {
...
@Description("schema of csv file")
ValueProvider<String> getHeader();
void setHeader(ValueProvider<String> header);
...
}
public static void main(String[] args) throws IOException {
...
List<String> sideInputColumns = Arrays.asList(options.getHeader().get().split(","));
...
//ultimately use the getHeaders as side inputs
PCollection<String> input = p.apply(Create.of(sideInputColumns));
final PCollectionView<List<String>> finalColumnView = input.apply(View.asList());
}
如何从 ValueProvider 类型中提取值?
a ValueProvider
的值在管道构造期间不可用。因此,您需要组织管道,使其始终具有相同的结构,并序列化 ValueProvider
。在运行时,管道中的各个转换可以检查值以确定如何操作。
根据您的示例,您可能需要执行如下操作。它创建一个元素,然后使用在运行时计算的 DoFn
来扩展 headers:
public static class HeaderDoFn extends DoFn<String, String> {
private final ValueProvider<String> header;
public HeaderDoFn(ValueProvider<String> header) {
this.header = header;
}
@ProcessElement
public void processElement(ProcessContext c) {
// Ignore input element -- there should be exactly one
for (String column : this.header().get().split(",")) {
c.output(column);
}
}
}
public static void main(String[] args) throws IOException {
PCollection<String> input = p
.apply(Create.of("one")) // create a single element
.apply(ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
}
});
// Note that the order of this list is not guaranteed.
final PCollectionView<List<String>> finalColumnView =
input.apply(View.asList());
}
另一种选择是使用 NestedValueProvider 从选项创建 ValueProvider<List<String>>
,并将该 ValueProvider<List<String>>
传递给必要的 DoFn
,而不是使用辅助输入。