Does ElasticsearchIO for Apache Beam Java support templating and ValueProvider arguments? Error while invoking templates

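I am trying to create a template for Apache Beam that indexes data into Elasticsearch. The template is created successfully, but when I invoke it the pipeline fails with a "no protocol" error. This looks strange, because the error relates to a URL object.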

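import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StarterPipeline {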
    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);

    public interface IndexToEsOptions extends PipelineOptions {
        @Description("Path of the gzip index file to read from")
        ValueProvider<String> getInputIndexFile();
        void setInputIndexFile(ValueProvider<String> value);

        @Description("Index name to index with")
        ValueProvider<String> getIndexName();
        void setIndexName(ValueProvider<String> value);

        @Description("Index template name")
        ValueProvider<String> getIndexTemplate();
        void setIndexTemplate(ValueProvider<String> value);

        @Description("URI for es")
        @Default.String("https://vpc-myescore01-5mtib6vgjw7sbhgn3kbnwnluim.us-west-1.es.amazonaws.com")
        ValueProvider<String> getEsUri();
        void setEsUri(ValueProvider<String> value);
    }

    public static void main(String[] args) {
        // Parse and validate the template options from the command line.
        IndexToEsOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(IndexToEsOptions.class);

        Pipeline p = Pipeline.create(options);
        // Read the input file and write each line to Elasticsearch.
        p.apply(TextIO.read().from(options.getInputIndexFile()))
                .apply(ElasticsearchIO.write()
                        .withConnectionConfiguration(
                                ElasticsearchIO.ConnectionConfiguration.create(
                                        new String[]{options.getEsUri().toString()},
                                        options.getIndexName().toString(),
                                        options.getIndexTemplate().toString())
                                        .withConnectTimeout(240))
                        .withMaxBatchSizeBytes(15 * 1024 * 1024));

        p.run();
    }
}

The error I get at run time is:

java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.typedApply(IntrinsicMapTaskExecutorFactory.java:194)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
    org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
    org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
    org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:358)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
    org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
    org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
    org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
    org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:86)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.typedApply(IntrinsicMapTaskExecutorFactory.java:183)
    ... 14 more
Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1475)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1271)
Caused by: java.net.MalformedURLException: no protocol: RuntimeValueProvider{propertyName=esUri, default=https://vpc-esprdcore01-5mtib6vgjw7sbhgn3kbnwnluim.us-east-1.es.amazonaws.com}
    java.base/java.net.URL.<init>(URL.java:627)
    java.base/java.net.URL.<init>(URL.java:523)
    java.base/java.net.URL.<init>(URL.java:470)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$ConnectionConfiguration.createClient(ElasticsearchIO.java:417)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1457)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1271)
    org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source)
    org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
    org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
    org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
    org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access$000(IntrinsicMapTaskExecutorFactory.java:86)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.typedApply(IntrinsicMapTaskExecutorFactory.java:183)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
    org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
    org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
    org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
    org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:358)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    java.base/java.lang.Thread.run(Thread.java:834)

In short, no: it looks like ElasticsearchIO.ConnectionConfiguration does not support ValueProvider, at least not in the current version (2.22.0). You can see this from the signature of ConnectionConfiguration.create:

public static ElasticsearchIO.ConnectionConfiguration create(java.lang.String[] addresses,
                                                             java.lang.String index,
                                                             java.lang.String type)

Compare that with a function that does support ValueProvider, ElasticsearchIO.Read.withQuery:

public ElasticsearchIO.Read withQuery(ValueProvider<java.lang.String> query)

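To support ValueProvider, a function must actually accept a ValueProvider object. This is because ValueProvider exists to supply a parameter at run time rather than during pipeline construction, so during pipeline construction the parameter has to be passed around as a ValueProvider object everywhere.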

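In your example, you are calling toString on the ValueProvider for EsUri. Instead of producing a string containing your URL, this gives you the string representation of the ValueProvider itself, which looks like this: "RuntimeValueProvider{propertyName=esUri, default=https://vpc-esprdcore01-5mtib6vgjw7sbhgn3kbnwnluim.us-east-1.es.amazonaws.com}". That is why you get the MalformedURLException: the code tries to parse that string as a URL and fails.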
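
To make the failure mode concrete, here is a minimal sketch that uses only the JDK. FakeProvider is a hypothetical stand-in written for this illustration, not Beam's actual RuntimeValueProvider, and es.example.com is a placeholder host. It only imitates the two behaviours that matter here: get() returns the wrapped value, while toString() returns a descriptive string.

```java
import java.net.MalformedURLException;
import java.net.URL;

final class ToStringPitfallDemo {

    /** Hypothetical stand-in for a runtime value provider; NOT Beam's class. */
    static final class FakeProvider {
        private final String propertyName;
        private final String value;

        FakeProvider(String propertyName, String value) {
            this.propertyName = propertyName;
            this.value = value;
        }

        /** Returns the wrapped value, as a provider's get() would at run time. */
        String get() {
            return value;
        }

        /** Imitates the descriptive string form seen in the stack trace. */
        @Override
        public String toString() {
            return "RuntimeValueProvider{propertyName=" + propertyName
                    + ", default=" + value + "}";
        }
    }

    /** Returns true if java.net.URL accepts the text, false if it rejects it. */
    @SuppressWarnings("deprecation") // the URL(String) constructor is deprecated on newer JDKs
    static boolean isValidUrl(String text) {
        try {
            new URL(text);
            return true;
        } catch (MalformedURLException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        FakeProvider esUri = new FakeProvider("esUri", "https://es.example.com");
        System.out.println(isValidUrl(esUri.get()));      // prints true
        System.out.println(isValidUrl(esUri.toString())); // prints false
    }
}
```

The second call fails for the same reason as in the stack trace: the text before the first colon is not a valid protocol name, so java.net.URL reports "no protocol".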

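The workaround is simple, though: change the EsUri parameter from ValueProvider<String> to String, which makes it a construction-time parameter. The same applies to the index name and index template values, which your code also passes through toString. Note that using a construction-time parameter means you need to rebuild the pipeline every time you want to change that parameter. Unfortunately, there is not much else you can do until ValueProvider support is added.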
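
As a rough sketch of that workaround, assuming Beam 2.22.0 and the option names from the question: the three values handed to ConnectionConfiguration.create become plain String options, while the input file stays a ValueProvider because TextIO.read().from accepts one. I have not run this against a real cluster, so treat it as an outline rather than a tested pipeline.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;

public class StarterPipeline {

    public interface IndexToEsOptions extends PipelineOptions {
        // Still a run-time parameter: TextIO accepts a ValueProvider.
        @Description("Path of the gzip index file to read from")
        ValueProvider<String> getInputIndexFile();
        void setInputIndexFile(ValueProvider<String> value);

        // Construction-time parameters: fixed when the template is created.
        @Description("Index name to index with")
        String getIndexName();
        void setIndexName(String value);

        @Description("Index template name")
        String getIndexTemplate();
        void setIndexTemplate(String value);

        @Description("URI for es")
        @Default.String("https://vpc-myescore01-5mtib6vgjw7sbhgn3kbnwnluim.us-west-1.es.amazonaws.com")
        String getEsUri();
        void setEsUri(String value);
    }

    public static void main(String[] args) {
        IndexToEsOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .withValidation()
                .as(IndexToEsOptions.class);

        Pipeline p = Pipeline.create(options);
        p.apply(TextIO.read().from(options.getInputIndexFile()))
                .apply(ElasticsearchIO.write()
                        .withConnectionConfiguration(
                                // Plain strings now, so no toString() on a provider.
                                ElasticsearchIO.ConnectionConfiguration.create(
                                        new String[]{options.getEsUri()},
                                        options.getIndexName(),
                                        options.getIndexTemplate())
                                        .withConnectTimeout(240))
                        .withMaxBatchSizeBytes(15L * 1024 * 1024));

        p.run();
    }
}
```

With this shape, esUri, indexName and indexTemplate have to be supplied when the template is created, and only inputIndexFile can still be changed when the template is invoked.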