Kafka Streaming not working with multiple instances

When I run multiple instances of my Kafka Streams application, only the first instance receives messages correctly. If I then start new instances, they don't receive any messages at all.

Any suggestions on how to fix this?

Here is my Kafka Streams application:

package test.kafkastream;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TopologyBuilder;

public class Main {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
        //props.put(ConsumerConfig.GROUP_ID_CONFIG, "streams-wordcount-processor");

        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.2.38:45983,192.168.2.112:45635,192.168.2.116:39571");
        //props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        //props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class);

        // setting offset reset to earliest so that we can re-run the demo code
        // with the same pre-loaded data
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        TopologyBuilder builder = new TopologyBuilder();

        builder.addSource("Source", "topic6");

        builder.addProcessor("Process", new ProcessMessage(), "Source");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }

}

Here is my producer:

package test.kafkamesos;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class Producer {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Map<String, Object> producerConfig = new HashMap<String, Object>();
        producerConfig.put("bootstrap.servers", "192.168.2.38:45983,192.168.2.112:45635,192.168.2.116:39571");
        //producerConfig.put("bootstrap.servers", "localhost:9092");

        // optional:
        producerConfig.put("metadata.fetch.timeout.ms", "3000");
        producerConfig.put("request.timeout.ms", "3000");
        // ... other options:
        // http://kafka.apache.org/documentation.html#producerconfigs
        ByteArraySerializer serializer = new ByteArraySerializer();
        KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[], byte[]>(producerConfig, serializer,
                serializer);

        int i = 0;
        while (true) {
            String message = "{data:success,g:" + i + "}";
            // note: the record is created without a key; if the topic had more than
            // one partition, the default partitioner would spread such records across them
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>("topic6", message.getBytes());
            kafkaProducer.send(record).get();
            System.out.println("sending " + message);
            Thread.sleep(1000);
            i++;
        }
    }
}

And my Dockerfile:

FROM openjdk:8-jre
COPY ./target/*-with-dependencies.jar /jars/service-jar.jar
CMD ["java", "-cp", "/jars/service-jar.jar", "test.kafkastream.Main"]

I believe you are running into this problem because the Kafka broker is configured with only one partition for the topic you are consuming (topic6). From the Confluent blog:

For example, if your application reads from a single topic that has 10 partitions, then you can run up to 10 instances of your applications (note that you can run further instances but these will be idle). In summary, the number of topic partitions is the upper limit for the parallelism of your Streams API application and thus for the number of running instances of your application.

Source: https://www.confluent.io/blog/elastic-scaling-in-kafka-streams/
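
You can verify this and raise the partition count with the kafka-topics tool that ships with the broker. A minimal sketch, assuming a 0.10-era cluster administered through ZooKeeper (zookeeper:2181 is a placeholder for your ensemble, and 4 partitions is just an example; on brokers 2.2+ the tool takes --bootstrap-server instead of --zookeeper):

bin/kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic topic6
bin/kafka-topics.sh --zookeeper zookeeper:2181 --alter --topic topic6 --partitions 4

If you can use a newer client library (AdminClient was added in 0.11, createPartitions in 1.0), the same change can be made from Java. This is a sketch under those assumptions; the class name AddPartitions is mine, the bootstrap servers are copied from your config, and the target count of 4 is illustrative:

package test.kafkastream;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class AddPartitions {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.2.38:45983,192.168.2.112:45635,192.168.2.116:39571");

        // grow topic6 to 4 partitions in total (illustrative number);
        // partition counts can only be increased, never decreased
        try (AdminClient admin = AdminClient.create(props)) {
            admin.createPartitions(
                    Collections.singletonMap("topic6", NewPartitions.increaseTo(4)))
                 .all()
                 .get();
        }
    }
}

Once topic6 has more than one partition, the consumer group behind your Streams instances will rebalance (after the next metadata refresh) and the previously idle instances will each be assigned a share of the partitions. Since your producer sends records without a key, they will be spread across the new partitions automatically.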