Storm, HDFSBolt
I am trying to implement an HDFSBolt in Storm. I wanted to start with the basics, so I used the TestWordSpout that Storm provides.
The topology compiles successfully, but when I submit it I get the following error:
8102 [Thread-17-output-executor[3 3]] ERROR o.a.s.util - Async loop died!
java.lang.NoClassDefFoundError: org/apache/hadoop/fs/CanUnbuffer
at java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.7.0_95]
.......
.......
Here is my topology:
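import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.format.FileNameFormat;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.TopologyBuilder;

public class HdfsFileTopology {
    public static void main(String[] args) throws Exception {
        // Each tuple is written as one comma-separated line.
        RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter(",");
        // Sync every 100 tuples; start a new file once the current one reaches 10 KB.
        SyncPolicy syncPolicy = new CountSyncPolicy(100);
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(10.0f, Units.KB);
        // Output files are created under /user on HDFS.
        FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/user");
        HdfsBolt bolt = new HdfsBolt()
                .withFsUrl("hdfs://localhost:9000")
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy);
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word", new TestWordSpout(), 1);
        builder.setBolt("output", bolt, 1).shuffleGrouping("word");
        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumWorkers(3);
        StormSubmitter.submitTopology("HdfsFileTopology", conf, builder.createTopology());
    }
}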
Here is my pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <artifactId>storm</artifactId>
        <groupId>org.apache.storm</groupId>
        <version>1.0.0-SNAPSHOT</version>
        <relativePath>../../pom.xml</relativePath>
    </parent>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-test</artifactId>
    <packaging>jar</packaging>
    <name>storm-test</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <provided.scope>provided</provided.scope>
    </properties>
    <profiles>
        <profile>
            <id>intellij</id>
            <properties>
                <provided.scope>compile</provided.scope>
            </properties>
        </profile>
    </profiles>
    <dependencies>
        <dependency>
            <groupId>org.hdrhistogram</groupId>
            <artifactId>HdrHistogram</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>6.8.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-all</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.easytesting</groupId>
            <artifactId>fest-assert-core</artifactId>
            <version>2.0M8</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.jmock</groupId>
            <artifactId>jmock</artifactId>
            <version>2.6.0</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.twitter4j</groupId>
            <artifactId>twitter4j-stream</artifactId>
            <version>3.0.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${project.version}</version>
            <scope>${provided.scope}</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>${project.version}</version>
            <type>test-jar</type>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>multilang-javascript</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>multilang-ruby</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>multilang-python</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-metrics</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>${project.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-hdfs</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.1</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.8.2.1</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-redis</artifactId>
            <version>${project.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/jvm</sourceDirectory>
        <testSourceDirectory>test/jvm</testSourceDirectory>
        <resources>
            <resource>
                <directory>${basedir}/multilang</directory>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <configuration>
                    <createDependencyReducedPom>true</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>com.theoryinpractise</groupId>
                <artifactId>clojure-maven-plugin</artifactId>
                <extensions>true</extensions>
                <configuration>
                    <sourceDirectories>
                        <sourceDirectory>src/clj</sourceDirectory>
                    </sourceDirectories>
                </configuration>
                <executions>
                    <execution>
                        <id>compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>1.2.1</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>exec</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <executable>java</executable>
                    <includeProjectDependencies>true</includeProjectDependencies>
                    <includePluginDependencies>false</includePluginDependencies>
                    <classpathScope>compile</classpathScope>
                    <mainClass>${storm.topology}</mainClass>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
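I got a lot of NoClassDefFoundErrors when my topology was built against a different version of the Storm libraries than the one installed on my cluster. Make sure the versions you build against match the ones on the cluster.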
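One way to see which version actually wins at runtime is to ask the class loader where a class was loaded from. The sketch below uses only standard JDK reflection; ClasspathProbe and describe are hypothetical names (not part of Storm or Hadoop), and the default class names are just examples related to the error above.

```java
import java.security.CodeSource;

/**
 * Hypothetical diagnostic helper (not part of Storm or Hadoop): reports whether
 * a class is visible to the current class loader and, if so, which jar or
 * directory it was loaded from.
 */
final class ClasspathProbe {

    private ClasspathProbe() {}

    static String describe(String className) {
        try {
            // Load the class without running its static initializers.
            Class<?> cls = Class.forName(className, false,
                    ClasspathProbe.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            if (src == null || src.getLocation() == null) {
                // JDK bootstrap classes usually report no code source.
                return className + " -> no code source (JDK class)";
            }
            return className + " -> " + src.getLocation();
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError covers NoClassDefFoundError, e.g. when the class exists
            // but one of its supertypes cannot be found on the classpath.
            return className + " -> NOT LOADABLE (" + e + ")";
        }
    }

    public static void main(String[] args) {
        // Example class names; pass others on the command line instead.
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.hadoop.fs.CanUnbuffer",
            "org.apache.hadoop.fs.FileSystem",
            "org.apache.hadoop.hdfs.DFSInputStream"
        };
        for (String name : names) {
            System.out.println(describe(name));
        }
    }
}
```

To be meaningful it has to run with the same classpath the worker uses (for instance by calling ClasspathProbe.describe from a bolt's prepare method and logging the result). A Hadoop class reported from an unexpected jar, or reported as not loadable, points at the kind of version mismatch described above.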
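This sounds like a jar packaging problem. Make sure your jar actually contains the required dependencies (just listing a dependency in pom.xml does not guarantee that your jar contains it).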
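To confirm whether the packaged jar really contains a dependency, you can look inside it (jar tf lists its entries). The following sketch does the same check programmatically with java.util.jar.JarFile. JarContentsCheck is a hypothetical helper, and the jar path in the usage comment assumes Maven's default artifactId-version.jar name for this pom.

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

/**
 * Hypothetical helper (not part of Maven or Storm): checks whether a packaged
 * topology jar actually contains the given classes.
 */
final class JarContentsCheck {

    private JarContentsCheck() {}

    /** Converts "a.b.C" into the jar entry name "a/b/C.class". */
    static String entryName(String className) {
        return className.replace('.', '/') + ".class";
    }

    static boolean contains(File jar, String className) throws IOException {
        // try-with-resources closes the jar even if the lookup fails.
        try (JarFile jf = new JarFile(jar)) {
            return jf.getJarEntry(entryName(className)) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage (path assumed): java JarContentsCheck target/storm-test-1.0.0-SNAPSHOT.jar
        if (args.length != 1) {
            System.err.println("usage: JarContentsCheck <topology-jar>");
            return;
        }
        File jar = new File(args[0]);
        String[] required = {
            "org.apache.storm.hdfs.bolt.HdfsBolt",
            "org.apache.hadoop.fs.CanUnbuffer",
            "org.apache.hadoop.hdfs.DistributedFileSystem"
        };
        for (String cls : required) {
            System.out.println((contains(jar, cls) ? "present  " : "MISSING  ") + cls);
        }
    }
}
```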
I usually use the maven-jar-plugin in combination with the maven-dependency-plugin to assemble the jar. Have a look at this example: https://github.com/mjsax/aeolus/blob/master/queries/lrb/pom.xml
The <includes> section of the maven-jar-plugin lists the additional dependencies, and the maven-dependency-plugin makes those dependency files available (for example, commons-lang3).
As far as I know, the maven-shade-plugin can also be used to assemble jars (see its documentation for details).
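To make "assembling a jar" concrete: both approaches boil down to copying the dependencies' entries into the topology jar. The toy JarMerger below is a hypothetical illustration, not the maven-shade-plugin API or its exact behavior. It keeps the first copy of duplicate entries, concatenates META-INF/services files in the spirit of shade's ServicesResourceTransformer, and drops manifests and signature files. For a real build, use the plugins.

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;
import java.util.jar.JarOutputStream;

/**
 * Toy jar merger (hypothetical, for illustration only): copies the entries of
 * several jars into one output jar, the way a "fat" topology jar is assembled.
 */
final class JarMerger {

    private JarMerger() {}

    static void merge(List<File> inputs, File output) throws IOException {
        Set<String> written = new HashSet<>();
        // Service registrations are collected from every jar and written at the end.
        Map<String, ByteArrayOutputStream> services = new LinkedHashMap<>();
        try (JarOutputStream out = new JarOutputStream(new FileOutputStream(output))) {
            for (File input : inputs) {
                try (JarFile jar = new JarFile(input)) {
                    Enumeration<JarEntry> entries = jar.entries();
                    while (entries.hasMoreElements()) {
                        JarEntry entry = entries.nextElement();
                        String name = entry.getName();
                        if (entry.isDirectory() || skip(name)) {
                            continue;
                        }
                        byte[] data = readAll(jar.getInputStream(entry));
                        if (name.startsWith("META-INF/services/")) {
                            ByteArrayOutputStream buf = services.get(name);
                            if (buf == null) {
                                buf = new ByteArrayOutputStream();
                                services.put(name, buf);
                            }
                            buf.write(data);
                            buf.write('\n');
                        } else if (written.add(name)) {
                            // First jar in the list wins; later duplicates are silently dropped.
                            out.putNextEntry(new JarEntry(name));
                            out.write(data);
                            out.closeEntry();
                        }
                    }
                }
            }
            for (Map.Entry<String, ByteArrayOutputStream> svc : services.entrySet()) {
                out.putNextEntry(new JarEntry(svc.getKey()));
                svc.getValue().writeTo(out);
                out.closeEntry();
            }
        }
    }

    /** Drops manifests and signature files, which would be invalid in the merged jar. */
    private static boolean skip(String name) {
        return name.equals("META-INF/MANIFEST.MF")
                || (name.startsWith("META-INF/")
                    && (name.endsWith(".SF") || name.endsWith(".DSA") || name.endsWith(".RSA")));
    }

    private static byte[] readAll(InputStream in) throws IOException {
        try (InputStream src = in) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[8192];
            int n;
            while ((n = src.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return buf.toByteArray();
        }
    }
}
```

Note the first-wins rule in this toy: if two input jars carry the same Hadoop class in different versions, whichever comes first is silently kept, which is one way a jar can end up mixing versions and failing at runtime with errors like the one above.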