Spring XD 无法找到 RabbitMQ ConnectionFactory 的 class 定义

Spring XD unable to find class definition for RabbitMQ ConnectionFactory

我正在从 xd-singlenode 模式切换到分布式,使用版本 1.3.1-RELEASE

我在 docker 容器中 运行ning Spring XD,使用 Dockerfile similar to the one here. According to the XD guide,如果我想使用 RabbitMQ 作为我的数据传输,那么我需要配置 servers.yml 文件。我在一个单独的 docker 容器中有 RabbitMQ 运行ning,我 link 这两个容器一起作为 docker 组合的一部分。

简而言之,我已经配置了所有内容(我相信是正确的),但是当我在 Spring XD 中转到 运行 多个流时(这就是 RabbitMQ 发挥作用的时候,因为它会充当容器之间的消息传输)Spring XD 抛出以下异常:

java.lang.NoClassDefFoundError: org/springframework/amqp/rabbit/connection/ConnectionFactory
    at java.lang.Class.getDeclaredConstructors0(Native Method) ~[na:1.8.0_131]
    at java.lang.Class.privateGetDeclaredConstructors(Class.java:2671) ~[na:1.8.0_131]
    at java.lang.Class.getConstructors(Class.java:1651) ~[na:1.8.0_131]
    at org.springframework.boot.BeanDefinitionLoader.isComponent(BeanDefinitionLoader.java:276) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
    at org.springframework.boot.BeanDefinitionLoader.load(BeanDefinitionLoader.java:158) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
    at org.springframework.boot.BeanDefinitionLoader.load(BeanDefinitionLoader.java:135) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
    at org.springframework.boot.BeanDefinitionLoader.load(BeanDefinitionLoader.java:127) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
    at org.springframework.boot.SpringApplication.load(SpringApplication.java:615) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:314) ~[spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
    at org.springframework.boot.builder.SpringApplicationBuilder.run(SpringApplicationBuilder.java:139) [spring-boot-1.2.3.RELEASE.jar:1.2.3.RELEASE]
    at org.springframework.xd.dirt.plugins.spark.streaming.MessageBusConfiguration.createApplicationContext(MessageBusConfiguration.java:86) [spring-xd-dirt-1.3.1.RELEASE.jar:1.3.1.RELEASE]
    at org.springframework.xd.dirt.plugins.spark.streaming.MessageBusSender.start(MessageBusSender.java:105) [spring-xd-dirt-1.3.1.RELEASE.jar:1.3.1.RELEASE]
    at org.springframework.xd.spark.streaming.java.ModuleExecutor.call(ModuleExecutor.java:58) [spring-xd-spark-streaming-1.3.1.RELEASE.jar:1.3.1.RELEASE]
    at org.springframework.xd.spark.streaming.java.ModuleExecutor.call(ModuleExecutor.java:53) [spring-xd-spark-streaming-1.3.1.RELEASE.jar:1.3.1.RELEASE]
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition.apply(JavaRDDLike.scala:206) [spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition.apply(JavaRDDLike.scala:206) [spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:806) [spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:806) [spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1498) [spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:1498) [spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) [spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.scheduler.Task.run(Task.scala:64) [spark-core_2.10-1.3.1.jar:1.3.1]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) [spark-core_2.10-1.3.1.jar:1.3.1]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: java.lang.ClassNotFoundException: org.springframework.amqp.rabbit.connection.ConnectionFactory
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) ~[na:1.8.0_131]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[na:1.8.0_131]
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335) ~[na:1.8.0_131]
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[na:1.8.0_131]
    ... 26 common frames omitted

那么,是什么原因呢? Spring XD 没有附带必要的 RabbitMQ 依赖项吗?

我什至尝试手动将 spring-messaging jar 添加到包含依赖项的 /opt/spring-xd/xd/lib 中,但没有成功。这是怎么回事?

编辑:我将放入我的 servers.yml 文件以获取额外的上下文。也许有人能看出我哪里做错了:

spring:
  profiles: container
