Storm Kafka Spout 无法读取上次读取

Storm Kafka Spout Unable to read last off read

我正在使用 storm-kafka-0.9.3 从 Kafka 读取数据并在 Storm 中处理这些数据。下面是 Kafka Spout 我是 using.But 问题是当我杀死 Storm 集群时,它不读取它死时发送的旧数据,它从最新的偏移量开始读取。

BrokerHosts hosts = new ZkHosts(Constants.ZOOKEEPER_HOST);

SpoutConfig spoutConfig = new SpoutConfig(hosts, CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME
        , "/" + CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME,UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//Never should make this true
spoutConfig.forceFromStart=false;
spoutConfig.startOffsetTime =-2;

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
return kafkaSpout;

我相信这可能会发生,因为虽然拓扑是 运行,但它曾经使用以下路径 SpoutConfig.zkRoot+ "/" + SpoutConfig.id 将所有状态信息保存到 zookeeper,以便在发生故障时它可以从zookeeper 中最后写入的偏移量。

从文档中得到这个

Important:When re-deploying a topology make sure that the settings for SpoutConfig.zkRoot and SpoutConfig.id were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.

在您的情况下,SpoutConfig.id 是一个随机值 UUID.randomUUID().toString() 它无法检索最后提交的偏移量。

也阅读同一页

when a topology has run once the setting KafkaConfig.startOffsetTime will not have an effect for subsequent runs of the topology because now the topology will rely on the consumer state information (offsets) in ZooKeeper to determine from where it should begin (more precisely: resume) reading. If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter KafkaConfig.ignoreZkOffsets to true. If true, the spout will always begin reading from the offset defined by KafkaConfig.startOffsetTime as described above

您可以使用静态 id 来查看它是否能够检索。

您需要设置 spoutConfig.zkServers 和 spoutConfig.zkPort :

BrokerHosts hosts = new ZkHosts(Constants.ZOOKEEPER_HOST);
SpoutConfig spoutConfig = new SpoutConfig(hosts,  CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME
    , "/" + CommonConstants.KAFKA_TRANSACTION_TOPIC_NAME,"test");

spoutConfig.zkPort=Constants.ZOOKEEPER_PORT;  
spoutConfig.zkServers=Constants.ZOOKEEPER_SERVERS;

spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
return kafkaSpout;

谢谢大家, 由于我是运行 Local模式的Topology,Storm没有在ZK中存储Offset,当我运行 Prod模式的topology时就解决了。

草形