Spring XD 无法找到 RabbitMQ ConnectionFactory 的 class 定义
Spring XD unable to find class definition for RabbitMQ ConnectionFactory
我正在从 xd-singlenode
模式切换到分布式,使用版本 1.3.1-RELEASE
。
我在 docker 容器中 运行ning Spring XD,使用 Dockerfile similar to the one here. According to the XD guide,如果我想使用 RabbitMQ 作为我的数据传输,那么我需要配置 servers.yml
文件。我在一个单独的 docker 容器中有 RabbitMQ 运行ning,我 link 这两个容器一起作为 docker 组合的一部分。
简而言之,我已经配置了所有内容(我相信是正确的),但是当我在 Spring XD 中转到 运行 多个流时(这就是 RabbitMQ 发挥作用的时候,因为它会充当容器之间的消息传输)Spring XD 抛出以下异常:
java.lang.NoClassDefFoundError: org/springframework/amqp/rabbit/connection/ConnectionFactory
at java.lang.Class.getDeclaredConstructors0(Native Method) ~[na:1.8.0_131]
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671) ~[na:1.8.0_131]
at java.lang.Class.getConstructors(Class.java:1651) ~[na:1.8.0_131]
at org.springframework.boot.BeanDefinitionLoader.isComponent(BeanDefinitionLoader.java:276) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.BeanDefinitionLoader.load(BeanDefinitionLoader.java:158) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.BeanDefinitionLoader.load(BeanDefinitionLoader.java:135) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.BeanDefinitionLoader.load(BeanDefinitionLoader.java:127) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.SpringApplication.load(SpringApplication.java:615) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:139) [spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.xd.dirt.plugins.spark.streaming.MessageBusConfiguration.createApplicationContext(MessageBusConfiguration.java:86) [spring-xd-dirt-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.xd.dirt.plugins.spark.streaming.MessageBusSender.start(MessageBusSender.java:105) [spring-xd-dirt-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.xd.spark.streaming.java.ModuleExecutor.call(ModuleExecutor.java:58) [spring-xd-spark-streaming-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.xd.spark.streaming.java.ModuleExecutor.call(ModuleExecutor.java:53) [spring-xd-spark-streaming-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition.apply(JavaRDDLike.scala:206) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition.apply(JavaRDDLike.scala:206) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:806) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:806) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1498) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1498) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.scheduler.Task.run(Task.scala:64) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) [spark-core_2.10-1.3.1.jar:1.3.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: java.lang.ClassNotFoundException: org.springframework.amqp.rabbit.connection.ConnectionFactory
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_131]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_131]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) ~[na:1.8.0_131]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_131]
... 26 common frames omitted
那么,是什么原因呢? Spring XD 没有附带必要的 RabbitMQ 依赖项吗?
我什至尝试手动将 spring-messaging
jar 添加到包含依赖项的 /opt/spring-xd/xd/lib
中,但没有成功。这是怎么回事?
编辑:我将放入我的 servers.yml
文件以获取额外的上下文。也许有人能看出我哪里做错了:
spring:
profiles: container
xd:
transport: rabbit
messagebus:
local:
queueSize: 2147483647
polling: 1000
executor:
corePoolSize: 0
maxPoolSize: 200
queueSize: 2147483647
keepAliveSeconds: 60
rabbit:
compressionLevel: 1
# bus-level property, applies only when 'compress=true' for a stream module
# See java.util.zip.Deflater; 1=BEST_SPEED, 9=BEST_COMPRESSION, ...
longStringLimit: 8192
# Headers longer than this will not be converted to String and will be a
# DataInputStream - such headers will NOT be properly converted back on output.
default:
ackMode: AUTO
# Valid: AUTO (container acks), NONE (broker acks), MANUAL (consumer acks).
# Upper case only.
# Note: MANUAL requires specialized code in the consuming module and is unlikely to be
# used in an XD application. For more information, see
# http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack
autoBindDLQ: false
backOffInitialInterval: 1000
backOffMaxInterval: 10000
backOffMultiplier: 2.0
batchBufferLimit: 10000
batchingEnabled: false
batchSize: 100
batchTimeout: 5000
compress: false
concurrency: 1
deliveryMode: PERSISTENT
durableSubscription: false
maxAttempts: 3
maxConcurrency: 1
prefix: xdbus.
# prefix for queue/exchange names so policies (ha, dle etc.) can be applied
prefetch: 1
replyHeaderPatterns: STANDARD_REPLY_HEADERS,*
republishToDLQ: false
# When false, normal rabbitmq dlq processing; when true, republish to the DLQ with stack trace
requestHeaderPatterns: STANDARD_REQUEST_HEADERS,*
requeue: true
transacted: false
txSize: 1
# redis:
# headers:
# comma-delimited list of additional header names to transport
# default:
# default bus properties, if not specified at the module level
# backOffInitialInterval: 1000
# backOffMaxInterval: 10000
# backOffMultiplier: 2.0
# concurrency: 1
# maxAttempts: 3
kafka:
brokers: kafka:9092
zkAddress: zookeeper:2181
mode: embeddedHeaders
offsetManagement: kafkaTopic
headers:
# comma-delimited list of additional header names to transport
socketBufferSize: 2097152
offsetStoreTopic: SpringXdOffsets
offsetStoreSegmentSize: 25000000
offsetStoreRetentionTime: 60000
offsetStoreRequiredAcks: 1
offsetStoreMaxFetchSize: 1048576
offsetStoreBatchBytes: 16384
offsetStoreBatchTime: 1000
offsetUpdateTimeWindow: 10000
offsetUpdateCount: 0
offsetUpdateShutdownTimeout: 2000
default:
batchSize: 16384
batchTimeout: 0
replicationFactor: 1
concurrency: 1
requiredAcks: 1
compressionCodec: none
queueSize: 8192 # must be a power of 2
maxWait: 100
fetchSize: 1048576
minPartitionCount: 1
durableSubscription: false
syncProducer: false
syncProducerTimeout: 5000
---
#Config for admin
spring:
profiles: admin
xd:
transport: rabbit
messagebus:
local:
queueSize: 2147483647
polling: 1000
executor:
corePoolSize: 0
maxPoolSize: 200
queueSize: 2147483647
keepAliveSeconds: 60
rabbit:
compressionLevel: 1
# bus-level property, applies only when 'compress=true' for a stream module
# See java.util.zip.Deflater; 1=BEST_SPEED, 9=BEST_COMPRESSION, ...
longStringLimit: 8192
# Headers longer than this will not be converted to String and will be a
# DataInputStream - such headers will NOT be properly converted back on output.
default:
ackMode: AUTO
# Valid: AUTO (container acks), NONE (broker acks), MANUAL (consumer acks).
# Upper case only.
# Note: MANUAL requires specialized code in the consuming module and is unlikely to be
# used in an XD application. For more information, see
# http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack
autoBindDLQ: false
backOffInitialInterval: 1000
backOffMaxInterval: 10000
backOffMultiplier: 2.0
batchBufferLimit: 10000
batchingEnabled: false
batchSize: 100
batchTimeout: 5000
compress: false
concurrency: 1
deliveryMode: PERSISTENT
durableSubscription: false
maxAttempts: 3
maxConcurrency: 1
prefix: xdbus.
# prefix for queue/exchange names so policies (ha, dle etc.) can be applied
prefetch: 1
replyHeaderPatterns: STANDARD_REPLY_HEADERS,*
republishToDLQ: false
# When false, normal rabbitmq dlq processing; when true, republish to the DLQ with stack trace
requestHeaderPatterns: STANDARD_REQUEST_HEADERS,*
requeue: true
transacted: false
txSize: 1
# redis:
# headers:
# comma-delimited list of additional header names to transport
# default:
# default bus properties, if not specified at the module level
# backOffInitialInterval: 1000
# backOffMaxInterval: 10000
# backOffMultiplier: 2.0
# concurrency: 1
# maxAttempts: 3
kafka:
brokers: kafka:9092
zkAddress: zookeeper:2181
mode: embeddedHeaders
offsetManagement: kafkaTopic
headers:
# comma-delimited list of additional header names to transport
socketBufferSize: 2097152
offsetStoreTopic: SpringXdOffsets
offsetStoreSegmentSize: 25000000
offsetStoreRetentionTime: 60000
offsetStoreRequiredAcks: 1
offsetStoreMaxFetchSize: 1048576
offsetStoreBatchBytes: 16384
offsetStoreBatchTime: 1000
offsetUpdateTimeWindow: 10000
offsetUpdateCount: 0
offsetUpdateShutdownTimeout: 2000
default:
batchSize: 16384
batchTimeout: 0
replicationFactor: 1
concurrency: 1
requiredAcks: 1
compressionCodec: none
queueSize: 8192 # must be a power of 2
maxWait: 100
fetchSize: 1048576
minPartitionCount: 1
durableSubscription: false
syncProducer: false
syncProducerTimeout: 5000
---
# Rabbit MQ Properties
#
# NOTE: sslProperties is mutually exclusive with keyStore, keyStorePassphrase, trustStore, trustStorePassphrase.
# if you set inline properties, values in the properties file location given by 'sslProperties' will be ignored.
#
spring:
rabbitmq:
addresses: my-domain:5672
adminAddresses: http://my-domain:15672
nodes: rabbit@my-domain
username: myusername
password: mypassword
virtual_host: /
useSSL: false
sslProperties:
ssl:
keyStore:
keyStorePassphrase:
trustStore:
trustStorePassphrase:
编辑 2:看起来 xd-admin
和 xd-container
脚本 do not add the messagebus jars 到 $CLASSPATH
在 AdminServerApplication
或 [=24= 开始时].我不确定这是否是罪魁祸首。
根据您选择的消息总线,所需的 jar 位于 xd/lib 下的子文件夹中...
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/amqp-client-3.6.0.jar
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/http-client-1.0.0.RELEASE.jar
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/spring-amqp-1.5.4.RELEASE.jar
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/spring-integration-amqp-4.2.5.RELEASE.jar
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/spring-rabbit-1.5.4.RELEASE.jar
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/spring-xd-messagebus-rabbit-1.3.1.RELEASE.jar
运行时在解析配置时将该目录添加到类路径中。
xd-admin
和 xd-container
do not add any of the messagebus
jars to the classpath 的 Spring XD 启动脚本在启动各自的应用程序之前。他们只添加直接位于 /lib
中的罐子和 lib/hadoop27/
中的 hadoop 罐子。
我能够通过修改脚本以在类路径中包含必要的 jar 来解决此问题:
RABBIT_LIB=$APP_HOME/lib/messagebus/rabbit
if [ -d "$RABBIT_LIB" ]; then
for k in "$RABBIT_LIB"/*.jar; do
CLASSPATH="$CLASSPATH":"$k"
done
fi
我正在从 xd-singlenode
模式切换到分布式,使用版本 1.3.1-RELEASE
。
我在 docker 容器中 运行ning Spring XD,使用 Dockerfile similar to the one here. According to the XD guide,如果我想使用 RabbitMQ 作为我的数据传输,那么我需要配置 servers.yml
文件。我在一个单独的 docker 容器中有 RabbitMQ 运行ning,我 link 这两个容器一起作为 docker 组合的一部分。
简而言之,我已经配置了所有内容(我相信是正确的),但是当我在 Spring XD 中转到 运行 多个流时(这就是 RabbitMQ 发挥作用的时候,因为它会充当容器之间的消息传输)Spring XD 抛出以下异常:
java.lang.NoClassDefFoundError: org/springframework/amqp/rabbit/connection/ConnectionFactory
at java.lang.Class.getDeclaredConstructors0(Native Method) ~[na:1.8.0_131]
at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671) ~[na:1.8.0_131]
at java.lang.Class.getConstructors(Class.java:1651) ~[na:1.8.0_131]
at org.springframework.boot.BeanDefinitionLoader.isComponent(BeanDefinitionLoader.java:276) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.BeanDefinitionLoader.load(BeanDefinitionLoader.java:158) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.BeanDefinitionLoader.load(BeanDefinitionLoader.java:135) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.BeanDefinitionLoader.load(BeanDefinitionLoader.java:127) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.SpringApplication.load(SpringApplication.java:615) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:139) [spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
at org.springframework.xd.dirt.plugins.spark.streaming.MessageBusConfiguration.createApplicationContext(MessageBusConfiguration.java:86) [spring-xd-dirt-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.xd.dirt.plugins.spark.streaming.MessageBusSender.start(MessageBusSender.java:105) [spring-xd-dirt-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.xd.spark.streaming.java.ModuleExecutor.call(ModuleExecutor.java:58) [spring-xd-spark-streaming-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.springframework.xd.spark.streaming.java.ModuleExecutor.call(ModuleExecutor.java:53) [spring-xd-spark-streaming-1.3.1.RELEASE.jar:1.3.1.RELEASE]
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition.apply(JavaRDDLike.scala:206) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition.apply(JavaRDDLike.scala:206) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:806) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:806) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1498) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1498) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.scheduler.Task.run(Task.scala:64) [spark-core_2.10-1.3.1.jar:1.3.1]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) [spark-core_2.10-1.3.1.jar:1.3.1]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: java.lang.ClassNotFoundException: org.springframework.amqp.rabbit.connection.ConnectionFactory
at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_131]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_131]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) ~[na:1.8.0_131]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_131]
... 26 common frames omitted
那么,是什么原因呢? Spring XD 没有附带必要的 RabbitMQ 依赖项吗?
我什至尝试手动将 spring-messaging
jar 添加到包含依赖项的 /opt/spring-xd/xd/lib
中,但没有成功。这是怎么回事?
编辑:我将放入我的 servers.yml
文件以获取额外的上下文。也许有人能看出我哪里做错了:
spring:
profiles: container
xd:
transport: rabbit
messagebus:
local:
queueSize: 2147483647
polling: 1000
executor:
corePoolSize: 0
maxPoolSize: 200
queueSize: 2147483647
keepAliveSeconds: 60
rabbit:
compressionLevel: 1
# bus-level property, applies only when 'compress=true' for a stream module
# See java.util.zip.Deflater; 1=BEST_SPEED, 9=BEST_COMPRESSION, ...
longStringLimit: 8192
# Headers longer than this will not be converted to String and will be a
# DataInputStream - such headers will NOT be properly converted back on output.
default:
ackMode: AUTO
# Valid: AUTO (container acks), NONE (broker acks), MANUAL (consumer acks).
# Upper case only.
# Note: MANUAL requires specialized code in the consuming module and is unlikely to be
# used in an XD application. For more information, see
# http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack
autoBindDLQ: false
backOffInitialInterval: 1000
backOffMaxInterval: 10000
backOffMultiplier: 2.0
batchBufferLimit: 10000
batchingEnabled: false
batchSize: 100
batchTimeout: 5000
compress: false
concurrency: 1
deliveryMode: PERSISTENT
durableSubscription: false
maxAttempts: 3
maxConcurrency: 1
prefix: xdbus.
# prefix for queue/exchange names so policies (ha, dle etc.) can be applied
prefetch: 1
replyHeaderPatterns: STANDARD_REPLY_HEADERS,*
republishToDLQ: false
# When false, normal rabbitmq dlq processing; when true, republish to the DLQ with stack trace
requestHeaderPatterns: STANDARD_REQUEST_HEADERS,*
requeue: true
transacted: false
txSize: 1
# redis:
# headers:
# comma-delimited list of additional header names to transport
# default:
# default bus properties, if not specified at the module level
# backOffInitialInterval: 1000
# backOffMaxInterval: 10000
# backOffMultiplier: 2.0
# concurrency: 1
# maxAttempts: 3
kafka:
brokers: kafka:9092
zkAddress: zookeeper:2181
mode: embeddedHeaders
offsetManagement: kafkaTopic
headers:
# comma-delimited list of additional header names to transport
socketBufferSize: 2097152
offsetStoreTopic: SpringXdOffsets
offsetStoreSegmentSize: 25000000
offsetStoreRetentionTime: 60000
offsetStoreRequiredAcks: 1
offsetStoreMaxFetchSize: 1048576
offsetStoreBatchBytes: 16384
offsetStoreBatchTime: 1000
offsetUpdateTimeWindow: 10000
offsetUpdateCount: 0
offsetUpdateShutdownTimeout: 2000
default:
batchSize: 16384
batchTimeout: 0
replicationFactor: 1
concurrency: 1
requiredAcks: 1
compressionCodec: none
queueSize: 8192 # must be a power of 2
maxWait: 100
fetchSize: 1048576
minPartitionCount: 1
durableSubscription: false
syncProducer: false
syncProducerTimeout: 5000
---
#Config for admin
spring:
profiles: admin
xd:
transport: rabbit
messagebus:
local:
queueSize: 2147483647
polling: 1000
executor:
corePoolSize: 0
maxPoolSize: 200
queueSize: 2147483647
keepAliveSeconds: 60
rabbit:
compressionLevel: 1
# bus-level property, applies only when 'compress=true' for a stream module
# See java.util.zip.Deflater; 1=BEST_SPEED, 9=BEST_COMPRESSION, ...
longStringLimit: 8192
# Headers longer than this will not be converted to String and will be a
# DataInputStream - such headers will NOT be properly converted back on output.
default:
ackMode: AUTO
# Valid: AUTO (container acks), NONE (broker acks), MANUAL (consumer acks).
# Upper case only.
# Note: MANUAL requires specialized code in the consuming module and is unlikely to be
# used in an XD application. For more information, see
# http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack
autoBindDLQ: false
backOffInitialInterval: 1000
backOffMaxInterval: 10000
backOffMultiplier: 2.0
batchBufferLimit: 10000
batchingEnabled: false
batchSize: 100
batchTimeout: 5000
compress: false
concurrency: 1
deliveryMode: PERSISTENT
durableSubscription: false
maxAttempts: 3
maxConcurrency: 1
prefix: xdbus.
# prefix for queue/exchange names so policies (ha, dle etc.) can be applied
prefetch: 1
replyHeaderPatterns: STANDARD_REPLY_HEADERS,*
republishToDLQ: false
# When false, normal rabbitmq dlq processing; when true, republish to the DLQ with stack trace
requestHeaderPatterns: STANDARD_REQUEST_HEADERS,*
requeue: true
transacted: false
txSize: 1
# redis:
# headers:
# comma-delimited list of additional header names to transport
# default:
# default bus properties, if not specified at the module level
# backOffInitialInterval: 1000
# backOffMaxInterval: 10000
# backOffMultiplier: 2.0
# concurrency: 1
# maxAttempts: 3
kafka:
brokers: kafka:9092
zkAddress: zookeeper:2181
mode: embeddedHeaders
offsetManagement: kafkaTopic
headers:
# comma-delimited list of additional header names to transport
socketBufferSize: 2097152
offsetStoreTopic: SpringXdOffsets
offsetStoreSegmentSize: 25000000
offsetStoreRetentionTime: 60000
offsetStoreRequiredAcks: 1
offsetStoreMaxFetchSize: 1048576
offsetStoreBatchBytes: 16384
offsetStoreBatchTime: 1000
offsetUpdateTimeWindow: 10000
offsetUpdateCount: 0
offsetUpdateShutdownTimeout: 2000
default:
batchSize: 16384
batchTimeout: 0
replicationFactor: 1
concurrency: 1
requiredAcks: 1
compressionCodec: none
queueSize: 8192 # must be a power of 2
maxWait: 100
fetchSize: 1048576
minPartitionCount: 1
durableSubscription: false
syncProducer: false
syncProducerTimeout: 5000
---
# Rabbit MQ Properties
#
# NOTE: sslProperties is mutually exclusive with keyStore, keyStorePassphrase, trustStore, trustStorePassphrase.
# if you set inline properties, values in the properties file location given by 'sslProperties' will be ignored.
#
spring:
rabbitmq:
addresses: my-domain:5672
adminAddresses: http://my-domain:15672
nodes: rabbit@my-domain
username: myusername
password: mypassword
virtual_host: /
useSSL: false
sslProperties:
ssl:
keyStore:
keyStorePassphrase:
trustStore:
trustStorePassphrase:
编辑 2:看起来 xd-admin
和 xd-container
脚本 do not add the messagebus jars 到 $CLASSPATH
在 AdminServerApplication
或 [=24= 开始时].我不确定这是否是罪魁祸首。
根据您选择的消息总线,所需的 jar 位于 xd/lib 下的子文件夹中...
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/amqp-client-3.6.0.jar
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/http-client-1.0.0.RELEASE.jar
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/spring-amqp-1.5.4.RELEASE.jar
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/spring-integration-amqp-4.2.5.RELEASE.jar
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/spring-rabbit-1.5.4.RELEASE.jar
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/spring-xd-messagebus-rabbit-1.3.1.RELEASE.jar
运行时在解析配置时将该目录添加到类路径中。
xd-admin
和 xd-container
do not add any of the messagebus
jars to the classpath 的 Spring XD 启动脚本在启动各自的应用程序之前。他们只添加直接位于 /lib
中的罐子和 lib/hadoop27/
中的 hadoop 罐子。
我能够通过修改脚本以在类路径中包含必要的 jar 来解决此问题:
RABBIT_LIB=$APP_HOME/lib/messagebus/rabbit
if [ -d "$RABBIT_LIB" ]; then
for k in "$RABBIT_LIB"/*.jar; do
CLASSPATH="$CLASSPATH":"$k"
done
fi