Apache Beam Streaming from Pub/Sub to ElasticSearch
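I am writing a Java streaming pipeline with Apache Beam that reads messages from Google Cloud Pub/Sub and writes them to an Elasticsearch instance. For now I am using the DirectRunner, but the plan is to deploy the solution on Google Cloud Dataflow.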
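First, I wrote a pipeline that reads from Pub/Sub and writes to text files, and it works. Then I started the Elasticsearch instance, which also works: writing a few documents with curl was easy.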
Then, when I tried to do the writes with Beam's Elasticsearch connector, I started getting errors. Even though I added the dependency to my pom.xml file, I get java.lang.NoSuchMethodError: org.elasticsearch.client.RestClient.performRequest.
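What I am doing is basically this:

import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

// messages is a PCollection<String> of JSON documents read from Pub/Sub
messages.apply(
        "TwoMinWindow",
        // 2-minute fixed windows (120 * 1000 ms)
        Window.<String>into(FixedWindows.of(new Duration(120 * 1000)))
).apply(
        "ElasticWrite",
        ElasticsearchIO.write()
                .withConnectionConfiguration(
                        ElasticsearchIO.ConnectionConfiguration
                                .create(new String[]{"http://xxx.xxx.xxx.xxx:9200"}, "streaming_data", "string")
                                .withUsername("xxxx")
                                .withPassword("xxxxxxxx")
                )
);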
With the DirectRunner I can connect to Pub/Sub, but when the pipeline tries to connect to the Elasticsearch instance I get this exception:
java.lang.NoSuchMethodError: org.elasticsearch.client.RestClient.performRequest(Ljava/lang/String;Ljava/lang/String;[Lorg/apache/http/Header;)Lorg/elasticsearch/client/Response;
at org.apache.beam.sdk.util.UserCodeException.wrap (UserCodeException.java:34)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn$DoFnInvoker.invokeSetup (Unknown Source)
at org.apache.beam.sdk.transforms.reflect.DoFnInvokers.tryInvokeSetupFor (DoFnInvokers.java:50)
at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load (DoFnLifecycleManager.java:104)
at org.apache.beam.runners.direct.DoFnLifecycleManager$DeserializingCacheLoader.load (DoFnLifecycleManager.java:91)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture (LocalCache.java:3528)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync (LocalCache.java:2277)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad (LocalCache.java:2154)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get (LocalCache.java:2044)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get (LocalCache.java:3952)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getOrLoad (LocalCache.java:3974)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get (LocalCache.java:4958)
at org.apache.beam.runners.direct.DoFnLifecycleManager.get (DoFnLifecycleManager.java:61)
at org.apache.beam.runners.direct.ParDoEvaluatorFactory.createEvaluator (ParDoEvaluatorFactory.java:129)
at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication (ParDoEvaluatorFactory.java:79)
at org.apache.beam.runners.direct.TransformEvaluatorRegistry.forApplication (TransformEvaluatorRegistry.java:169)
at org.apache.beam.runners.direct.DirectTransformExecutor.run (DirectTransformExecutor.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
at java.util.concurrent.FutureTask.run (FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:624)
at java.lang.Thread.run (Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.elasticsearch.client.RestClient.performRequest(Ljava/lang/String;Ljava/lang/String;[Lorg/apache/http/Header;)Lorg/elasticsearch/client/Response;
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.getBackendVersion (ElasticsearchIO.java:1348)
at org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO$Write$WriteFn.setup (ElasticsearchIO.java:1200)
What I added to pom.xml is:
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>${beam.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.elasticsearch.client/elasticsearch-rest-client -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>${elastic.version}</version>
</dependency>
I am stuck on this problem and don't know how to solve it. If I use JestClient, I can connect to Elasticsearch without any problem.
Do you have any suggestions?
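You are using a newer version of RestClient, which no longer has the method performRequest(String, String, Header...) that the stack trace shows ElasticsearchIO.getBackendVersion calling. If you look at the latest source code, you can see that the method now takes a Request, whereas older versions had overloads that took Strings and Headers. Those overloads were deprecated and then removed from the code on September 1, 2018.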
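The JVM reports the missing method as a raw descriptor, because the calling class (ElasticsearchIO) was compiled against the old overload and links to that exact signature at runtime. As a small illustrative sketch (the class name `DescriptorDecoder` is hypothetical and not part of any library), this stdlib-only Java snippet decodes such a descriptor into readable types, which makes it easier to see that the error names the removed `(String, String, Header...)` overload:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: turns a JVM method descriptor into readable Java types.
public class DescriptorDecoder {

    /** Decodes e.g. "(Ljava/lang/String;)V" into "(java.lang.String) -> void". */
    public static String decode(String descriptor) {
        int close = descriptor.indexOf(')');
        if (!descriptor.startsWith("(") || close < 0) {
            throw new IllegalArgumentException("Not a method descriptor: " + descriptor);
        }
        List<String> params = new ArrayList<>();
        int[] pos = {1}; // current read position, shared with readType
        while (pos[0] < close) {
            params.add(readType(descriptor, pos));
        }
        pos[0] = close + 1; // the return type follows ')'
        String returnType = readType(descriptor, pos);
        return "(" + String.join(", ", params) + ") -> " + returnType;
    }

    /** Reads one type starting at pos[0] and advances pos[0] past it. */
    private static String readType(String d, int[] pos) {
        char c = d.charAt(pos[0]++);
        switch (c) {
            case 'B': return "byte";
            case 'C': return "char";
            case 'D': return "double";
            case 'F': return "float";
            case 'I': return "int";
            case 'J': return "long";
            case 'S': return "short";
            case 'Z': return "boolean";
            case 'V': return "void";
            case '[': return readType(d, pos) + "[]"; // array of the following type
            case 'L': { // object type, terminated by ';'
                int end = d.indexOf(';', pos[0]);
                String name = d.substring(pos[0], end).replace('/', '.');
                pos[0] = end + 1;
                return name;
            }
            default:
                throw new IllegalArgumentException("Bad type char '" + c + "' in " + d);
        }
    }

    public static void main(String[] args) {
        // Descriptor copied from the NoSuchMethodError above.
        String fromError = "(Ljava/lang/String;Ljava/lang/String;[Lorg/apache/http/Header;)Lorg/elasticsearch/client/Response;";
        System.out.println("performRequest" + decode(fromError));
    }
}
```

Running `main` prints `performRequest(java.lang.String, java.lang.String, org.apache.http.Header[]) -> org.elasticsearch.client.Response`; the trailing array parameter is how a `Header...` varargs parameter appears in bytecode.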
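Either move to code that uses the newer Elasticsearch library (here the caller is Beam's ElasticsearchIO, not your own code), or pin an older version of the library that is compatible with that code (it needs to be earlier than 7.0.x, for example 6.8.4).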
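To confirm which client version actually ends up on the runtime classpath (for example after pinning `${elastic.version}`), one option is a reflection check. This is a minimal sketch under the assumption that you run it with the same classpath as the pipeline; the class and method names are hypothetical. It reports whether `RestClient` still exposes the legacy `performRequest(String, String, Header...)` overload and which jar it was loaded from:

```java
import java.security.CodeSource;
import java.util.Optional;

// Hypothetical diagnostic: inspects the runtime classpath via reflection.
public class ClasspathCheck {

    // Loads a class by binary name (arrays as e.g. "[Lorg.apache.http.Header;") without initializing it.
    private static Class<?> load(String name) throws ClassNotFoundException {
        return Class.forName(name, false, ClasspathCheck.class.getClassLoader());
    }

    /** True if className is loadable and has a public method with exactly these parameter types. */
    public static boolean hasMethod(String className, String methodName, String... paramTypeNames) {
        try {
            Class<?> owner = load(className);
            Class<?>[] params = new Class<?>[paramTypeNames.length];
            for (int i = 0; i < paramTypeNames.length; i++) {
                params[i] = load(paramTypeNames[i]);
            }
            owner.getMethod(methodName, params); // throws if the exact signature is absent
            return true;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return false;
        }
    }

    /** Location (usually a jar) the class was loaded from; empty if missing or loaded by the JDK bootstrap loader. */
    public static Optional<String> codeSourceOf(String className) {
        try {
            CodeSource src = load(className).getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                return Optional.empty();
            }
            return Optional.of(src.getLocation().toString());
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        String client = "org.elasticsearch.client.RestClient";
        boolean legacy = hasMethod(client, "performRequest",
                "java.lang.String", "java.lang.String", "[Lorg.apache.http.Header;");
        System.out.println(client + " loaded from: " + codeSourceOf(client).orElse("<not found>"));
        System.out.println("legacy performRequest(String, String, Header...) present: " + legacy);
    }
}
```

If the second line prints `false` while RestClient is found, the jar on the classpath is a 7.x client that lacks the overload ElasticsearchIO calls; `mvn dependency:tree` can then show which dependency pulls it in.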