Apache kafka 高级消费者 - 理解
Apache kafka HighLevel Consumer - Understanding
我一直在寻找 Apache kafka 以实现 HighLevel 消费者(我不想玩消息,我只需要将数据放入 MongoDB)v0.8.1.1
我查看了以下链接,其中显示了有关如何实施消费者的非常详细的信息。
Apache Kafka consumer wiki
Another kafka consumer
但是我仍然对所有线程关闭后消费者如何重新启动一无所知。例如。假设我有 4 个消费者线程 运行,它们消耗了来自 kafka broker 的所有消息,所以一旦没有消息,所有消费者将什么都不做,在特定超时后它将关闭,所以我不确定如何当 kafka broker 中有新消息时,消费者将再次重新启动。
有人可以分享一些代码或至少一些关于此的指示。还有一种方法可以让我们在一些回调方法中拥有我们的业务逻辑,当有消息而不是 while 循环时将被调用。
我认为您可能误解了关机期间超时的使用。从理论上讲,无论这些事件之间的时间间隔如何,您都在消耗无限的事件流,因此除非您正在更新代码或机器崩溃,否则您的消费者应该 永远不会 关闭。在实际需要关闭消费者的情况下,10000 毫秒超时的作用是为 Kafka 消费者提供足够的时间将其最后读取的偏移量写入 ZooKeeper,以便当消费者重新启动时它将从它处理的最后一个偏移量恢复。这种消费者关闭通常发生在您的程序关闭时(可能捕获到 InterruptedException),而不仅仅是消费者。因此,当您的程序重新启动时,consuner 将重新启动。
编辑
我应该补充一点,Kafka 的 ConsumerIterator 遵循这种永无止境的消费模型的原因。迭代器的 next 方法将 always 阻塞,直到它可以读取下一条消息。因此,示例中达到超时的唯一方法是消费者线程因某些异常而关闭。
编辑 2
我还没有看到任何支持回调的 Kafka 消费者 API 我认为您现在唯一的选择是编写自己的回调实现,例如:
public interface Callback {
void call(MessageAndMetadata message);
}
Executor executor = Executors.newCachedThreadPool();
final Callback<byte[], byte[]> callback = new MyCallback();
while (it.hasNext()) {
final MessageAndMetadata message = it.next();
executor.submit(new Runnable() {
public void run() {
callback.call(message);
}
});
}
您可能会对它们 currently rewriting the consumer API for Kafka 0.9 感兴趣,但我认为我没有在重写中看到回调(尽管我可能是错的)。
我一直在寻找 Apache kafka 以实现 HighLevel 消费者(我不想玩消息,我只需要将数据放入 MongoDB)v0.8.1.1
我查看了以下链接,其中显示了有关如何实施消费者的非常详细的信息。
Apache Kafka consumer wiki Another kafka consumer
但是我仍然对所有线程关闭后消费者如何重新启动一无所知。例如。假设我有 4 个消费者线程 运行,它们消耗了来自 kafka broker 的所有消息,所以一旦没有消息,所有消费者将什么都不做,在特定超时后它将关闭,所以我不确定如何当 kafka broker 中有新消息时,消费者将再次重新启动。
有人可以分享一些代码或至少一些关于此的指示。还有一种方法可以让我们在一些回调方法中拥有我们的业务逻辑,当有消息而不是 while 循环时将被调用。
我认为您可能误解了关机期间超时的使用。从理论上讲,无论这些事件之间的时间间隔如何,您都在消耗无限的事件流,因此除非您正在更新代码或机器崩溃,否则您的消费者应该 永远不会 关闭。在实际需要关闭消费者的情况下,10000 毫秒超时的作用是为 Kafka 消费者提供足够的时间将其最后读取的偏移量写入 ZooKeeper,以便当消费者重新启动时它将从它处理的最后一个偏移量恢复。这种消费者关闭通常发生在您的程序关闭时(可能捕获到 InterruptedException),而不仅仅是消费者。因此,当您的程序重新启动时,consuner 将重新启动。
编辑
我应该补充一点,Kafka 的 ConsumerIterator 遵循这种永无止境的消费模型的原因。迭代器的 next 方法将 always 阻塞,直到它可以读取下一条消息。因此,示例中达到超时的唯一方法是消费者线程因某些异常而关闭。
编辑 2
我还没有看到任何支持回调的 Kafka 消费者 API 我认为您现在唯一的选择是编写自己的回调实现,例如:
public interface Callback {
void call(MessageAndMetadata message);
}
Executor executor = Executors.newCachedThreadPool();
final Callback<byte[], byte[]> callback = new MyCallback();
while (it.hasNext()) {
final MessageAndMetadata message = it.next();
executor.submit(new Runnable() {
public void run() {
callback.call(message);
}
});
}
您可能会对它们 currently rewriting the consumer API for Kafka 0.9 感兴趣,但我认为我没有在重写中看到回调(尽管我可能是错的)。