为什么这个 Kafka 消费者不关闭?
Why isn't this Kafka consumer shutting down?
我希望消费测试只读取一条消息并关闭。然而,它不是,即使在我调用 consumer.shutdown()
之后也是如此。想知道为什么?
测试
public class AppTest
{
App app=new App();
@org.junit.Test
public void publish()
{
assertTrue(app.publish("temptopic","message"));
}
@org.junit.Test
public void consume()
{
app.publish("temptopic","message");
assertEquals("message",app.consume("temptopic","tempgroup"));
}
}
Class 测试中
public class App
{
public boolean publish(String topicName, String message) {
Properties p=new Properties();
p.put("bootstrap.servers","localhost:9092");
p.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
p.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
p.put("request.required.acks","1");
KafkaProducer producer = new KafkaProducer<String,String>(p);
ProducerRecord record=new ProducerRecord(topicName,"mykey",message);
Future<RecordMetadata> future = producer.send(record);
RecordMetadata recordMetadata=null;
try {
recordMetadata = future.get();
} catch (Exception e) {
return false;
}
return true;
}
public String consume(String topicName, String groupId)
{
Properties p=new Properties();
p.put("zookeeper.connect","localhost:2181");
p.put("group.id","groupId");
p.put("zookeeper.session.timeout.ms","500");
p.put("zookeeper.sync.time.ms","250");
p.put("auto.commit.interval.ms", "1000");
ConsumerConnector consumer=null;
String result = "";
try
{
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(p));
Map<String, Integer> topicPartitionCount = new HashMap<String, Integer>();
topicPartitionCount.put(topicName, Integer.valueOf(1));//use 1 thread to read
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicPartitionCount);
List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topicName);
for (KafkaStream stream : streams) {
ConsumerIterator it = stream.iterator();
while (it.hasNext()) {
MessageAndMetadata msg = it.next();
String strMsg = new String((byte[]) msg.message());
System.out.println("received: " + strMsg);
result += strMsg;
}
}
}
finally
{
if (consumer != null)
{
consumer.shutdown();
}
}
return result;
}
}
默认情况下,ConsumerIterator 将无限期阻塞并等待下一条消息。要使其脱离 while 循环,您需要中断线程。
有一个consumer 属性 "consumer.timeout.ms"可以设置一个值,超时后会抛出异常给consumer,但这可能不是单元测试最实用的解决方案。
我希望消费测试只读取一条消息并关闭。然而,它不是,即使在我调用 consumer.shutdown()
之后也是如此。想知道为什么?
测试
public class AppTest
{
App app=new App();
@org.junit.Test
public void publish()
{
assertTrue(app.publish("temptopic","message"));
}
@org.junit.Test
public void consume()
{
app.publish("temptopic","message");
assertEquals("message",app.consume("temptopic","tempgroup"));
}
}
Class 测试中
public class App
{
public boolean publish(String topicName, String message) {
Properties p=new Properties();
p.put("bootstrap.servers","localhost:9092");
p.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
p.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
p.put("request.required.acks","1");
KafkaProducer producer = new KafkaProducer<String,String>(p);
ProducerRecord record=new ProducerRecord(topicName,"mykey",message);
Future<RecordMetadata> future = producer.send(record);
RecordMetadata recordMetadata=null;
try {
recordMetadata = future.get();
} catch (Exception e) {
return false;
}
return true;
}
public String consume(String topicName, String groupId)
{
Properties p=new Properties();
p.put("zookeeper.connect","localhost:2181");
p.put("group.id","groupId");
p.put("zookeeper.session.timeout.ms","500");
p.put("zookeeper.sync.time.ms","250");
p.put("auto.commit.interval.ms", "1000");
ConsumerConnector consumer=null;
String result = "";
try
{
consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(p));
Map<String, Integer> topicPartitionCount = new HashMap<String, Integer>();
topicPartitionCount.put(topicName, Integer.valueOf(1));//use 1 thread to read
Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicPartitionCount);
List<KafkaStream<byte[], byte[]>> streams = messageStreams.get(topicName);
for (KafkaStream stream : streams) {
ConsumerIterator it = stream.iterator();
while (it.hasNext()) {
MessageAndMetadata msg = it.next();
String strMsg = new String((byte[]) msg.message());
System.out.println("received: " + strMsg);
result += strMsg;
}
}
}
finally
{
if (consumer != null)
{
consumer.shutdown();
}
}
return result;
}
}
默认情况下,ConsumerIterator 将无限期阻塞并等待下一条消息。要使其脱离 while 循环,您需要中断线程。
有一个consumer 属性 "consumer.timeout.ms"可以设置一个值,超时后会抛出异常给consumer,但这可能不是单元测试最实用的解决方案。