在 springxd 中创建 spark 处理器模块时出错:应存在 java 或 scala 模块
Error while creating spark processor module in springxd : Either java or scala module should be present
我正在尝试在 springxd 1.2 中创建自定义 spark 处理器,但在创建模块时出现以下错误。
java.lang.IllegalStateException: Either java or scala module should be present
指的是哪个模块。是关于非导入库的吗?
我的模块定义是:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<int:channel id="input"/>
<int:channel id="output"/>
<int:transformer input-channel="input" output-channel="output">
<bean class="spark.MetricKeyAggregator"/>
</int:transformer>
我的 pom 是这个:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.xd</groupId>
<artifactId>spring-xd-module-parent</artifactId>
<version>1.2.0.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>metric-aggregator-spark</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>es.onebox</groupId>
<artifactId>metric-aggregator-domain</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.springframework.xd</groupId>
<artifactId>spring-xd-spark-streaming</artifactId>
<version>1.2.0.RELEASE</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>spring-io-release</id>
<url>http://repo.spring.io/release</url>
</repository>
<repository>
<id>spring-io-snapshot</id>
<url>https://repo.spring.io/libs-snapshot</url>
</repository>
<repository>
<id>paho-releases</id>
<url>https://repo.eclipse.org/content/repositories/paho-releases</url>
</repository>
</repositories>
堆栈跟踪如下:
DeploymentStatus{state=failed,error(s)=java.lang.IllegalStateException: Either java or scala module should be present.
at org.springframework.xd.dirt.plugins.spark.streaming.SparkStreamingPlugin.postProcessModule(SparkStreamingPlugin.java:145)
at org.springframework.xd.dirt.module.ModuleDeployer.postProcessModule(ModuleDeployer.java:238)
at org.springframework.xd.dirt.module.ModuleDeployer.doDeploy(ModuleDeployer.java:218)
at org.springframework.xd.dirt.module.ModuleDeployer.deploy(ModuleDeployer.java:200)
at org.springframework.xd.dirt.server.container.DeploymentListener.deployModule(DeploymentListener.java:365)
at org.springframework.xd.dirt.server.container.DeploymentListener.deployStreamModule(DeploymentListener.java:334)
at org.springframework.xd.dirt.server.container.DeploymentListener.onChildAdded(DeploymentListener.java:181)
at org.springframework.xd.dirt.server.container.DeploymentListener.childEvent(DeploymentListener.java:149)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.apply(PathChildrenCache.java:509)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.apply(PathChildrenCache.java:503)
at org.apache.curator.framework.listen.ListenerContainer.run(ListenerContainer.java:92)
at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
at org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:83)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.callListeners(PathChildrenCache.java:500)
at org.apache.curator.framework.recipes.cache.EventOperation.invoke(EventOperation.java:35)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.run(PathChildrenCache.java:762)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
您的模块中没有 Processor
。
您的 MetricKeyAggregator
需要实现 Processor
并简单地定义为一个 bean;您不需要 in/out 个频道或转换器。
有关示例,请参阅 this test。
我正在尝试在 springxd 1.2 中创建自定义 spark 处理器,但在创建模块时出现以下错误。
java.lang.IllegalStateException: Either java or scala module should be present
指的是哪个模块。是关于非导入库的吗?
我的模块定义是:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<int:channel id="input"/>
<int:channel id="output"/>
<int:transformer input-channel="input" output-channel="output">
<bean class="spark.MetricKeyAggregator"/>
</int:transformer>
我的 pom 是这个:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.xd</groupId>
<artifactId>spring-xd-module-parent</artifactId>
<version>1.2.0.RELEASE</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>metric-aggregator-spark</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>es.onebox</groupId>
<artifactId>metric-aggregator-domain</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.springframework.xd</groupId>
<artifactId>spring-xd-spark-streaming</artifactId>
<version>1.2.0.RELEASE</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>spring-io-release</id>
<url>http://repo.spring.io/release</url>
</repository>
<repository>
<id>spring-io-snapshot</id>
<url>https://repo.spring.io/libs-snapshot</url>
</repository>
<repository>
<id>paho-releases</id>
<url>https://repo.eclipse.org/content/repositories/paho-releases</url>
</repository>
</repositories>
堆栈跟踪如下:
DeploymentStatus{state=failed,error(s)=java.lang.IllegalStateException: Either java or scala module should be present.
at org.springframework.xd.dirt.plugins.spark.streaming.SparkStreamingPlugin.postProcessModule(SparkStreamingPlugin.java:145)
at org.springframework.xd.dirt.module.ModuleDeployer.postProcessModule(ModuleDeployer.java:238)
at org.springframework.xd.dirt.module.ModuleDeployer.doDeploy(ModuleDeployer.java:218)
at org.springframework.xd.dirt.module.ModuleDeployer.deploy(ModuleDeployer.java:200)
at org.springframework.xd.dirt.server.container.DeploymentListener.deployModule(DeploymentListener.java:365)
at org.springframework.xd.dirt.server.container.DeploymentListener.deployStreamModule(DeploymentListener.java:334)
at org.springframework.xd.dirt.server.container.DeploymentListener.onChildAdded(DeploymentListener.java:181)
at org.springframework.xd.dirt.server.container.DeploymentListener.childEvent(DeploymentListener.java:149)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.apply(PathChildrenCache.java:509)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.apply(PathChildrenCache.java:503)
at org.apache.curator.framework.listen.ListenerContainer.run(ListenerContainer.java:92)
at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
at org.apache.curator.framework.listen.ListenerContainer.forEach(ListenerContainer.java:83)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.callListeners(PathChildrenCache.java:500)
at org.apache.curator.framework.recipes.cache.EventOperation.invoke(EventOperation.java:35)
at org.apache.curator.framework.recipes.cache.PathChildrenCache.run(PathChildrenCache.java:762)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
您的模块中没有 Processor
。
您的 MetricKeyAggregator
需要实现 Processor
并简单地定义为一个 bean;您不需要 in/out 个频道或转换器。
有关示例,请参阅 this test。