Understanding Kafka Consumer API for Java

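I'm trying to understand the Kafka receiving (consumer) API. I've included a working code sample below.

  1. Why does consumerStreamMap.get(topic) return a list of KafkaStream<> receivers for a single topic?
  2. The current flow seems to iterate over the List<KafkaStream<>> and then over the messages. But a Kafka receiver should run forever, so I expect the inner while loop to never exit. That makes the List<KafkaStream<>> redundant.
  3. Some examples also use consumerStreamMap.get(topic).get(0). So is that the right way to write the consumer?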

        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        // Request a single stream (one thread) for the topic
        topicMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap);
        List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic);

        for (final KafkaStream<byte[], byte[]> stream : streamList) {
            ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
            // hasNext() blocks until a message arrives, so this loop normally never exits
            while (consumerIte.hasNext()) {
                counter++;
                String message = new String(consumerIte.next().message());
                String id = topic.hashCode() + "-" + date.getTime() + "-" + counter;
                System.out.println(message);
            }
        }
    

You can find the answers in the Kafka wiki: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

  1. consumerStreamMap is a map of (topic, list of KafkaStream) pairs. The number of streams depends on the following line in your code:

    topicMap.put(topicName, numberOfStreams);
    

If you provide more threads than there are partitions on the topic, some threads will never see a message. If you have more partitions than you have threads, some threads will receive data from multiple partitions. If you have multiple partitions per thread there is NO guarantee about the order you receive messages, other than that within the partition the offsets will be sequential. For example, you may receive 5 messages from partition 10 and 6 from partition 11, then 5 more from partition 10 followed by 5 more from partition 10 even if partition 11 has data available. Adding more processes/threads will cause Kafka to re-balance, possibly changing the assignment of a Partition to a Thread.
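To make the thread/partition relationship concrete, here is a minimal sketch that splits partitions across streams in contiguous ranges. It is only an illustration: the class `PartitionAssignmentSketch` and the method `assign` are hypothetical names, and the split shown is a simple range-style division assumed for the example, not Kafka's actual rebalance code (the real assignment depends on the Kafka version and configuration).

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {
    // Splits partitions 0..numPartitions-1 into contiguous ranges, one per stream.
    // Illustrative range-style split only; not Kafka's real rebalance logic.
    static List<List<Integer>> assign(int numPartitions, int numStreams) {
        List<List<Integer>> result = new ArrayList<>();
        int base = numPartitions / numStreams;   // partitions every stream gets
        int extra = numPartitions % numStreams;  // the first 'extra' streams get one more
        int next = 0;
        for (int s = 0; s < numStreams; s++) {
            int count = base + (s < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // More streams than partitions: the last streams never see a message.
        System.out.println(assign(2, 4)); // prints [[0], [1], [], []]
        // More partitions than streams: some streams read several partitions.
        System.out.println(assign(5, 2)); // prints [[0, 1, 2], [3, 4]]
    }
}
```

In the first call two of the four streams own nothing, which matches the "some threads will never see a message" case; in the second, each stream interleaves data from several partitions, so ordering is only guaranteed within a partition.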

  2. You need to iterate over each stream in its own thread.

    // Fragments of the wiki example; consumer, topic and executor are
    // assumed to be fields of the enclosing class.
    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // Ask for a_numThreads streams for this topic
        topicCountMap.put(topic, a_numThreads);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // Create a pool with one thread per stream
        executor = Executors.newFixedThreadPool(a_numThreads);

        // Hand each stream to its own consumer task
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    public class ConsumerTest implements Runnable {
        private final KafkaStream<byte[], byte[]> m_stream;
        private final int m_threadNumber;

        public ConsumerTest(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            // hasNext() blocks until a message arrives; the loop normally
            // ends only when the consumer is shut down
            while (it.hasNext()) {
                System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
            }
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }
    
  3. consumerStreamMap.get(topic).get(0) is only correct when you have 1 topic and 1 stream.
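To see why every stream needs its own thread, and why reading only get(0) leaves the other streams unread, here is a minimal Kafka-free sketch. It uses a `BlockingQueue<String>` as a stand-in for a `KafkaStream`, which is purely an assumption for illustration; `StreamPerThreadSketch`, `consumeAll` and the `POISON` end marker are hypothetical names, not part of the Kafka API. Like `hasNext()` on a `ConsumerIterator`, `take()` blocks while the stream is empty, so a single thread looping over the list would wait on the first stream and never reach the second.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class StreamPerThreadSketch {
    // Hypothetical end-of-stream marker so the demo can terminate; a real
    // KafkaStream has no such marker and blocks until the consumer shuts down.
    static final String POISON = "__END__";

    // Consumes every stand-in stream on its own thread and returns all
    // messages seen, each prefixed with the index of its stream.
    static List<String> consumeAll(List<BlockingQueue<String>> streams) {
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());
        ConcurrentLinkedQueue<String> seen = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < streams.size(); i++) {
            final int threadNumber = i;
            final BlockingQueue<String> stream = streams.get(i);
            executor.execute(() -> {
                try {
                    String msg;
                    // take() blocks while the stream is empty
                    while (!(msg = stream.take()).equals(POISON)) {
                        seen.add(threadNumber + ":" + msg);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
        try {
            // Wait for all workers to reach their end marker
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        List<BlockingQueue<String>> streams = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            BlockingQueue<String> q = new LinkedBlockingQueue<>();
            q.add("a");
            q.add("b");
            q.add(POISON);
            streams.add(q);
        }
        System.out.println(consumeAll(streams).size() + " messages consumed");
    }
}
```

With two stand-in streams of two messages each, all four messages are consumed because each stream has a dedicated worker. Reading only `streams.get(0)` would process half of them, which is fine only when a single stream was requested.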