将 Lenses MQTT 源连接器与融合的 kafka 结合使用时,消息流间歇性
Message flow intermittent when using Lenses MQTT Source connector with confluent kafka
我正在尝试将 Lenses MQTT 源连接器 [https://docs.lenses.io/connectors/source/mqtt.html] 与 confluent kafka v5.4 结合使用。
以下是我的 MQTT 源连接器属性文件:
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
connect.mqtt.clean=false
key.converter.schemas.enable=false
connect.mqtt.timeout=1000
value.converter.schemas.enable=false
name=kmd-source-4
connect.mqtt.kcql=INSERT INTO kafka-source-topic-2 SELECT * FROM ctt/+/+/location WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter` WITHKEY(id)
value.converter=org.apache.kafka.connect.json.JsonConverter
connect.mqtt.service.quality=1
key.converter=org.apache.kafka.connect.json.JsonConverter
connect.mqtt.hosts=tcp://ip:1883
connect.mqtt.converter.throw.on.error=true
connect.mqtt.username=username
connect.mqtt.password=password
errors.log.include.messages=true
errors.log.enable=true
我正在从基于 UI 的 MQTT 客户端 MQTT fx 发布消息到 MQTT 主题 'ctt/+/+/location' 并在 kafka 主题 'kafka-source-topic-2' 上订阅这些消息。我正在使用 Rabbit MQ 作为我的 MQTT经纪人和我的融合平台和 RabbitMQ 在不同的虚拟机上。我认为使用 RabbitMQ 代理而不是 Mosquitto MQTT 应该不是问题。无论何时何地我从 MQTT fx 发布,我都能在订阅后成功地看到 MQTT fx 中的消息。我还设置了 confleunt MongoDB 源连接器,它可以无缝运行。
但我的问题是 - 在 MQTT 主题上发布的消息间歇性地在映射的 kafka 主题上可用。可能是什么原因?我在 kafka 连接日志中没有看到任何错误消息。我需要在我的 MQTT 源属性文件中指定与 MQTT 代理相关的任何连接相关属性吗? Rabbit MQ 代理是否包含任何属性?有没有人使用过 Lenses MQTT 源和接收器连接器并且想就它们提出任何建议?
你的connect.mqtt.timeout只有1秒?!?间歇性消息向我表明您的连接器超时并且必须重新建立连接,而当它忙于这样做时,MQTT 消息正在进入但没有进入连接器,因为它当时没有订阅代理.尝试将超时时间增加到 60000(1 分钟)之类的值,看看会发生什么。有什么理由需要它超时吗? RabbitMQ 可以处理在没有流量的情况下长时间保持打开状态的连接。
我正在尝试将 Lenses MQTT 源连接器 [https://docs.lenses.io/connectors/source/mqtt.html] 与 confluent kafka v5.4 结合使用。
以下是我的 MQTT 源连接器属性文件:
connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
connect.mqtt.clean=false
key.converter.schemas.enable=false
connect.mqtt.timeout=1000
value.converter.schemas.enable=false
name=kmd-source-4
connect.mqtt.kcql=INSERT INTO kafka-source-topic-2 SELECT * FROM ctt/+/+/location WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonSimpleConverter` WITHKEY(id)
value.converter=org.apache.kafka.connect.json.JsonConverter
connect.mqtt.service.quality=1
key.converter=org.apache.kafka.connect.json.JsonConverter
connect.mqtt.hosts=tcp://ip:1883
connect.mqtt.converter.throw.on.error=true
connect.mqtt.username=username
connect.mqtt.password=password
errors.log.include.messages=true
errors.log.enable=true
我正在从基于 UI 的 MQTT 客户端 MQTT fx 发布消息到 MQTT 主题 'ctt/+/+/location' 并在 kafka 主题 'kafka-source-topic-2' 上订阅这些消息。我正在使用 Rabbit MQ 作为我的 MQTT经纪人和我的融合平台和 RabbitMQ 在不同的虚拟机上。我认为使用 RabbitMQ 代理而不是 Mosquitto MQTT 应该不是问题。无论何时何地我从 MQTT fx 发布,我都能在订阅后成功地看到 MQTT fx 中的消息。我还设置了 confleunt MongoDB 源连接器,它可以无缝运行。
但我的问题是 - 在 MQTT 主题上发布的消息间歇性地在映射的 kafka 主题上可用。可能是什么原因?我在 kafka 连接日志中没有看到任何错误消息。我需要在我的 MQTT 源属性文件中指定与 MQTT 代理相关的任何连接相关属性吗? Rabbit MQ 代理是否包含任何属性?有没有人使用过 Lenses MQTT 源和接收器连接器并且想就它们提出任何建议?
你的connect.mqtt.timeout只有1秒?!?间歇性消息向我表明您的连接器超时并且必须重新建立连接,而当它忙于这样做时,MQTT 消息正在进入但没有进入连接器,因为它当时没有订阅代理.尝试将超时时间增加到 60000(1 分钟)之类的值,看看会发生什么。有什么理由需要它超时吗? RabbitMQ 可以处理在没有流量的情况下长时间保持打开状态的连接。