flink-kafka-elasticsearch 部署在 yarn 上
flink-kafka-elasticsearch deploy on yarn
独立模式没有问题。但是yarn模式,是不行的。我在 flink classpath(flink/lib) 中添加了 hadoop jar。我看到一些关于 jira 的讨论,比如
在flink 1.2中解决了吗?
flink version:1.9.1
hadoop/yarn version:2.6.0
elasticsearch version:6.8.1
maven 依赖项
<properties>
<compiler.version>1.8</compiler.version>
<flink.version>1.9.0</flink.version>
<java.version>1.8</java.version>
<log4j.version>2.6.2</log4j.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- flink steam api -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.8.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.8.1</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- log -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
</dependencies>
堆栈跟踪:
2020-05-21 16:39:01,918 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync(JarRunHandler.java:142)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchFieldError: INSTANCE
at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<init>(DefaultHttpRequestWriterFactory.java:53)
at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<init>(DefaultHttpRequestWriterFactory.java:57)
at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<clinit>(DefaultHttpRequestWriterFactory.java:47)
at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<init>(ManagedNHttpClientConnectionFactory.java:75)
at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<init>(ManagedNHttpClientConnectionFactory.java:83)
at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<clinit>(ManagedNHttpClientConnectionFactory.java:64)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager$InternalConnectionFactory.<init>(PoolingNHttpClientConnectionManager.java:553)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:163)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:147)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:119)
at org.apache.http.impl.nio.client.HttpAsyncClientBuilder.build(HttpAsyncClientBuilder.java:668)
at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:241)
at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:238)
at java.security.AccessController.doPrivileged(Native Method)
at org.elasticsearch.client.RestClientBuilder.createHttpClient(RestClientBuilder.java:238)
at org.elasticsearch.client.RestClientBuilder.access[=11=]0(RestClientBuilder.java:42)
at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:209)
at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:206)
at java.security.AccessController.doPrivileged(Native Method)
at org.elasticsearch.client.RestClientBuilder.build(RestClientBuilder.java:206)
at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:269)
at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:261)
at cn.ipaynow.flink.SinkToElasticsearch.<clinit>(SinkToElasticsearch.java:44)
at cn.ipaynow.Main.main(Main.java:69)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
... 7 more
我用Flink Elasticsearch connector
来解决问题。
我昨天通过实现 RichSinkFunction 做到了。
<dependencies>
<!-- kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- flink steam api -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.elasticsearch</groupId>-->
<!-- <artifactId>elasticsearch</artifactId>-->
<!-- <version>6.8.1</version>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.apache.logging.log4j</groupId>-->
<!-- <artifactId>log4j-api</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.elasticsearch.client</groupId>-->
<!-- <artifactId>elasticsearch-rest-high-level-client</artifactId>-->
<!-- <version>6.8.1</version>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.elasticsearch</groupId>-->
<!-- <artifactId>elasticsearch</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->
<!-- log -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
</dependencies>
独立模式没有问题。但是yarn模式,是不行的。我在 flink classpath(flink/lib) 中添加了 hadoop jar。我看到一些关于 jira 的讨论,比如
在flink 1.2中解决了吗?
flink version:1.9.1
hadoop/yarn version:2.6.0
elasticsearch version:6.8.1
maven 依赖项
<properties>
<compiler.version>1.8</compiler.version>
<flink.version>1.9.0</flink.version>
<java.version>1.8</java.version>
<log4j.version>2.6.2</log4j.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- flink steam api -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.8.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.8.1</version>
<exclusions>
<exclusion>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- log -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
</dependencies>
堆栈跟踪:
2020-05-21 16:39:01,918 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync(JarRunHandler.java:142)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchFieldError: INSTANCE
at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<init>(DefaultHttpRequestWriterFactory.java:53)
at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<init>(DefaultHttpRequestWriterFactory.java:57)
at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<clinit>(DefaultHttpRequestWriterFactory.java:47)
at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<init>(ManagedNHttpClientConnectionFactory.java:75)
at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<init>(ManagedNHttpClientConnectionFactory.java:83)
at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<clinit>(ManagedNHttpClientConnectionFactory.java:64)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager$InternalConnectionFactory.<init>(PoolingNHttpClientConnectionManager.java:553)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:163)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:147)
at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:119)
at org.apache.http.impl.nio.client.HttpAsyncClientBuilder.build(HttpAsyncClientBuilder.java:668)
at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:241)
at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:238)
at java.security.AccessController.doPrivileged(Native Method)
at org.elasticsearch.client.RestClientBuilder.createHttpClient(RestClientBuilder.java:238)
at org.elasticsearch.client.RestClientBuilder.access[=11=]0(RestClientBuilder.java:42)
at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:209)
at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:206)
at java.security.AccessController.doPrivileged(Native Method)
at org.elasticsearch.client.RestClientBuilder.build(RestClientBuilder.java:206)
at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:269)
at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:261)
at cn.ipaynow.flink.SinkToElasticsearch.<clinit>(SinkToElasticsearch.java:44)
at cn.ipaynow.Main.main(Main.java:69)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
... 7 more
我用Flink Elasticsearch connector
来解决问题。
我昨天通过实现 RichSinkFunction 做到了。
<dependencies>
<!-- kafka -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- flink steam api -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.elasticsearch</groupId>-->
<!-- <artifactId>elasticsearch</artifactId>-->
<!-- <version>6.8.1</version>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.apache.logging.log4j</groupId>-->
<!-- <artifactId>log4j-api</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->
<!-- <dependency>-->
<!-- <groupId>org.elasticsearch.client</groupId>-->
<!-- <artifactId>elasticsearch-rest-high-level-client</artifactId>-->
<!-- <version>6.8.1</version>-->
<!-- <exclusions>-->
<!-- <exclusion>-->
<!-- <groupId>org.elasticsearch</groupId>-->
<!-- <artifactId>elasticsearch</artifactId>-->
<!-- </exclusion>-->
<!-- </exclusions>-->
<!-- </dependency>-->
<!-- log -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.60</version>
</dependency>
</dependencies>