如果我不关闭 kafka 生产者会发生什么
What happens if I don't close the kafka producer
我正在处理 xml,我需要为每条记录发送一条消息,当我收到最后一条记录时,我关闭了 kafka 生产者,这里的问题是 kafka 生产者的发送方法是异步的,因此,有时当我关闭生产者时,它会出现 java.lang.IllegalStateException: Cannot send after the producer is closed.
我在某处读到可以让生产者保持打开状态。我的问题是:这意味着什么,或者是否有更好的解决方案。
---编辑---
<list>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
...
</list>
想象一下以下场景:
- 我们读取标签并创建 kafka 生产者
- 我们读取每个元素的属性,生成一个 json 对象并使用 send 方法将其发送到 kafka。
-当我们读取元素时,我们调用生产者中的关闭方法
问题是元素的数量可以达到 80k,因此,有时当我们调用 disconnect 方法时,它会继续以异步方式发送消息。所以我们需要先调用flush方法但是会影响性能
您应该在调用 Producer.close()
之前调用 Producer.flush()
。这是一个阻塞调用,将 return 不会在所有记录发送之前。
如果您不调用 close()
,根据 implementation/language,您最终可能会遇到 resource/memory 泄漏。
我正在处理 xml,我需要为每条记录发送一条消息,当我收到最后一条记录时,我关闭了 kafka 生产者,这里的问题是 kafka 生产者的发送方法是异步的,因此,有时当我关闭生产者时,它会出现 java.lang.IllegalStateException: Cannot send after the producer is closed.
我在某处读到可以让生产者保持打开状态。我的问题是:这意味着什么,或者是否有更好的解决方案。
---编辑---
<list>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
<element attr1="" att2="" attr3=""/>
...
</list>
想象一下以下场景:
- 我们读取标签并创建 kafka 生产者
- 我们读取每个元素的属性,生成一个 json 对象并使用 send 方法将其发送到 kafka。 -当我们读取元素时,我们调用生产者中的关闭方法
问题是元素的数量可以达到 80k,因此,有时当我们调用 disconnect 方法时,它会继续以异步方式发送消息。所以我们需要先调用flush方法但是会影响性能
您应该在调用 Producer.close()
之前调用 Producer.flush()
。这是一个阻塞调用,将 return 不会在所有记录发送之前。
如果您不调用 close()
,根据 implementation/language,您最终可能会遇到 resource/memory 泄漏。