xd:
  transport: rabbit
  messagebus:
    local:
      queueSize:                   2147483647
      polling:                     1000
      executor:
        corePoolSize:              0
        maxPoolSize:               200
        queueSize:                 2147483647
        keepAliveSeconds:          60
    rabbit:
      compressionLevel:            1
            # bus-level property, applies only when 'compress=true' for a stream module
            # See java.util.zip.Deflater; 1=BEST_SPEED, 9=BEST_COMPRESSION, ...
      longStringLimit:             8192
            # Headers longer than this will not be converted to String and will be a
            # DataInputStream - such headers will NOT be properly converted back on output.
      default:
        ackMode:                   AUTO
             # Valid: AUTO (container acks), NONE (broker acks), MANUAL (consumer acks).
             # Upper case only.
             # Note: MANUAL requires specialized code in the consuming module and is unlikely to be
             # used in an XD application. For more information, see
             # http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack
        autoBindDLQ:               false
        backOffInitialInterval:    1000
        backOffMaxInterval:        10000
        backOffMultiplier:         2.0
        batchBufferLimit:          10000
        batchingEnabled:           false
        batchSize:                 100
        batchTimeout:              5000
        compress:                  false
        concurrency:               1
        deliveryMode:              PERSISTENT
        durableSubscription:       false
        maxAttempts:               3
        maxConcurrency:            1
        prefix:                    xdbus.
             # prefix for queue/exchange names so policies (ha, dle etc.) can be applied
        prefetch:                  1
        replyHeaderPatterns:       STANDARD_REPLY_HEADERS,*
        republishToDLQ:            false
             # When false, normal rabbitmq dlq processing; when true, republish to the DLQ with stack trace
        requestHeaderPatterns:     STANDARD_REQUEST_HEADERS,*
        requeue:                   true
        transacted:                false
        txSize:                    1

#    redis:
#      headers:
            # comma-delimited list of additional header names to transport
#      default:
            # default bus properties, if not specified at the module level
#        backOffInitialInterval:    1000
#        backOffMaxInterval:        10000
#        backOffMultiplier:         2.0
#        concurrency:               1
#        maxAttempts:               3
    kafka:
      brokers:                                 kafka:9092
      zkAddress:                               zookeeper:2181
      mode:                                    embeddedHeaders
      offsetManagement:                        kafkaTopic
      headers:
             # comma-delimited list of additional header names to transport
      socketBufferSize:                        2097152
      offsetStoreTopic:                        SpringXdOffsets
      offsetStoreSegmentSize:                  25000000
      offsetStoreRetentionTime:                60000
      offsetStoreRequiredAcks:                 1
      offsetStoreMaxFetchSize:                 1048576
      offsetStoreBatchBytes:                   16384
      offsetStoreBatchTime:                    1000
      offsetUpdateTimeWindow:                  10000
      offsetUpdateCount:                       0
      offsetUpdateShutdownTimeout:             2000
      default:
        batchSize:                 16384
        batchTimeout:              0
        replicationFactor:         1
        concurrency:               1
        requiredAcks:              1
        compressionCodec:          none
        queueSize:                 8192 # must be a power of 2
        maxWait:                   100
        fetchSize:                 1048576
        minPartitionCount:         1
        durableSubscription:       false
        syncProducer:              false
        syncProducerTimeout:       5000
---
#Config for admin
spring:
  profiles: admin
xd:
  transport: rabbit
  messagebus:
    local:
      queueSize:                   2147483647
      polling:                     1000
      executor:
        corePoolSize:              0
        maxPoolSize:               200
        queueSize:                 2147483647
        keepAliveSeconds:          60
    rabbit:
      compressionLevel:            1
            # bus-level property, applies only when 'compress=true' for a stream module
            # See java.util.zip.Deflater; 1=BEST_SPEED, 9=BEST_COMPRESSION, ...
      longStringLimit:             8192
            # Headers longer than this will not be converted to String and will be a
            # DataInputStream - such headers will NOT be properly converted back on output.
      default:
        ackMode:                   AUTO
             # Valid: AUTO (container acks), NONE (broker acks), MANUAL (consumer acks).
             # Upper case only.
             # Note: MANUAL requires specialized code in the consuming module and is unlikely to be
             # used in an XD application. For more information, see
             # http://docs.spring.io/spring-integration/reference/html/amqp.html#amqp-inbound-ack
        autoBindDLQ:               false
        backOffInitialInterval:    1000
        backOffMaxInterval:        10000
        backOffMultiplier:         2.0
        batchBufferLimit:          10000
        batchingEnabled:           false
        batchSize:                 100
        batchTimeout:              5000
        compress:                  false
        concurrency:               1
        deliveryMode:              PERSISTENT
        durableSubscription:       false
        maxAttempts:               3
        maxConcurrency:            1
        prefix:                    xdbus.
             # prefix for queue/exchange names so policies (ha, dle etc.) can be applied
        prefetch:                  1
        replyHeaderPatterns:       STANDARD_REPLY_HEADERS,*
        republishToDLQ:            false
             # When false, normal rabbitmq dlq processing; when true, republish to the DLQ with stack trace
        requestHeaderPatterns:     STANDARD_REQUEST_HEADERS,*
        requeue:                   true
        transacted:                false
        txSize:                    1

