在 springxd 中创建 spark 处理器模块时出错:应存在 java 或 scala 模块

Error while creating spark processor module in springxd : Either java or scala module should be present

我正在尝试在 springxd 1.2 中创建自定义 spark 处理器,但在创建模块时出现以下错误。

java.lang.IllegalStateException: Either java or scala module should be present

指的是哪个模块。是关于非导入库的吗?

我的模块定义是:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:int="http://www.springframework.org/schema/integration"
   xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">


<int:channel id="input"/>
<int:channel id="output"/>
<int:transformer input-channel="input" output-channel="output">
    <bean class="spark.MetricKeyAggregator"/>
</int:transformer>

我的 pom 是这个:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
    <groupId>org.springframework.xd</groupId>
    <artifactId>spring-xd-module-parent</artifactId>
    <version>1.2.0.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>metric-aggregator-spark</artifactId>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>es.onebox</groupId>
        <artifactId>metric-aggregator-domain</artifactId>
        <version>1.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.xd</groupId>
        <artifactId>spring-xd-spark-streaming</artifactId>
        <version>1.2.0.RELEASE</version>
    </dependency>
</dependencies>

<repositories>
    <repository>
        <id>spring-io-release</id>
        <url>http://repo.spring.io/release</url>
    </repository>
    <repository>
        <id>spring-io-snapshot</id>
        <url>https://repo.spring.io/libs-snapshot</url>
    </repository>
    <repository>
        <id>paho-releases</id>
        <url>https://repo.eclipse.org/content/repositories/paho-releases</url>
    </repository>
</repositories>

堆栈跟踪如下:

DeploymentStatus{state=failed,error(s)=java.lang.IllegalStateException: Either java or scala module should be present.
at org.springframework.xd.dirt.plugins.spark.streaming.SparkStreamingPlugin.postProcessModule(SparkStreamingPlugin.java:145)
at org.springframework.xd.dirt.module.ModuleDeployer.postProcessModule(ModuleDeployer.java:238)
at org.springframework.xd.dirt.module.ModuleDeployer.doDeploy(ModuleDeployer.java:218)
at org.springframework.xd.dirt.module.ModuleDeployer.deploy(ModuleDeployer.java:200)
at org.springframework.xd.dirt.server.container.DeploymentListener.deployModule(DeploymentListener.java:365)
at org.springframework.xd.dirt.server.container.DeploymentListener.deployStreamModule(DeploymentListener.java:334)
at org.springframework.xd.dirt.server.container.DeploymentListener.onChildAdded(DeploymentListener.java:181)
at org.springframework.xd.dirt.server.container.DeploymentListener.childEvent(DeploymentListener.java:149)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.apply(PathChildrenCache.java:509)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.apply(PathChildrenCache.java:503)
at org.apache.curator.framework.listen.ListenerContainer.run(ListenerContainer.java:92)
at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
at org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:83)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.callListeners(PathChildrenCache.java:500)
at org.apache.curator.framework.recipes.cache.EventOperation.invoke(EventOperation.java:35)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.run(PathChildrenCache.java:762)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

您的模块中没有 Processor

您的 MetricKeyAggregator 需要实现 Processor 并简单地定义为一个 bean;您不需要 in/out 个频道或转换器。

有关示例,请参阅 this test