如何将 springxd 源 mqtt 注册到 mqtt 代理

how to register springxd source mqtt to mqtt broker

我用 spring-Xd 创建了流,例如-

stream create mqtttestfile --definition "mqtt --url='tcp://localhost:1883' --topics='helloTopic' | file" --deploy

创建并部署了新流 'mqtttestfile'。我还检查了 localhost:9393/admin-ui,流已成功创建和部署。

我的 MQTT 代理在 localhost:1883 上 运行。 但是当我检查 /tmp/xd/output 文件目录时,mqtttestfile.out 文件丢失。

我需要澄清以下几点我的假设:-

  1. 我认为 MQTT 客户端已经在 spring-xd 源 mqtt 模块中配置。所以当我们创建流时,它会自动订阅代理上的特定主题。

  2. 我还尝试使用 运行 两个单独的 python 脚本,一个用于订阅,另一个用于单独终端上的发布者,并且工作正常。所以 mqtt 代理没问题。

这是我从 spring-xd 控制台得到的日志:

istener - 计划在 15000 毫秒内部署到新容器

2017-03-04T12:07:51+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - 路径缓存事件:path=/deployments/modules/allocated/d634d310-12b4-4a83-baea-c1c98dfb7bba/mqtttestfile.sink.file .1, 类型=CHILD_ADDED

2017-03-04T12:07:51+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - 为流 'mqtttestfile' 部署模块 'file' 2017-03-04T12:07:52+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - 部署模块 [ModuleDescriptor@7e658391 moduleName = 'file', moduleLabel = 'file', group = 'mqtttestfile', sourceChannelName = [null], sinkChannelName = [null], index = 1, type = sink, parameters = map[[empty]], children = list[[empty]]]

2017-03-04T12:07:54+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - 路径缓存事件:path=/deployments/modules/allocated/d634d310-12b4-4a83-baea-c1c98dfb7bba/mqtttestfile.source.mqtt .1, 类型=CHILD_ADDED

2017-03-04T12:07:54+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - 为流 'mqtttestfile' 部署模块 'mqtt' 2017-03-04T12:07:54+0530 1.3.0.RELEASE INFO DeploymentsPathChildrenCache-0 container.DeploymentListener - 部署模块 [ModuleDescriptor@5a7d8e37 moduleName = 'mqtt', moduleLabel = 'mqtt', group = 'mqtttestfile', sourceChannelName = [null], sinkChannelName = [null], index = 0, type = source, parameters = map['topics' -> 'helloTopic', 'url' -> 'tcp://localhost:1883'], children = list[[empty]]]

2017-03-04T12:07:56+0530 1.3.0.RELEASE INFO DeploymentSupervisor-0 zk.ZKStreamDeploymentHandler - 流的部署状态 'mqtttestfile': DeploymentStatus{state=deployed}

with spring-xd 1.3.1 仍然问题未解决,这是我在日志中看到的错误信息-

2017-03-05T01:15:06+0530 1.3.1.RELEASE INFO LeaderSelector-1 zk.DeploymentSupervisor - 领导因线程中断而取消

2017-03-05T01:15:06+0530 1.3.1.RELEASE 错误 MQTT Rec: xd.mqtt.client.id.src inbound.MqttPahoMessageDrivenChannelAdapter - 丢失 connection:Connection 丢失;正在重试...

谢谢。

我刚刚测试了您的流,对我来说效果很好...

$ cat /tmp/xd/output/mqtttestfile.out 
foo
bar

(在我将消息 foo 和 bar 添加到队列之后)。

org.springframework.integration 启用 DEBUG 日志记录(在容器的 logback.grooy 文件中),我看到...

2017-03-04T08:02:59-0500 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 endpoint.EventDrivenConsumer - started outbound.mqtttestfile.0
2017-03-04T08:02:59-0500 1.3.1.RELEASE DEBUG DeploymentsPathChildrenCache-0 inbound.MqttPahoMessageDrivenChannelAdapter - Connected and subscribed to [helloTopic]
2017-03-04T08:02:59-0500 1.3.1.RELEASE INFO DeploymentsPathChildrenCache-0 inbound.MqttPahoMessageDrivenChannelAdapter - started mqttInbound
2017-03-04T08:02:59-0500 1.3.1.RELEASE INFO DeploymentSupervisor-0 zk.ZKStreamDeploymentHandler - Deployment status for stream 'mqtttestfile': DeploymentStatus{state=deployed}

奇怪的是您没有看到 ...started... 消息(信息)。

你能试试 1.3.1 版吗?