kafka -> storm -> flink :意外的块数据

kafka -> storm -> flink : unexpected block data

我正在将拓扑从 storm 移动到 flink。拓扑已缩减为 KafkaSpout->Bolt。 bolt 只是计算数据包,而不是尝试解码它们。

编译后的jar通过flink -c <entry point> <path to .jar>提交给flink,报如下错误:

java.lang.Exception: Call to registerInputOutput() of invokable failed
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:190)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:174)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
        ... 1 more
Caused by: java.io.StreamCorruptedException: unexpected block data
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1365)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:255)
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:175)
        ... 3 more

我的问题:

  1. 我是否错过了 w/re KafkaSpout 的配置步骤?这在香草风暴中使用时有效。
  2. 是否有我需要使用的特定版本的风暴库?我在我的构建中包含了 0.9.4。
  3. 我可能还漏掉了什么?

我应该使用 storm KafkaSpout 还是使用 flink KafkaSource 自己编写更好?


编辑:

相关代码如下:

拓扑结构:

BrokerHosts brokerHosts = new ZkHosts(configuration.getString("kafka/zookeeper"));

SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, configuration.getString("kafka/topic"), "/storm_env_values", "storm_env_DEBUG");
FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
builder.setSpout("environment", new KafkaSpout(kafkaConfig), 1);
builder.setBolt("decode_bytes", new EnvironmentBolt(), 1).shuffleGrouping("environment");

初始化:

FlinkLocalCluster cluster = new FlinkLocalCluster(); // replaces: LocalCluster cluster = new LocalCluster();
cluster.submitTopology("env_topology", conf, buildTopology());

螺栓基于BaseRichBoltexecute() fn 只是记录任何要调试的数据包的存在。那里没有其他代码。

我刚刚看过这个。现在有一个问题,但我让它在本地工作。您可以将此热修复应用到您的代码并自行构建兼容层。

  1. KafkaSpout 注册指标。但是,兼容层目前不支持指标。您需要删除 FlinkTopologyContext.registerMetric(...) 和 return null 中的异常。 (已经有一个 open PR 在做 metrics 的整合,所以我不想把这个 hot fix 推到 master 分支)
  2. 此外,您需要在查询中手动添加一些配置参数:

我只是在这里编造了一些值:

Config c = new Config();
List<String> zkServers = new ArrayList<String>();
zkServers.add("localhost");
c.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
c.put(Config.STORM_ZOOKEEPER_PORT, 2181);
c.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 30);
c.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 30);
c.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
c.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
  1. 您需要向您的项目添加一些额外的依赖项:

除了 flink-storm 你还需要:

<dependency>
    <groupId>org.apache.storm</groupId>
    <artifactId>storm-kafka</artifactId>
    <version>0.9.4</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.8.1.1</version>
</dependency>

这对我有用,使用 Kafka_2.10-0.8.1.1FlinkLocalCluster 在 Eclipse 中执行。

它也适用于通过 bin/start-local-streaming.sh 启动的本地 Flink 集群。为此,使用 bin/flink run 命令,您需要使用 FlinkSubmitter 而不是 FlinkLocalCluster。此外,您的 jar 需要以下依赖项:

<include>org.apache.storm:storm-kafka</include>
<include>org.apache.kafka:kafka_2.10</include>
<include>org.apache.curator:curator-client</include>
<include>org.apache.curator:curator-framework</include>
<include>com.google.guava:guava</include>
<include>com.yammer.metrics:metrics-core</include>