KafkaProducer:`callback` 和返回的`Future` 之间的区别?
KafkaProducer: Difference between `callback` and returned `Future`?
KafkaProducer send method returns 一个 Future 并接受一个回调。
在发送完成后使用一种机制而不是另一种机制来执行动作有什么根本区别吗?
主要区别在于是否要阻塞等待确认的调用线程。
以下使用 Future.get() 方法将阻塞当前线程,直到发送完成后再执行某些操作。
producer.send(record).get()
// Do some action
当使用回调执行某些操作时,代码将在 I/O 线程中执行,因此它对于调用线程是非阻塞的。
producer.send(record,
new Callback() {
// Do some action
}
});
虽然the docs说它'generally'在生产者中执行:
Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or they will delay the sending of messages from other threads. If you want to execute blocking or computationally expensive callbacks it is recommended to use your own Executor in the callback body to parallelize processing.
异步方法
producer.send(record, new Callback(){
@Override
onComplete(RecordMetadata rm, Exception ex){...}
})
与同步
相比,提供更好的吞吐量
RecordMetadata rm = producer.send(record).get();
因为在第一种情况下您不需要等待确认。
同样在异步方式中不能保证顺序,而在同步方式中它是 - 消息仅在收到确认后发送。
另一个区别可能是在同步调用中,如果发生异常,您可以停止发送消息异常发生后立即,而在第二种情况下,一些消息会在您发现之前发送发现有问题并执行一些操作。
另请注意,在异步方法中,"in fligh" 的消息数量由 max.in.flight.requests.per.connection
参数控制。
除了同步和异步方法之外,您还可以使用 即发即弃 方法,这与同步方法几乎相同,但不处理返回的元数据 - 只需发送消息并希望它会到达代理(知道它很可能会发生,并且生产者会在出现可恢复错误的情况下重试),但有可能会丢失一些消息:
RecordMetadata rm = producer.send(record);
总结:
- 即发即弃 - 最快的一种,但有些消息可能会丢失;
- 同步 - 最慢,如果您不能丢失消息,请使用它;
- 异步 - 介于两者之间。
我的观察基于 The Kafka Producer documentation:
Future
允许您访问同步处理
Future
可能无法保证确认。我的理解是 Callback
会在确认后执行
Callback
使您可以访问 完全非阻塞 异步处理。
- 同一分区上回调的执行顺序也有保证
Callbacks for records being sent to the same partition are guaranteed
to execute in order.
我的另一个观点是 Future
return 对象和 Callback
'pattern' 代表了两种不同的编程风格,我认为这是根本的区别:
Future
代表Java的并发模型风格。
Callback
代表了Java的Lambda编程风格(因为Callback实际上满足了函数式接口的要求)
您可能最终会使用 Future
和 Callback
样式编写类似的行为,但在某些用例中,看起来一种样式可能比另一种更有优势。
查看您链接到它的文档,看起来 Future 和 Callback 之间的主要区别在于谁发起了 "request is finished, what now?" 问题。
假设我们有一位顾客 C
和一位面包师 B
。 C
要求 B
给他做一块美味的饼干。现在,面包师可以通过 2 种可能的方式 return 向顾客提供美味的饼干。
未来
面包师接受请求并告诉顾客:好的,当我完成后,我会把你的饼干放在柜台上。 (这个协议就是Future
。)
在这种情况下,客户负责检查柜台 (Future
) 以了解面包师是否完成了他的饼干。
阻塞
顾客站在柜台附近看着它,直到饼干放在那里 (Future.get()) 或面包师在那里道歉(错误:饼干面团用完)。
non-blocking
客户做一些其他工作,偶尔检查一下 cookie 是否在柜台上等他 (Future.isDone())。如果 cookie 准备就绪,客户将接受它 (Future.get())。
回调
在这种情况下,客户在订购他的饼干后,告诉面包师:当我的饼干准备好后,请把它给我这里的宠物机器狗,他会知道如何处理它(这个机器人是回调).
现在面包师在饼干准备好后将饼干交给狗,并告诉他 运行 还给它的主人。面包师可以继续为另一位顾客烘焙下一块饼干。
狗 运行 回到顾客身边,开始摇动它的人造尾巴,让顾客知道他的饼干已经准备好了。
请注意,客户并不知道什么时候会给他饼干,他也没有主动询问面包师是否准备好了。
这是两种场景的主要区别。谁负责发起 "your cookie is ready, what do you want to do with it?" 问题。在 Future 中,客户负责检查它何时准备就绪,可以通过主动等待或不时轮询。在回调的情况下,面包师将回调提供的功能。
我希望这个答案能让您更好地了解 Future 和 Calback 的实际含义。一旦你有了大致的想法,你就可以尝试找出每个特定的事情是在哪个线程上处理的。当一个线程被阻塞时,或者一切以什么顺序完成。编写一些简单的程序来打印如下语句:"main client thread: cookie recieved" 可能是一种有趣的试验方式。
send()是一种在Kafka Cluster上开始发布消息的方法。 send() 方法是一个异步调用,表示 send 方法在 Buffer 中累积消息并 return 立即返回。这可以与 linger.ms 一起使用以批量发布消息以获得更好的性能。我们可以使用 Future 上的同步调用 send 方法或使用回调异步调用 send 方法来处理异常和控制。
每种方法各有利弊,可以根据使用情况来决定。
异步发送(即发即弃):
我们如下调用发送方法来调用发布消息而不等待任何成功或错误响应。
producer.send(new ProducerRecord<String, String>("topic-name", "key", "value"));
此方案不会等待完成第一条消息开始发送其他消息以进行发布。如果出现异常,生产者会根据重试配置参数进行重试,但如果重试后消息仍然失败,Kafka 生产者永远不会知道这一点。在这种情况下,我们可能会处理一些消息,但如果消息丢失很少,则可以提供高吞吐量和高延迟。
同步发送
同步发送消息的一种简单方法是使用 get() 方法
RecordMetadata recMetadata = producer.send(new ProducerRecord<String, String>("topic-name", "key", "value")).get();
Producer.send returns RecordMetadata 的未来,当我们调用 .get() 方法时,它将得到 Kafka 的回复。我们可以在出现错误时捕获 Error,在成功时捕获 return RecordMetadata。 RecordMetadata 包含偏移量、分区、时间戳以记录信息。速度慢,但可靠性高,保证消息传递。
带回调的异步发送
我们还可以使用回调函数调用 send() 方法,一旦消息完成,该回调函数 return 就会做出响应。如果您喜欢以异步方式发送消息,这很好,意味着不等待完成作业,但同时处理错误或更新有关消息传递的状态。
producer.send(record, new Callback(){
@Override
onComplete(RecordMetadata recodMetadata, Exception ex){...}
})
注意:请不要将 ack & retries 与异步发送调用混淆。确认和重试将应用于每个发送调用,无论是同步调用还是异步调用,唯一重要的是您如何处理 return 消息和失败情况。例如,如果您发送异步发送仍然会应用 ack 和重试规则,但将在一个独立的线程上,而不会阻塞其他线程发送并行记录。唯一的挑战是在失败的情况下我们不会意识到它的消息成功完成的时间。
KafkaProducer send method returns 一个 Future 并接受一个回调。
在发送完成后使用一种机制而不是另一种机制来执行动作有什么根本区别吗?
主要区别在于是否要阻塞等待确认的调用线程。
以下使用 Future.get() 方法将阻塞当前线程,直到发送完成后再执行某些操作。
producer.send(record).get()
// Do some action
当使用回调执行某些操作时,代码将在 I/O 线程中执行,因此它对于调用线程是非阻塞的。
producer.send(record,
new Callback() {
// Do some action
}
});
虽然the docs说它'generally'在生产者中执行:
Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or they will delay the sending of messages from other threads. If you want to execute blocking or computationally expensive callbacks it is recommended to use your own Executor in the callback body to parallelize processing.
异步方法
producer.send(record, new Callback(){
@Override
onComplete(RecordMetadata rm, Exception ex){...}
})
与同步
相比,提供更好的吞吐量RecordMetadata rm = producer.send(record).get();
因为在第一种情况下您不需要等待确认。
同样在异步方式中不能保证顺序,而在同步方式中它是 - 消息仅在收到确认后发送。
另一个区别可能是在同步调用中,如果发生异常,您可以停止发送消息异常发生后立即,而在第二种情况下,一些消息会在您发现之前发送发现有问题并执行一些操作。
另请注意,在异步方法中,"in fligh" 的消息数量由 max.in.flight.requests.per.connection
参数控制。
除了同步和异步方法之外,您还可以使用 即发即弃 方法,这与同步方法几乎相同,但不处理返回的元数据 - 只需发送消息并希望它会到达代理(知道它很可能会发生,并且生产者会在出现可恢复错误的情况下重试),但有可能会丢失一些消息:
RecordMetadata rm = producer.send(record);
总结:
- 即发即弃 - 最快的一种,但有些消息可能会丢失;
- 同步 - 最慢,如果您不能丢失消息,请使用它;
- 异步 - 介于两者之间。
我的观察基于 The Kafka Producer documentation:
Future
允许您访问同步处理Future
可能无法保证确认。我的理解是Callback
会在确认后执行Callback
使您可以访问 完全非阻塞 异步处理。- 同一分区上回调的执行顺序也有保证
Callbacks for records being sent to the same partition are guaranteed to execute in order.
我的另一个观点是 Future
return 对象和 Callback
'pattern' 代表了两种不同的编程风格,我认为这是根本的区别:
Future
代表Java的并发模型风格。Callback
代表了Java的Lambda编程风格(因为Callback实际上满足了函数式接口的要求)
您可能最终会使用 Future
和 Callback
样式编写类似的行为,但在某些用例中,看起来一种样式可能比另一种更有优势。
查看您链接到它的文档,看起来 Future 和 Callback 之间的主要区别在于谁发起了 "request is finished, what now?" 问题。
假设我们有一位顾客 C
和一位面包师 B
。 C
要求 B
给他做一块美味的饼干。现在,面包师可以通过 2 种可能的方式 return 向顾客提供美味的饼干。
未来
面包师接受请求并告诉顾客:好的,当我完成后,我会把你的饼干放在柜台上。 (这个协议就是Future
。)
在这种情况下,客户负责检查柜台 (Future
) 以了解面包师是否完成了他的饼干。
阻塞 顾客站在柜台附近看着它,直到饼干放在那里 (Future.get()) 或面包师在那里道歉(错误:饼干面团用完)。
non-blocking 客户做一些其他工作,偶尔检查一下 cookie 是否在柜台上等他 (Future.isDone())。如果 cookie 准备就绪,客户将接受它 (Future.get())。
回调
在这种情况下,客户在订购他的饼干后,告诉面包师:当我的饼干准备好后,请把它给我这里的宠物机器狗,他会知道如何处理它(这个机器人是回调).
现在面包师在饼干准备好后将饼干交给狗,并告诉他 运行 还给它的主人。面包师可以继续为另一位顾客烘焙下一块饼干。
狗 运行 回到顾客身边,开始摇动它的人造尾巴,让顾客知道他的饼干已经准备好了。
请注意,客户并不知道什么时候会给他饼干,他也没有主动询问面包师是否准备好了。
这是两种场景的主要区别。谁负责发起 "your cookie is ready, what do you want to do with it?" 问题。在 Future 中,客户负责检查它何时准备就绪,可以通过主动等待或不时轮询。在回调的情况下,面包师将回调提供的功能。
我希望这个答案能让您更好地了解 Future 和 Calback 的实际含义。一旦你有了大致的想法,你就可以尝试找出每个特定的事情是在哪个线程上处理的。当一个线程被阻塞时,或者一切以什么顺序完成。编写一些简单的程序来打印如下语句:"main client thread: cookie recieved" 可能是一种有趣的试验方式。
send()是一种在Kafka Cluster上开始发布消息的方法。 send() 方法是一个异步调用,表示 send 方法在 Buffer 中累积消息并 return 立即返回。这可以与 linger.ms 一起使用以批量发布消息以获得更好的性能。我们可以使用 Future 上的同步调用 send 方法或使用回调异步调用 send 方法来处理异常和控制。
每种方法各有利弊,可以根据使用情况来决定。
异步发送(即发即弃): 我们如下调用发送方法来调用发布消息而不等待任何成功或错误响应。
producer.send(new ProducerRecord<String, String>("topic-name", "key", "value"));
此方案不会等待完成第一条消息开始发送其他消息以进行发布。如果出现异常,生产者会根据重试配置参数进行重试,但如果重试后消息仍然失败,Kafka 生产者永远不会知道这一点。在这种情况下,我们可能会处理一些消息,但如果消息丢失很少,则可以提供高吞吐量和高延迟。
同步发送 同步发送消息的一种简单方法是使用 get() 方法
RecordMetadata recMetadata = producer.send(new ProducerRecord<String, String>("topic-name", "key", "value")).get();
Producer.send returns RecordMetadata 的未来,当我们调用 .get() 方法时,它将得到 Kafka 的回复。我们可以在出现错误时捕获 Error,在成功时捕获 return RecordMetadata。 RecordMetadata 包含偏移量、分区、时间戳以记录信息。速度慢,但可靠性高,保证消息传递。
带回调的异步发送 我们还可以使用回调函数调用 send() 方法,一旦消息完成,该回调函数 return 就会做出响应。如果您喜欢以异步方式发送消息,这很好,意味着不等待完成作业,但同时处理错误或更新有关消息传递的状态。
producer.send(record, new Callback(){
@Override
onComplete(RecordMetadata recodMetadata, Exception ex){...}
})
注意:请不要将 ack & retries 与异步发送调用混淆。确认和重试将应用于每个发送调用,无论是同步调用还是异步调用,唯一重要的是您如何处理 return 消息和失败情况。例如,如果您发送异步发送仍然会应用 ack 和重试规则,但将在一个独立的线程上,而不会阻塞其他线程发送并行记录。唯一的挑战是在失败的情况下我们不会意识到它的消息成功完成的时间。