为什么我的 Apache Storm 2.0 拓扑在 30 秒后重新启动?

Why is my Apache Storm 2.0 topology restarted after 30s?

我试过几个配置参数,甚至使用 withLocalModeOverride 都没有成功。我在这里缺少什么?

这是一个示例应用程序,30 秒后计数器重置,一切重新开始。如果我可以提供更多详细信息,请告诉我。

用法:

mvn package
java -jar target/Test-0.1.0.jar

src/main/java/Test.java:

package storm.test;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

class Test {
    private static class Spout extends BaseRichSpout {
        private SpoutOutputCollector spoutOutputCollector;
        private long n;

        @Override
        public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.spoutOutputCollector = spoutOutputCollector;
        }

        @Override
        public void nextTuple() {
            LOG.error("InfiniteSpout::nextTuple {}", n);
            spoutOutputCollector.emit(new Values(n++));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("x"));
        }
    }

    private static class Bolt extends BaseRichBolt {
        @Override
        public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {

        }

        @Override
        public void execute(Tuple tuple) {
            Long x = tuple.getLongByField("x");
            LOG.error("Bolt::execute {}", x);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

        }
    }

    private static final Logger LOG = LoggerFactory.getLogger(Test.class);

    public static void main(String[] args) {
        try {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new Spout());
            builder.setBolt("bolt", new Bolt()).shuffleGrouping("spout");

            Config conf = new Config();
            LocalCluster cluster = new LocalCluster();
            StormTopology topology = builder.createTopology();
            cluster.submitTopology("test", conf, topology);
        } catch (Exception e) {
            LOG.error(e.getMessage());
        }
    }
}

pom.xml:

<project>
    <modelVersion>4.0.0</modelVersion>

    <groupId>Test</groupId>
    <artifactId>Test</artifactId>
    <version>0.1.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.1.2</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass>Test</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

这是实际问题:

这是https://issues.apache.org/jira/browse/STORM-3501。此问题仅影响 LocalClusters 中的 运行ning,并且仅当您生成的 jar 中没有资源目录时才会出现。

您可以通过向 jar 添加资源目录来解决此问题。使用您的 pom,您想要添加一个 src/main/resources/resources 目录。

关于 运行 单节点 Storm 时需要考虑的事项,我认为您应该认真考虑一下 Storm 是否适合您的用例。 Storm 相当复杂,其中大部分复杂性是因为我们希望它能够在许多物理机器上分布计算。如果你打算 运行 在一台机器上进行所有计算,那么使用 Storm 可能不会真正获得太多好处,例如只是编写一个常规的 Java 应用程序,或者使用 Apache Camel 之类的东西。

运行单节点时需要注意的其他事项:

  • Storm 是快速失败的,因此如果您遇到任何错误,整个工作程序都会崩溃。当您 运行 在一台机器上运行时,您可能会关闭集群的很大一部分(默认情况下每台机器有 4 个工作人员,因此只要发生错误,您就会丢失四分之一的状态)。

  • 不要将 LocalCluster 用于生产工作负载,它不是为此而设计的。设置一个真正的 Storm 安装,然后 运行 在一台机器上。

这里有一些我突然想到的东西,也许其中一些会有所帮助:

  • 您需要在 cluster.submitTopology 之后添加一个睡眠,否则您的程序应该立即退出。该调用不会阻塞,它只是将拓扑提交给 LocalCluster,然后 returns。当您的 main 方法退出时,LocalCluster 也可能会关闭。在 "real" 设置中,您将提交给 运行 作为单独进程的集群,因此这不是问题,但是在使用 LocalCluster 时,您需要让主线程等待直到你想关闭程序。

  • 以防万一您最终在测试中使用了类似的代码,您应该记得在完成后关闭 LocalCluster。它是自动关闭的,所以你可以试试看。

  • 在螺栓中确认元组是一种很好的做法。如果您只想在使用元组完成 bolt 时进行确认,请考虑扩展 BaseBasicBolt。