kafka consumer 0.9 向后兼容吗?

Is kafka consumer 0.9 backward compatible?

即将推出的kafka consumer 0.9.x是否会与0.8 broker兼容?

换句话说 - 可以只切换到新的消费者实施,而不触及任何其他东西吗?

基于这个 Consumer Client Re-design wiki 页面引用,

This would involve some significant changes to the consumer APIs*, so we would like to collect feedback on the proposal from our community. Since the list of changes is not small, we would like to understand if some features are preferred over others, and more importantly, if some features are not required at all.

*强调我的。

我没有找到任何明确说明不兼容的地方。但是使用该引用以及 0.8 中的生产者与 0.7 中的生产者不兼容这一事实,我假设它们不兼容。

没有。一般来说,建议在客户端之前升级代理,因为代理的目标是向后兼容。 0.9 代理将与 0.8 消费者和 0.9 消费者 API 一起工作,但反之则不然。

根据 Kafka 0.9.0 的文档,您不能使用新消费者从 0.8.x 代理读取数据。 原因如下:

0.9.0.0 has an inter-broker protocol change from previous versions.

看起来 kafka 0.9.0 内置了向后兼容性。检查 http://kafka.apache.org/documentation.html#upgrade

来自文档的引用

0.9.0.0 has potential breaking changes (please review before upgrading) and an inter-broker protocol change from previous versions. For a rolling upgrade:

  • Update server.properties file on all brokers and add the following property: inter.broker.protocol.version=0.8.2.X
  • Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it.
  • Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.9.0.0.
  • Restart the brokers one by one for the new protocol version to take effect

我最近遇到了类似的问题,在我的应用程序中,我必须从 kafka 0.9 读取数据然后写回 kafka 0.8。我按以下方式使用 kafka 客户端 0.9。

消费者配置

    props.put("bootstrap.servers", "brokers_ip as comma seperated values");
    props.put("group.id", "your group id");
    props.put("key.deserializer", StringDeserializer.class.getName());
    props.put("value.deserializer", StringDeserializer.class.getName());
    props.put("enable.auto.commit", "true");
    props.put("auto.commit.interval.ms", 1000);
    props.put("session.timeout.ms", 30000);
    consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe("List of topics to subscribe too");

生产者配置

        Properties props = new Properties();
        props.put("bootstrap.servers","list of broker ips");
        props.put("metadata.broker.list", "list of broker ips");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        String message = "hello world";
        KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic_name, message);
        producer.send(data);
        producer.close();

希望对您有所帮助。