如果有一条消息大于获取大小,kafka 获取请求将无法获取任何消息
kafka fetch request cannot fetch any messages if there is a single message that is bigger than fetch size
在kafka手册网站中,有这个代码块可以从kafka获取消息。但如果有一条消息大于获取大小,它就不起作用。
但是,我们事先并不知道进入队列的最大大小是多少。有没有办法让它总是至少获取一条消息?
import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageSet;
import kafka.utils.Utils;
...
// create a consumer to connect to the kafka server running on localhost, port 9092, socket timeout of 10 secs, socket receive buffer of ~1MB
SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092, 10000, 1024000);
long offset = 0;
while (true) {
// create a fetch request for topic “test”, partition 0, current offset, and fetch size of 1MB
FetchRequest fetchRequest = new FetchRequest("test", 0, offset, 1000000);
// get the message set from the consumer and print them out
ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
for(MessageAndOffset msg : messages) {
System.out.println("consumed: " + Utils.toString(msg.message.payload(), "UTF-8"));
// advance the offset after consuming each message
offset = msg.offset;
}
}
这里有两种方法:将您的服务器配置为拒绝大于您的消费者可以获取的最大消息大小的消息。确保您的经纪人的 max.message.bytes 设置和您的消费者的 fetch.message.max.bytes 在生产/消费主题之前相同(参见 https://kafka.apache.org/08/configuration.html)。这会将问题推给生产者以弄清楚如何防止消息大小过大。
另一种方法是增加消费者的 fetch-max-bytes 并重试。因此,从 consumer.fetch(fetchRequest) 捕获 MessageSizeTooLarge 错误并使用更大的最大消息大小重试此操作(最后一个参数,示例代码中的 1000000):
FetchRequest fetchRequest = new FetchRequest("test", 0, offset, 1000000)
但这是危险的——这是最大消息大小配置的要点。
我建议将消息大小限制推送给您的生产者。
在kafka手册网站中,有这个代码块可以从kafka获取消息。但如果有一条消息大于获取大小,它就不起作用。
但是,我们事先并不知道进入队列的最大大小是多少。有没有办法让它总是至少获取一条消息?
import kafka.api.FetchRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.message.MessageSet;
import kafka.utils.Utils;
...
// create a consumer to connect to the kafka server running on localhost, port 9092, socket timeout of 10 secs, socket receive buffer of ~1MB
SimpleConsumer consumer = new SimpleConsumer("127.0.0.1", 9092, 10000, 1024000);
long offset = 0;
while (true) {
// create a fetch request for topic “test”, partition 0, current offset, and fetch size of 1MB
FetchRequest fetchRequest = new FetchRequest("test", 0, offset, 1000000);
// get the message set from the consumer and print them out
ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
for(MessageAndOffset msg : messages) {
System.out.println("consumed: " + Utils.toString(msg.message.payload(), "UTF-8"));
// advance the offset after consuming each message
offset = msg.offset;
}
}
这里有两种方法:将您的服务器配置为拒绝大于您的消费者可以获取的最大消息大小的消息。确保您的经纪人的 max.message.bytes 设置和您的消费者的 fetch.message.max.bytes 在生产/消费主题之前相同(参见 https://kafka.apache.org/08/configuration.html)。这会将问题推给生产者以弄清楚如何防止消息大小过大。
另一种方法是增加消费者的 fetch-max-bytes 并重试。因此,从 consumer.fetch(fetchRequest) 捕获 MessageSizeTooLarge 错误并使用更大的最大消息大小重试此操作(最后一个参数,示例代码中的 1000000):
FetchRequest fetchRequest = new FetchRequest("test", 0, offset, 1000000)
但这是危险的——这是最大消息大小配置的要点。
我建议将消息大小限制推送给您的生产者。