#    redis:
#      headers:
            # comma-delimited list of additional header names to transport
#      default:
            # default bus properties, if not specified at the module level
#        backOffInitialInterval:    1000
#        backOffMaxInterval:        10000
#        backOffMultiplier:         2.0
#        concurrency:               1
#        maxAttempts:               3
    kafka:
      brokers:                                 kafka:9092
      zkAddress:                               zookeeper:2181
      mode:                                    embeddedHeaders
      offsetManagement:                        kafkaTopic
      headers:
             # comma-delimited list of additional header names to transport
      socketBufferSize:                        2097152
      offsetStoreTopic:                        SpringXdOffsets
      offsetStoreSegmentSize:                  25000000
      offsetStoreRetentionTime:                60000
      offsetStoreRequiredAcks:                 1
      offsetStoreMaxFetchSize:                 1048576
      offsetStoreBatchBytes:                   16384
      offsetStoreBatchTime:                    1000
      offsetUpdateTimeWindow:                  10000
      offsetUpdateCount:                       0
      offsetUpdateShutdownTimeout:             2000
      default:
        batchSize:                 16384
        batchTimeout:              0
        replicationFactor:         1
        concurrency:               1
        requiredAcks:              1
        compressionCodec:          none
        queueSize:                 8192 # must be a power of 2
        maxWait:                   100
        fetchSize:                 1048576
        minPartitionCount:         1
        durableSubscription:       false
        syncProducer:              false
        syncProducerTimeout:       5000

---
#  Rabbit MQ Properties
#
#  NOTE: sslProperties is mutually exclusive with keyStore, keyStorePassphrase, trustStore, trustStorePassphrase.
#  if you set inline properties, values in the properties file location given by 'sslProperties' will be ignored.
#
spring:
  rabbitmq:
   addresses: my-domain:5672
   adminAddresses: http://my-domain:15672
   nodes: rabbit@my-domain
   username: myusername
   password: mypassword
   virtual_host: /
   useSSL: false
   sslProperties:
   ssl:
     keyStore:
     keyStorePassphrase:
     trustStore:
     trustStorePassphrase:

编辑 2:看起来 xd-adminxd-container 脚本 do not add the messagebus jars$CLASSPATHAdminServerApplication 或 [=24= 开始时].我不确定这是否是罪魁祸首。

根据您选择的消息总线,所需的 jar 位于 xd/lib 下的子文件夹中...

../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/amqp-client-3.6.0.jar
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/http-client-1.0.0.RELEASE.jar
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/spring-amqp-1.5.4.RELEASE.jar
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/spring-integration-amqp-4.2.5.RELEASE.jar
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/spring-rabbit-1.5.4.RELEASE.jar
../../Downloads/spring-xd-1.3.1.RELEASE/xd/lib/messagebus/rabbit/spring-xd-messagebus-rabbit-1.3.1.RELEASE.jar

运行时在解析配置时将该目录添加到类路径中。

xd-adminxd-container do not add any of the messagebus jars to the classpath 的 Spring XD 启动脚本在启动各自的应用程序之前。他们只添加直接位于 /lib 中的罐子和 lib/hadoop27/ 中的 hadoop 罐子。

我能够通过修改脚本以在类路径中包含必要的 jar 来解决此问题:

RABBIT_LIB=$APP_HOME/lib/messagebus/rabbit
if [ -d "$RABBIT_LIB" ]; then
  for k in "$RABBIT_LIB"/*.jar; do
    CLASSPATH="$CLASSPATH":"$k"
  done
fi