使用 CassandrSink 的 Flink 作业因写入错误而失败

Flink job with CassandrSink fails with Error writing

我有两个简单的 Flink 流作业,它们从 Kafka 读取数据并进行一些转换并将结果放入 Cassandra Sink。他们从不同的 Kafka 主题中读取并保存到不同的 Cassandra 表中。

当我 运行 两个工作中的任何一个单独工作时,一切正常。检查点被触发并完成,数据被保存到 Cassandra。

但是每当我 运行 这两个作业(或其中一个作业两次)时,第二个作业在启动时失败并出现此异常: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Error writing)).

我找不到关于此错误的更多信息,它可能是由以下任一原因引起的:

我也使用可能与第一个冲突的软件包:

最后这是我的集群构建器代码:

ClusterBuilder builder = new ClusterBuilder() {
    @Override
    protected Cluster buildCluster(Cluster.Builder builder) {
        Cluster cluster = null;
        try {
            cluster = builder
                    .addContactPoint("localhost")
                    .withPort(9042)
                    .withClusterName("Test Cluster")
                    .withoutJMXReporting()
                    .withProtocolVersion(ProtocolVersion.V4)
                    .withoutMetrics()
                    .build();

            // register codecs from datastax extras.
            cluster.getConfiguration().getCodecRegistry()
                    .register(LocalTimeCodec.instance);
        } catch (ConfigurationException e) {
            e.printStackTrace();
        } catch (NoHostAvailableException nhae) {
            nhae.printStackTrace();
        }

        return cluster;
    }
};

我尝试了不同的 PoolingOptions 和 SocketOptions 设置,但没有成功。

卡桑德拉水槽:

CassandraSink.addSink(dataRows)
.setQuery("insert into table_name_(16 columns names) " +
        "values (16 placeholders);")
.enableWriteAheadLog()
.setClusterBuilder(builder)
.setFailureHandler(new CassandraFailureHandler() {
    @Override
    public void onFailure(Throwable throwable) {
        LOG.error("A {} occurred.", "Cassandra Failure", throwable);
    }
})
.build()
.setParallelism(1)
.name("Cassandra Sink For Unique Count every N minutes.");

来自 flink 作业管理器的完整跟踪日志:

com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.TransportException: [localhost/127.0.0.1] Error writing))
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:231)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
    at com.datastax.driver.core.Cluster.init(Cluster.java:162)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:308)
    at com.datastax.driver.core.Cluster.connect(Cluster.java:250)
    at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.createSession(CassandraSinkBase.java:143)
    at org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:87)
    at org.apache.flink.streaming.connectors.cassandra.AbstractCassandraTupleSink.open(AbstractCassandraTupleSink.java:49)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke[=12=](StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.base/java.lang.Thread.run(Thread.java:834)

感谢任何帮助。

编辑:

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.abcde.ai</groupId>
    <artifactId>analytics-etl</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.10.2</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-configuration</groupId>
            <artifactId>commons-configuration</artifactId>
            <version>1.10</version>
        </dependency>
        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.abcde.analytics.etl.KafkaUniqueCountsStreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>

                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

编辑: 我设法缩小了问题的范围。当我将依赖项 flink-connector-cassandra 标记为提供时,错误得到修复,我只是从本地 Maven 存储库 (~/.m2/repository/org/apache/flink/flink-connector-cassandra_2.11/1.10.2/flink-connector-cassandra_2 复制 jar 文件。 11-1.10.2.jar) 到 Flink lib 文件夹。我的问题解决了,但根本原因仍然是个谜。

我可能是错的,但很可能是netty客户端版本冲突引起的。错误状态为 NoHostAvailableException,但潜在错误是 TransportExceptionError writing 错误消息。 Cassandra 绝对运行良好。

有一种类似的 Whosebug 案例 - Cassandra - error writing,具有非常相似的症状 - 单个项目 运行 以及 AllNodesFailedExceptionTransportExceptionError writing 消息作为根本原因添加一个时。作者通过统一netty客户端才得以解决

在你的情况下,我不确定为什么会有这么多依赖项,所以我会尝试摆脱所有额外的东西和库,只留下 Flink (v 1.10.0-scala_2.12)和 Flink Cassandra 连接器 (flink-connector-cassandra_2.12:jar:1.10.0) 库。它们必须已经包含必要的驱动程序、netty 等。应跳过所有其他驱动程序(至少对于初始迭代以确保这解决了问题及其库冲突)。

为了修复错误,我将依赖项标记为 flink-connector-cassandra,我只是从本地 Maven 存储库 (~/.m2/repository/org/apache/flink/flink-connector-cassandra_2.11/1.10.2/flink-connector-cassandra_2.11-1.10.2.jar) 到 Flink lib 文件夹并重新启动 Flink,这是我的新 pom.xml 文件:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-cassandra_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

我是怎么找到这个的?我正要尝试使用更新的驱动程序版本从源代码编译连接器。首先,我尝试使用未更改的源重现错误。所以我在不做任何更改的情况下编译它,将 jar 放入 Flink lib 文件夹中,万岁!然后我怀疑来自 maven 的罐子有什么不同。我将它复制到 lib 文件夹中,令我惊讶的是它也有效。

我的问题解决了,但根本原因仍然是个谜。

我最后一次尝试是检查是否有任何包与 Cassandra 连接器冲突,所以我 运行 dependency:tree -Dverbose 关于 metrics-coreorg.apache.flink:flink-metrics-dropwizard 有一个冲突:

[INFO] +- org.apache.flink:flink-connector-cassandra_2.12:jar:1.10.0:provided
[INFO] |  +- (io.dropwizard.metrics:metrics-core:jar:3.1.2:provided - omitted for conflict with 3.1.5)
[INFO] |  \- (org.apache.flink:force-shading:jar:1.10.0:provided - omitted for duplicate)

我从我的项目中删除了这个依赖项,但如果连接器未标记为已提供并且也放入 lib 文件夹中,错误仍然存​​在。