Spring 集成和 Kafka 消费者:在成功获取记录后立即停止消息驱动的通道适配器
Spring Integration & Kafka Consumer: Stop message-driven-channel-adapter right after records are sucessfully fetched
我正在使用以下配置:
- spring-集成-kafka 2.1.0.RELEASE
- kafka-客户端 0.10.0.1
- 卡夫卡 0.10.x.x
- spring-xd-1.3.1.RELEASE
我为 SpringXD 创建了自定义 Kafka 源模块。我设置了我的消费者逻辑和我的 message-driven-channel-adapter
(我将其与 control-bus
结合使用以停止我的通道适配器)。到目前为止,一切都很好。我还使用 kafka 属性 max.poll.record=10
每次轮询获取 10 条记录。
我想确保在成功获取所有记录(在本例中为 10 条记录)后立即停止我的频道。
所以例如:我想避免当不是所有记录都被成功获取和处理时停止读取(也就是说,当记录没有被发送到输出频道)。
有什么方法可以说明吗?
这是我的 xml 配置,以防万一:
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<int:channel id="input_to_control_bus" />
<int:channel id="output" />
<context:component-scan base-package="com.kafka.source.logic" />
<int:control-bus id="my_control_bus" input-channel="input_to_control_bus" />
<int-kafka:message-driven-channel-adapter
id="kafkaInboundChannelAdapterTesting" listener-container="container1"
auto-startup="false" phase="100" send-timeout="5000" channel="output"
mode="record" message-converter="messageConverter" />
<bean id="messageConverter"
class="org.springframework.kafka.support.converter.MessagingMessageConverter" />
<!--Consumer -->
<bean id="container1"
class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="enable.auto.commit" value="false" />
<entry key="auto.commit.interval.ms" value="100" />
<entry key="session.timeout.ms" value="15000" />
<entry key="max.poll.records" value="3" />
<entry key="group.id" value="bridge-stream-testing" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.IntegerDeserializer" />
<entry key="value.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="testing-topic" />
</bean>
</constructor-arg>
</bean>
[更新一号]
我为什么要这样做?
这些是详细信息:
- 我想每 Y 分钟阅读来自 Kafka 主题的最多 X 条消息。
- 我使用
max.poll.records
来确保我每次轮询最多获取 X 条消息。
- 我想处理的一个场景是:如果在一次特定的消息轮询中,我轮询的消息少于 X,会发生什么情况。这意味着我应该在不等待 X 消息的情况下停止频道,否则我将不得不等到未来的消息轮询才能到达那些 X消息。
这些是关于这个场景的一些细节。还有更多场景,但我不想使用相同的 SO 问题来混合它。
[更新 N°2]
Artem 回答后的一些想法。
- 如果我不定义
max.poll.records
并等待达到 Y 分钟并计算 X[=74= 会发生什么] 消息,然后 stop
频道?
- 是不是有的消息会因为无法阅读而丢失,或者那些无法阅读的消息在我
start
重新打开频道时会被读取?
我想避免将消息保留在内存中,这就是我使用 message-driven-channel-adapter
+ max.poll.records
的原因
我可以建议的是 AtomicInteger
bean,它在每个处理的记录上增加,当您达到阈值时,您为 kafkaInboundChannelAdapterTesting
执行 stop()
。
我正在使用以下配置:
- spring-集成-kafka 2.1.0.RELEASE
- kafka-客户端 0.10.0.1
- 卡夫卡 0.10.x.x
- spring-xd-1.3.1.RELEASE
我为 SpringXD 创建了自定义 Kafka 源模块。我设置了我的消费者逻辑和我的 message-driven-channel-adapter
(我将其与 control-bus
结合使用以停止我的通道适配器)。到目前为止,一切都很好。我还使用 kafka 属性 max.poll.record=10
每次轮询获取 10 条记录。
我想确保在成功获取所有记录(在本例中为 10 条记录)后立即停止我的频道。
所以例如:我想避免当不是所有记录都被成功获取和处理时停止读取(也就是说,当记录没有被发送到输出频道)。
有什么方法可以说明吗?
这是我的 xml 配置,以防万一:
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<int:channel id="input_to_control_bus" />
<int:channel id="output" />
<context:component-scan base-package="com.kafka.source.logic" />
<int:control-bus id="my_control_bus" input-channel="input_to_control_bus" />
<int-kafka:message-driven-channel-adapter
id="kafkaInboundChannelAdapterTesting" listener-container="container1"
auto-startup="false" phase="100" send-timeout="5000" channel="output"
mode="record" message-converter="messageConverter" />
<bean id="messageConverter"
class="org.springframework.kafka.support.converter.MessagingMessageConverter" />
<!--Consumer -->
<bean id="container1"
class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
<entry key="enable.auto.commit" value="false" />
<entry key="auto.commit.interval.ms" value="100" />
<entry key="session.timeout.ms" value="15000" />
<entry key="max.poll.records" value="3" />
<entry key="group.id" value="bridge-stream-testing" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.IntegerDeserializer" />
<entry key="value.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg name="topics" value="testing-topic" />
</bean>
</constructor-arg>
</bean>
[更新一号] 我为什么要这样做? 这些是详细信息:
- 我想每 Y 分钟阅读来自 Kafka 主题的最多 X 条消息。
- 我使用
max.poll.records
来确保我每次轮询最多获取 X 条消息。 - 我想处理的一个场景是:如果在一次特定的消息轮询中,我轮询的消息少于 X,会发生什么情况。这意味着我应该在不等待 X 消息的情况下停止频道,否则我将不得不等到未来的消息轮询才能到达那些 X消息。
这些是关于这个场景的一些细节。还有更多场景,但我不想使用相同的 SO 问题来混合它。
[更新 N°2]
Artem 回答后的一些想法。
- 如果我不定义
max.poll.records
并等待达到 Y 分钟并计算 X[=74= 会发生什么] 消息,然后stop
频道? - 是不是有的消息会因为无法阅读而丢失,或者那些无法阅读的消息在我
start
重新打开频道时会被读取?
我想避免将消息保留在内存中,这就是我使用 message-driven-channel-adapter
+ max.poll.records
我可以建议的是 AtomicInteger
bean,它在每个处理的记录上增加,当您达到阈值时,您为 kafkaInboundChannelAdapterTesting
执行 stop()
。