为什么卡夫卡消费者不听第一条消息?
Why the kafka consumer is not listening to the first message?
在我的测试程序中,我启动了监听器。然后循环发送消息。
如果我发送一条消息,它不会列出该消息。如果我发送 2 条消息,它会收听一条消息 message.If 我发送 3 条消息,它会收听 2 条消息..这是为什么?
制作人
KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes);
if (log.isDebugEnabled()) {
log.debug("producing messages to topic : " + topic + "file : " + payload.get("name"));
}
for (int i = 0; i < 3; i++) {
producer.send(message);
System.out.println("producing ..");
}
消费者
public void run() {
try {
ConsumerIterator<byte[], byte[]> itr = m_stream.iterator();
log.info("Kafka listener is ready to listen..");
System.out.println("listens....");
while (itr.hasNext()) {
byte[] data = itr.next().message();
System.out.println("Message received : " + data);
}
}
}
消费者属性
enable.auto.commit=true
auto.commit.interval.ms=101
session.timeout.ms=7000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
zookeeper.connect=zk1.xx\:2181
heartbeat.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.DefaultEncoder
bootstrap.servers=kk1.xx\:9092
group.id=test
consumer.timeout.ms=-1
fetch.min.bytes=1
receive.buffer.bytes=262144
我通过在我的制作人中设置 属性 来解决这个问题。
request.required.acks=1
在我的测试程序中,我启动了监听器。然后循环发送消息。 如果我发送一条消息,它不会列出该消息。如果我发送 2 条消息,它会收听一条消息 message.If 我发送 3 条消息,它会收听 2 条消息..这是为什么?
制作人
KeyedMessage<String, byte[]> message = new KeyedMessage<String, byte[]>(topic, serializedBytes);
if (log.isDebugEnabled()) {
log.debug("producing messages to topic : " + topic + "file : " + payload.get("name"));
}
for (int i = 0; i < 3; i++) {
producer.send(message);
System.out.println("producing ..");
}
消费者
public void run() {
try {
ConsumerIterator<byte[], byte[]> itr = m_stream.iterator();
log.info("Kafka listener is ready to listen..");
System.out.println("listens....");
while (itr.hasNext()) {
byte[] data = itr.next().message();
System.out.println("Message received : " + data);
}
}
}
消费者属性
enable.auto.commit=true
auto.commit.interval.ms=101
session.timeout.ms=7000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
zookeeper.connect=zk1.xx\:2181
heartbeat.interval.ms=1000
auto.offset.reset=smallest
serializer.class=kafka.serializer.DefaultEncoder
bootstrap.servers=kk1.xx\:9092
group.id=test
consumer.timeout.ms=-1
fetch.min.bytes=1
receive.buffer.bytes=262144
我通过在我的制作人中设置 属性 来解决这个问题。
request.required.acks=1