Why isn't this Kafka consumer shutting down?

I want the consume test to read a single message and then shut down. It doesn't, though, even after I call consumer.shutdown(). Any idea why?

Test

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class AppTest
{
    App app=new App();

    @org.junit.Test
    public void publish()
    {

        assertTrue(app.publish("temptopic","message"));
    }

    @org.junit.Test
    public void consume()
    {
        app.publish("temptopic","message");
        assertEquals("message",app.consume("temptopic","tempgroup"));
    }
}

Class under test

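import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class App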
{
    public boolean publish(String topicName, String message) {

        Properties p=new Properties();
        p.put("bootstrap.servers","localhost:9092");
        p.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        p.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        // The new Java producer reads "acks"; "request.required.acks" belongs to the old Scala producer.
        p.put("acks","1");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p);

        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, "mykey", message);

        Future<RecordMetadata> future = producer.send(record);
        try {
            // Block until the broker acknowledges the record.
            future.get();
        } catch (Exception e) {
            return false;
        } finally {
            // Release the producer's network resources.
            producer.close();
        }

        return true;
    }

    public String consume(String topicName, String groupId)
    {
        Properties p=new Properties();
        p.put("zookeeper.connect","localhost:2181");
        p.put("group.id", groupId); // use the method argument, not the literal string "groupId"
        p.put("zookeeper.session.timeout.ms","500");
        p.put("zookeeper.sync.time.ms","250");
        p.put("auto.commit.interval.ms", "1000");

        ConsumerConnector consumer=null;
        String result = "";

        try
        {
            consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(p));

            Map<String, Integer> topicPartitionCount = new HashMap<String, Integer>();
            topicPartitionCount.put(topicName, Integer.valueOf(1));//use 1 thread to read

            Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicPartitionCount);

            List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topicName);


            for (KafkaStream stream : streams) {
                ConsumerIterator it = stream.iterator();
                while (it.hasNext()) {
                    MessageAndMetadata msg = it.next();
                    String strMsg = new String((byte[]) msg.message());
                    System.out.println("received: " + strMsg);
                    result += strMsg;
                }
            }
        }
        finally
        {
            if (consumer != null)
            {
                consumer.shutdown();
            }
        }
        return result;
    }
}

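By default, ConsumerIterator blocks indefinitely while it waits for the next message. Once the topic is drained, it.hasNext() simply does not return, so the while loop never ends and the finally block that calls consumer.shutdown() is never reached. To break out of the while loop, you need to interrupt the thread.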
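The interrupt approach can be sketched without Kafka at all. The snippet below is only an illustration: a BlockingQueue stands in for the blocking ConsumerIterator, and the class and method names (InterruptSketch, readOneThenInterrupt) are made up for this example. It assumes the blocking read reacts to Thread.interrupt(), which is true for BlockingQueue.take().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in: the queue plays the role of the blocking ConsumerIterator.
class InterruptSketch {

    // Returns the first message read, or null if none arrived within timeoutMs.
    public static String readOneThenInterrupt(BlockingQueue<String> source, long timeoutMs) {
        AtomicReference<String> first = new AtomicReference<>();
        CountDownLatch gotOne = new CountDownLatch(1);

        Thread reader = new Thread(() -> {
            try {
                while (true) {
                    String msg = source.take();      // blocks, like it.hasNext()
                    first.compareAndSet(null, msg);  // remember only the first message
                    gotOne.countDown();
                }
            } catch (InterruptedException e) {
                // Interrupted by the caller: leave the loop and let the thread end.
            }
        });
        reader.start();

        try {
            // Wait for the first message (or the timeout), then stop the reader.
            gotOne.await(timeoutMs, TimeUnit.MILLISECONDS);
            reader.interrupt();
            reader.join();
        } catch (InterruptedException e) {
            reader.interrupt();
            Thread.currentThread().interrupt();
        }
        return first.get();
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("message");
        System.out.println(readOneThenInterrupt(queue, 1000)); // prints "message"
    }
}
```

In the real test, the reader thread would run the loop from consume(), and cleanup such as consumer.shutdown() would run once that thread has been interrupted.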

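There is also a consumer property, "consumer.timeout.ms", that can be set to a timeout value. If no message arrives within that time, an exception (ConsumerTimeoutException) is thrown to the consumer. That may not be the most practical solution for a unit test, though.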
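As a rough sketch of the timeout option, the properties from the question could be extended as follows. The helper name consumerProps and the 5000 ms value are assumptions chosen for illustration. The loop change is shown only as a comment because it needs the Kafka classes on the classpath.

```java
import java.util.Properties;

// Hypothetical helper: builds the question's consumer properties plus a timeout.
class TimeoutConfigSketch {

    public static Properties consumerProps(String groupId, int timeoutMs) {
        Properties p = new Properties();
        p.put("zookeeper.connect", "localhost:2181");
        p.put("group.id", groupId);
        p.put("zookeeper.session.timeout.ms", "500");
        p.put("zookeeper.sync.time.ms", "250");
        p.put("auto.commit.interval.ms", "1000");
        // Without this property the iterator waits forever (the default is -1).
        p.put("consumer.timeout.ms", String.valueOf(timeoutMs));
        return p;
    }

    // In consume(), the read loop would then be wrapped roughly like this:
    //
    //   try {
    //       while (it.hasNext()) { ... }
    //   } catch (kafka.consumer.ConsumerTimeoutException e) {
    //       // No message arrived within the timeout: stop reading.
    //   }

    public static void main(String[] args) {
        Properties p = consumerProps("tempgroup", 5000);
        System.out.println(p.getProperty("consumer.timeout.ms")); // prints "5000"
    }
}
```

With this in place the finally block runs after the timeout, at the cost of the test always waiting that long. Since the test expects exactly one message, breaking out of the loop right after the first it.next() would probably also work, because hasNext() is then never called a second time.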