用于 apache-beam java 的 ElasticsearchIO 是否支持模板和 ValueProvider 参数?调用模板时出错
Does ElasticsearchIO for apache-beam java supports Templating and ValueProvider argument? Error While invoking templates
我正在尝试为 Apache Beam 创建一个模板以将数据索引到 elasticsearch。正在创建模板,但在调用模板时管道因无协议错误而失败。它看起来很奇怪,因为错误与 URL 对象有关。
public class StarterPipeline {
private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
public interface IndexToEsOptions extends PipelineOptions {
@Description("Path of the gzip index file to read from")
ValueProvider<String> getInputIndexFile();
void setInputIndexFile(ValueProvider<String> value);
@Description("Index name to index with")
ValueProvider<String> getIndexName();
void setIndexName(ValueProvider<String> value);
@Description("Index template name")
ValueProvider<String> getIndexTemplate();
void setIndexTemplate(ValueProvider<String> value);
@Description("URI for es")
@Default.String("https://vpc-myescore01-5mtib6vgjw7sbhgn3kbnwnluim.us-west-1.es.amazonaws.com")
ValueProvider<String> getEsUri();
void setEsUri(ValueProvider<String> value);
}
public static void main(String[] args) {
IndexToEsOptions options = PipelineOptionsFactory
.fromArgs(args).
withValidation().as(IndexToEsOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from(options.getInputIndexFile()))
.apply(ElasticsearchIO.write().withConnectionConfiguration(
ElasticsearchIO.ConnectionConfiguration.create(
new String[]{options.getEsUri().toString()},
options.getIndexName().toString(),
options.getIndexTemplate().toString())
.withConnectTimeout(240)
)
.withMaxBatchSizeBytes(15 * 1024 * 1024)
);
p.run();
}
我在 运行 时得到的错误是
java.lang.IllegalArgumentException: Cannot get Elasticsearch version
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.typedApply(IntrinsicMapTaskExecutorFactory.java:194)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:358)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834) Caused by:
org.apache.beam.sdk.util.UserCodeException:
java.lang.IllegalArgumentException: Cannot get Elasticsearch version
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown
Source)
org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access[=12=]0(IntrinsicMapTaskExecutorFactory.java:86)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.typedApply(IntrinsicMapTaskExecutorFactory.java:183)
... 14 more Caused by: java.lang.IllegalArgumentException: Cannot get
Elasticsearch version
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1475)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1271)
Caused by: java.net.MalformedURLException: no protocol:
RuntimeValueProvider{propertyName=esUri,
default=https://vpc-esprdcore01-5mtib6vgjw7sbhgn3kbnwnluim.us-east-1.es.amazonaws.com}
java.base/java.net.URL.(URL.java:627)
java.base/java.net.URL.(URL.java:523)
java.base/java.net.URL.(URL.java:470)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$ConnectionConfiguration.createClient(ElasticsearchIO.java:417)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1457)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1271)
org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown
Source)
org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80)
org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62)
org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95)
org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access[=12=]0(IntrinsicMapTaskExecutorFactory.java:86)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.typedApply(IntrinsicMapTaskExecutorFactory.java:183)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.typedApply(IntrinsicMapTaskExecutorFactory.java:165)
org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63)
org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50)
org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:358)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.base/java.lang.Thread.run(Thread.java:834)
简单地说,不,看起来 ElasticsearchIO.ConnectionConfiguration
不支持 ValueProvider,至少在当前版本 (2.22.0) 中不支持。您可以通过查看 ConnectionConfiguration.Create
:
的签名来了解这一点
public static ElasticsearchIO.ConnectionConfiguration create(java.lang.String[] addresses,
java.lang.String index,
java.lang.String type)
并将其与 支持 ValueProviders 的函数进行比较,ElasticsearchIO.Read.withQuery
:
public ElasticsearchIO.Read withQuery(ValueProvider<java.lang.String> query)
要支持 ValueProvider,函数必须实际接受 ValueProvider 对象。这是因为 ValueProvider 旨在在运行时而不是在管道构造期间传递参数。所以在pipeline构建的时候,处处都要作为ValueProvider对象传递。
在您的示例中,发生的事情是您在您的 ValueProvider 上为 EsUri
调用 toString
,而不是生成包含您的 URL 的字符串,您得到您的 ValueProvider 的字符串表示形式如下所示:"RuntimeValueProvider{propertyName=esUri, default=https://vpc-esprdcore01-5mtib6vgjw7sbhgn3kbnwnluim.us-east-1.es.amazonaws.com}
。这就是为什么你得到 MalformedURLException
。它试图将该字符串读取为失败的 URL。
虽然解决方法很简单,你只需要将EsUri
参数从ValueProvider<String>
改为String
,改为构建时间参数即可。请注意,将其用作构造时间参数意味着每次要更改该参数时都需要重建管道。不幸的是,在添加 ValueProvider 支持之前,您无能为力。
我正在尝试为 Apache Beam 创建一个模板以将数据索引到 elasticsearch。正在创建模板,但在调用模板时管道因无协议错误而失败。它看起来很奇怪,因为错误与 URL 对象有关。
public class StarterPipeline {
private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
public interface IndexToEsOptions extends PipelineOptions {
@Description("Path of the gzip index file to read from")
ValueProvider<String> getInputIndexFile();
void setInputIndexFile(ValueProvider<String> value);
@Description("Index name to index with")
ValueProvider<String> getIndexName();
void setIndexName(ValueProvider<String> value);
@Description("Index template name")
ValueProvider<String> getIndexTemplate();
void setIndexTemplate(ValueProvider<String> value);
@Description("URI for es")
@Default.String("https://vpc-myescore01-5mtib6vgjw7sbhgn3kbnwnluim.us-west-1.es.amazonaws.com")
ValueProvider<String> getEsUri();
void setEsUri(ValueProvider<String> value);
}
public static void main(String[] args) {
IndexToEsOptions options = PipelineOptionsFactory
.fromArgs(args).
withValidation().as(IndexToEsOptions.class);
Pipeline p = Pipeline.create(options);
p.apply(TextIO.read().from(options.getInputIndexFile()))
.apply(ElasticsearchIO.write().withConnectionConfiguration(
ElasticsearchIO.ConnectionConfiguration.create(
new String[]{options.getEsUri().toString()},
options.getIndexName().toString(),
options.getIndexTemplate().toString())
.withConnectTimeout(240)
)
.withMaxBatchSizeBytes(15 * 1024 * 1024)
);
p.run();
}
我在 运行 时得到的错误是
java.lang.IllegalArgumentException: Cannot get Elasticsearch version org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.typedApply(IntrinsicMapTaskExecutorFactory.java:194) org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.typedApply(IntrinsicMapTaskExecutorFactory.java:165) org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63) org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50) org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87) org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125) org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:358) org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311) org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140) org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120) org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107) java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) java.base/java.lang.Thread.run(Thread.java:834) Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: Cannot get Elasticsearch version org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36) org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source) org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80) org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62) org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95) org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75) org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264) org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access[=12=]0(IntrinsicMapTaskExecutorFactory.java:86) org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.typedApply(IntrinsicMapTaskExecutorFactory.java:183) ... 14 more Caused by: java.lang.IllegalArgumentException: Cannot get Elasticsearch version org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1475) org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1271) Caused by: java.net.MalformedURLException: no protocol: RuntimeValueProvider{propertyName=esUri, default=https://vpc-esprdcore01-5mtib6vgjw7sbhgn3kbnwnluim.us-east-1.es.amazonaws.com} java.base/java.net.URL.(URL.java:627) java.base/java.net.URL.(URL.java:523) java.base/java.net.URL.(URL.java:470) org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$ConnectionConfiguration.createClient(ElasticsearchIO.java:417) org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion(ElasticsearchIO.java:1457) org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup(ElasticsearchIO.java:1271) org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup(Unknown Source) org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:80) org.apache.beam.runners.dataflow.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:62) org.apache.beam.runners.dataflow.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:95) org.apache.beam.runners.dataflow.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:75) org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.createParDoOperation(IntrinsicMapTaskExecutorFactory.java:264) org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.access[=12=]0(IntrinsicMapTaskExecutorFactory.java:86) org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.typedApply(IntrinsicMapTaskExecutorFactory.java:183) org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.typedApply(IntrinsicMapTaskExecutorFactory.java:165) org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:63) org.apache.beam.runners.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:50) org.apache.beam.runners.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:87) org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory.create(IntrinsicMapTaskExecutorFactory.java:125) org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:358) org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:311) org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140) org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120) org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107) java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) java.base/java.lang.Thread.run(Thread.java:834)
简单地说,不,看起来 ElasticsearchIO.ConnectionConfiguration
不支持 ValueProvider,至少在当前版本 (2.22.0) 中不支持。您可以通过查看 ConnectionConfiguration.Create
:
public static ElasticsearchIO.ConnectionConfiguration create(java.lang.String[] addresses,
java.lang.String index,
java.lang.String type)
并将其与 支持 ValueProviders 的函数进行比较,ElasticsearchIO.Read.withQuery
:
public ElasticsearchIO.Read withQuery(ValueProvider<java.lang.String> query)
要支持 ValueProvider,函数必须实际接受 ValueProvider 对象。这是因为 ValueProvider 旨在在运行时而不是在管道构造期间传递参数。所以在pipeline构建的时候,处处都要作为ValueProvider对象传递。
在您的示例中,发生的事情是您在您的 ValueProvider 上为 EsUri
调用 toString
,而不是生成包含您的 URL 的字符串,您得到您的 ValueProvider 的字符串表示形式如下所示:"RuntimeValueProvider{propertyName=esUri, default=https://vpc-esprdcore01-5mtib6vgjw7sbhgn3kbnwnluim.us-east-1.es.amazonaws.com}
。这就是为什么你得到 MalformedURLException
。它试图将该字符串读取为失败的 URL。
虽然解决方法很简单,你只需要将EsUri
参数从ValueProvider<String>
改为String
,改为构建时间参数即可。请注意,将其用作构造时间参数意味着每次要更改该参数时都需要重建管道。不幸的是,在添加 ValueProvider 支持之前,您无能为力。