kafka -> storm -> flink :意外的块数据
kafka -> storm -> flink : unexpected block data
我正在将拓扑从 storm 移动到 flink。拓扑已缩减为 KafkaSpout->Bolt
。 bolt 只是计算数据包,而不是尝试解码它们。
编译后的jar通过flink -c <entry point> <path to .jar>
提交给flink,报如下错误:
java.lang.Exception: Call to registerInputOutput() of invokable failed
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:190)
at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:174)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
... 1 more
Caused by: java.io.StreamCorruptedException: unexpected block data
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1365)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:255)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:175)
... 3 more
我的问题:
- 我是否错过了 w/re KafkaSpout 的配置步骤?这在香草风暴中使用时有效。
- 是否有我需要使用的特定版本的风暴库?我在我的构建中包含了 0.9.4。
- 我可能还漏掉了什么?
我应该使用 storm KafkaSpout 还是使用 flink KafkaSource 自己编写更好?
编辑:
相关代码如下:
拓扑结构:
BrokerHosts brokerHosts = new ZkHosts(configuration.getString("kafka/zookeeper"));
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, configuration.getString("kafka/topic"), "/storm_env_values", "storm_env_DEBUG");
FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
builder.setSpout("environment", new KafkaSpout(kafkaConfig), 1);
builder.setBolt("decode_bytes", new EnvironmentBolt(), 1).shuffleGrouping("environment");
初始化:
FlinkLocalCluster cluster = new FlinkLocalCluster(); // replaces: LocalCluster cluster = new LocalCluster();
cluster.submitTopology("env_topology", conf, buildTopology());
螺栓基于BaseRichBolt。 execute() fn 只是记录任何要调试的数据包的存在。那里没有其他代码。
我刚刚看过这个。现在有一个问题,但我让它在本地工作。您可以将此热修复应用到您的代码并自行构建兼容层。
KafkaSpout
注册指标。但是,兼容层目前不支持指标。您需要删除 FlinkTopologyContext.registerMetric(...)
和 return null
中的异常。 (已经有一个 open PR 在做 metrics 的整合,所以我不想把这个 hot fix 推到 master 分支)
- 此外,您需要在查询中手动添加一些配置参数:
我只是在这里编造了一些值:
Config c = new Config();
List<String> zkServers = new ArrayList<String>();
zkServers.add("localhost");
c.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
c.put(Config.STORM_ZOOKEEPER_PORT, 2181);
c.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 30);
c.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 30);
c.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
c.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
- 您需要向您的项目添加一些额外的依赖项:
除了 flink-storm
你还需要:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
</dependency>
这对我有用,使用 Kafka_2.10-0.8.1.1 和 FlinkLocalCluster
在 Eclipse 中执行。
它也适用于通过 bin/start-local-streaming.sh
启动的本地 Flink 集群。为此,使用 bin/flink run
命令,您需要使用 FlinkSubmitter
而不是 FlinkLocalCluster
。此外,您的 jar 需要以下依赖项:
<include>org.apache.storm:storm-kafka</include>
<include>org.apache.kafka:kafka_2.10</include>
<include>org.apache.curator:curator-client</include>
<include>org.apache.curator:curator-framework</include>
<include>com.google.guava:guava</include>
<include>com.yammer.metrics:metrics-core</include>
我正在将拓扑从 storm 移动到 flink。拓扑已缩减为 KafkaSpout->Bolt
。 bolt 只是计算数据包,而不是尝试解码它们。
编译后的jar通过flink -c <entry point> <path to .jar>
提交给flink,报如下错误:
java.lang.Exception: Call to registerInputOutput() of invokable failed
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:190)
at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:174)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
... 1 more
Caused by: java.io.StreamCorruptedException: unexpected block data
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1365)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:255)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:175)
... 3 more
我的问题:
- 我是否错过了 w/re KafkaSpout 的配置步骤?这在香草风暴中使用时有效。
- 是否有我需要使用的特定版本的风暴库?我在我的构建中包含了 0.9.4。
- 我可能还漏掉了什么?
我应该使用 storm KafkaSpout 还是使用 flink KafkaSource 自己编写更好?
编辑:
相关代码如下:
拓扑结构:
BrokerHosts brokerHosts = new ZkHosts(configuration.getString("kafka/zookeeper"));
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, configuration.getString("kafka/topic"), "/storm_env_values", "storm_env_DEBUG");
FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
builder.setSpout("environment", new KafkaSpout(kafkaConfig), 1);
builder.setBolt("decode_bytes", new EnvironmentBolt(), 1).shuffleGrouping("environment");
初始化:
FlinkLocalCluster cluster = new FlinkLocalCluster(); // replaces: LocalCluster cluster = new LocalCluster();
cluster.submitTopology("env_topology", conf, buildTopology());
螺栓基于BaseRichBolt。 execute() fn 只是记录任何要调试的数据包的存在。那里没有其他代码。
我刚刚看过这个。现在有一个问题,但我让它在本地工作。您可以将此热修复应用到您的代码并自行构建兼容层。
KafkaSpout
注册指标。但是,兼容层目前不支持指标。您需要删除FlinkTopologyContext.registerMetric(...)
和 returnnull
中的异常。 (已经有一个 open PR 在做 metrics 的整合,所以我不想把这个 hot fix 推到 master 分支)- 此外,您需要在查询中手动添加一些配置参数:
我只是在这里编造了一些值:
Config c = new Config();
List<String> zkServers = new ArrayList<String>();
zkServers.add("localhost");
c.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
c.put(Config.STORM_ZOOKEEPER_PORT, 2181);
c.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 30);
c.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 30);
c.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
c.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
- 您需要向您的项目添加一些额外的依赖项:
除了 flink-storm
你还需要:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
</dependency>
这对我有用,使用 Kafka_2.10-0.8.1.1 和 FlinkLocalCluster
在 Eclipse 中执行。
它也适用于通过 bin/start-local-streaming.sh
启动的本地 Flink 集群。为此,使用 bin/flink run
命令,您需要使用 FlinkSubmitter
而不是 FlinkLocalCluster
。此外,您的 jar 需要以下依赖项:
<include>org.apache.storm:storm-kafka</include>
<include>org.apache.kafka:kafka_2.10</include>
<include>org.apache.curator:curator-client</include>
<include>org.apache.curator:curator-framework</include>
<include>com.google.guava:guava</include>
<include>com.yammer.metrics:metrics-core</include>