flink-kafka-elasticsearch 部署在 yarn 上

Deploying a flink-kafka-elasticsearch job on YARN

独立(standalone)模式下运行正常,但在 YARN 模式下无法运行。我已经在 Flink 的 classpath(flink/lib)中添加了 Hadoop 的 jar 包。我在 JIRA 上看到了一些相关讨论,例如(原文链接缺失):

这个问题在 Flink 1.2 中已经解决了吗?

flink version:1.9.1

hadoop/yarn version:2.6.0

elasticsearch version:6.8.1

maven 依赖项

<properties>
    <!-- Source encoding and Java language level -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <compiler.version>1.8</compiler.version>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>

    <!-- Flink version and the Scala binary suffix used in Flink artifactIds
         (e.g. flink-streaming-java_${scala.binary.version}).
         NOTE(review): the cluster is reported as Flink 1.9.1 while this says 1.9.0;
         consider keeping the two aligned. -->
    <flink.version>1.9.0</flink.version>
    <scala.binary.version>2.11</scala.binary.version>

    <!-- Log4j 2 version shared by log4j-api, log4j-core and log4j-slf4j-impl -->
    <log4j.version>2.6.2</log4j.version>
</properties>

<dependencies>
        <!-- Original (failing) dependency set. On YARN, job submission failed with
             java.lang.NoSuchFieldError: INSTANCE thrown from
             org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory while the
             Elasticsearch RestHighLevelClient was being built (see the stack trace below).
             The trace also shows the client is created in a static initializer
             (SinkToElasticsearch.<clinit>), reached from Main.main while the JarRunHandler
             extracts the job graph, i.e. before any task runs.
             NOTE(review): a NoSuchFieldError like this usually means an older httpcore jar
             is shadowing the one httpcore-nio / httpasyncclient expect. Presumably the older
             copy comes from the Hadoop jars added to flink/lib. Confirm with
             mvn dependency:tree and by listing the jars in flink/lib. -->
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- flink java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- flink streaming api -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Elasticsearch core. Its transitive log4j-api is excluded; log4j-api is
             declared explicitly below at ${log4j.version}.
             NOTE(review): confirm that version satisfies what elasticsearch 6.8.1 expects. -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-api</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Elasticsearch high-level REST client. Its transitive org.elasticsearch:elasticsearch
             is excluded, but the same artifact and version (6.8.1) is already declared
             directly above, so this exclusion is most likely redundant. -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>6.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.elasticsearch</groupId>
                    <artifactId>elasticsearch</artifactId>
                </exclusion>

            </exclusions>
        </dependency>


        <!-- log -->
        <!-- NOTE(review): these have compile scope, so they are packaged into the job jar
             next to whatever logging jars the Flink distribution ships in flink/lib.
             Check for duplicate SLF4J bindings. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!-- NOTE(review): Lombok is only needed at compile time; it is normally declared
             with provided scope so it is not packaged into the job jar. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>


    </dependencies>

堆栈跟踪:

2020-05-21 16:39:01,918 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler    - Unhandled exception.
org.apache.flink.client.program.ProgramInvocationException: The program caused an error: 
 at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
 at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
 at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
 at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync(JarRunHandler.java:142)
 at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchFieldError: INSTANCE
 at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<init>(DefaultHttpRequestWriterFactory.java:53)
 at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<init>(DefaultHttpRequestWriterFactory.java:57)
 at org.apache.http.impl.nio.codecs.DefaultHttpRequestWriterFactory.<clinit>(DefaultHttpRequestWriterFactory.java:47)
 at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<init>(ManagedNHttpClientConnectionFactory.java:75)
 at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<init>(ManagedNHttpClientConnectionFactory.java:83)
 at org.apache.http.impl.nio.conn.ManagedNHttpClientConnectionFactory.<clinit>(ManagedNHttpClientConnectionFactory.java:64)
 at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager$InternalConnectionFactory.<init>(PoolingNHttpClientConnectionManager.java:553)
 at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:163)
 at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:147)
 at org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.<init>(PoolingNHttpClientConnectionManager.java:119)
 at org.apache.http.impl.nio.client.HttpAsyncClientBuilder.build(HttpAsyncClientBuilder.java:668)
 at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:241)
 at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:238)
 at java.security.AccessController.doPrivileged(Native Method)
 at org.elasticsearch.client.RestClientBuilder.createHttpClient(RestClientBuilder.java:238)
 at org.elasticsearch.client.RestClientBuilder.access$000(RestClientBuilder.java:42)
 at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:209)
 at org.elasticsearch.client.RestClientBuilder.run(RestClientBuilder.java:206)
 at java.security.AccessController.doPrivileged(Native Method)
 at org.elasticsearch.client.RestClientBuilder.build(RestClientBuilder.java:206)
 at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:269)
 at org.elasticsearch.client.RestHighLevelClient.<init>(RestHighLevelClient.java:261)
 at cn.ipaynow.flink.SinkToElasticsearch.<clinit>(SinkToElasticsearch.java:44)
 at cn.ipaynow.Main.main(Main.java:69)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
 at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
 at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
 ... 7 more

我改用 Flink Elasticsearch connector 解决了这个问题,昨天通过实现 RichSinkFunction 做到了。修改后的 Maven 依赖如下:

    <dependencies>
        <!-- Dependency set reported to work on YARN: the hand-wired Elasticsearch REST
             client dependencies were replaced by flink-connector-elasticsearch6. The old
             entries are kept, commented out, further down for reference. -->
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- flink java (provided scope: supplied by the Flink distribution at runtime) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- flink streaming api (provided scope: supplied by the Flink distribution at runtime) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Flink's Elasticsearch 6 sink connector. Logging jars it may bring in
             transitively are excluded so they do not compete with the Log4j 2 setup
             declared below:
               - log4j-to-slf4j must not be combined with log4j-slf4j-impl (events would be
                 routed back and forth between Log4j 2 and SLF4J);
               - slf4j-log4j12 and log4j:log4j are the Log4j 1.x binding;
               - log4j-core is declared explicitly below at ${log4j.version}.
             NOTE(review): confirm with mvn dependency:tree which of these the connector
             actually pulls in. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>

            <exclusions>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-to-slf4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-core</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Previous direct Elasticsearch client dependencies, superseded by the
             connector above; kept for reference only. -->
        <!--        <dependency>-->
        <!--            <groupId>org.elasticsearch</groupId>-->
        <!--            <artifactId>elasticsearch</artifactId>-->
        <!--            <version>6.8.1</version>-->
        <!--            <exclusions>-->
        <!--                <exclusion>-->
        <!--                    <groupId>org.apache.logging.log4j</groupId>-->
        <!--                    <artifactId>log4j-api</artifactId>-->
        <!--                </exclusion>-->
        <!--            </exclusions>-->
        <!--        </dependency>-->
        <!--        <dependency>-->
        <!--            <groupId>org.elasticsearch.client</groupId>-->
        <!--            <artifactId>elasticsearch-rest-high-level-client</artifactId>-->
        <!--            <version>6.8.1</version>-->
        <!--            <exclusions>-->
        <!--                <exclusion>-->
        <!--                    <groupId>org.elasticsearch</groupId>-->
        <!--                    <artifactId>elasticsearch</artifactId>-->
        <!--                </exclusion>-->

        <!--            </exclusions>-->
        <!--        </dependency>-->


        <!-- log: Log4j 2 API + implementation, with SLF4J calls routed to Log4j 2 -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>

        <!-- Lombok is an annotation processor needed only at compile time; provided scope
             keeps it out of the packaged job jar. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.4</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>

    </dependencies>