以Kafka作为消息总线直接绑定部署

Direct binding deployment with Kafka as message bus

我正在尝试对以下流使用直接绑定。

stream create --definition "time | log" --name ticktock
stream deploy ticktock --properties module.*.count=0

部署失败,管理节点和容器节点均出现此异常:

java.lang.IllegalArgumentException: Module count cannot be zero
    at org.springframework.xd.dirt.integration.kafka.KafkaMessageBus$KafkaPropertiesAccessor.getNumberOfKafkaPartitionsForProducer(KafkaMessageBus.java:799)
    at org.springframework.xd.dirt.integration.kafka.KafkaMessageBus.bindProducer(KafkaMessageBus.java:500)
    at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindMessageProducer(AbstractMessageBusBinderPlugin.java:287)
    at org.springframework.xd.dirt.plugins.AbstractMessageBusBinderPlugin.bindConsumerAndProducers(AbstractMessageBusBinderPlugin.java:143)
    at org.springframework.xd.dirt.plugins.stream.StreamPlugin.postProcessModule(StreamPlugin.java:73)
    at org.springframework.xd.dirt.module.ModuleDeployer.postProcessModule(ModuleDeployer.java:238)
    at org.springframework.xd.dirt.module.ModuleDeployer.doDeploy(ModuleDeployer.java:218)
    at org.springframework.xd.dirt.module.ModuleDeployer.deploy(ModuleDeployer.java:200)
    at org.springframework.xd.dirt.server.container.DeploymentListener.deployModule(DeploymentListener.java:365)
    at org.springframework.xd.dirt.server.container.DeploymentListener.deployStreamModule(DeploymentListener.java:334)
    at org.springframework.xd.dirt.server.container.DeploymentListener.onChildAdded(DeploymentListener.java:181)
    at org.springframework.xd.dirt.server.container.DeploymentListener.childEvent(DeploymentListener.java:149)
    at org.apache.curator.framework.recipes.cache.PathChildrenCache.apply(PathChildrenCache.java:509)
    at org.apache.curator.framework.recipes.cache.PathChildrenCache.apply(PathChildrenCache.java:503)
    at org.apache.curator.framework.listen.ListenerContainer.run(ListenerContainer.java:92)
    at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
    at org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:83)
    at org.apache.curator.framework.recipes.cache.PathChildrenCache.callListeners(PathChildrenCache.java:500)
    at org.apache.curator.framework.recipes.cache.EventOperation.invoke(EventOperation.java:35)
    at org.apache.curator.framework.recipes.cache.PathChildrenCache.run(PathChildrenCache.java:762)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

我有一个 Spring-XD(1.2.0) 集群,其中有一个管理员和两个容器节点,使用 Kafka 作为消息总线。

我是不是做错了什么?还是直接绑定和Kafka消息总线有问题?

根据 the documentation,XD KafkaMessageBus 当前不支持直接绑定...

NOTE: The Kafka message bus does not support count=0 for module deployments, and therefore, it does not support direct binding of modules. This feature will be available in a future release. In the meantime, if direct communication between modules is necessary for Kafka deployments, composite modules should be used instead.