嵌入式 AMQP Java 代理

Embedded AMQP Java Broker

我正在尝试为连接到 RabbitMQ 代理的 Scala / Java 应用程序创建集成测试。为了实现这一点,我想要一个嵌入式代理,它会在每次测试之前启动和停止说 AMQP。最初我试图将 ActiveMQ 作为带有 AMQP 的嵌入式代理引入,但是该应用程序使用 RabbitMQ,因此只能使用 AMQP 0.9.3 版,而 ActiveMQ 需要 AMQP 1.0 版。

我可以使用其他嵌入式代理来代替 ActiveMQ 吗?

我不知道有任何嵌入式 RabbitMQ 服务器,所以我认为您有几个解决方法:

  1. 您的 RabbitMQ 服务器不需要存在于您的 CI 服务器上,您可以启动一个新服务器,即您的 CI rabbitmq 服务器。如果您不能自己提出一个,您可以查看 CloudAMQP。今天的免费套餐提供:每月 100 万条消息、20 个并发连接、100 个队列、10,000 条排队消息。对于您的 CI 过程来说可能足够了。

  2. 如果您的测试仅针对 RabbitMQ 进行单元测试,您可以模拟 RabbitMQ 消息生成。这就是我们在一些单元测试中所做的。我们只是检查某个操作是否使方法调用产生特定消息,但我们对此进行了模拟,因此我们实际上并没有发布消息。然后我们通过使用我们创建的特定消息显式调用消费者方法来测试每个消费者。

你可以试试Apache QPid Java broker。这可以用作嵌入式代理。

另一个 SO 问题描述了 Scala 中的设置 - Example of standalone Apache Qpid (amqp) Junit Test

我围绕下载、提取、启动和管理 RabbitMQ 的过程开发了一个包装器,因此它可以像任何 JVM 项目控制的嵌入式服务一样工作。

查看:https://github.com/AlejandroRivera/embedded-rabbitmq

就这么简单:

EmbeddedRabbitMqConfig config = new EmbeddedRabbitMqConfig.Builder()
    .version(PredefinedVersion.V3_5_7)
    .build();
EmbeddedRabbitMq rabbitMq = new EmbeddedRabbitMq(config);
rabbitMq.start();
...
rabbitMq.stop();

适用于 Linux、Mac 和 Windows。

一个完全在内存中的解决方案。根据需要替换 spring.* 属性。

<dependency>
  <groupId>org.apache.qpid</groupId>
  <artifactId>qpid-broker</artifactId>
  <version>6.1.1</version>
  <scope>test</scope>
</dependency>
public class EmbeddedBroker {
  public void start() {
    Broker broker = new Broker();
    BrokerOptions brokerOptions = new BrokerOptions();
    brokerOptions.setConfigProperty("qpid.amqp_port", environment.getProperty("spring.rabbitmq.port"));
    brokerOptions.setConfigProperty("qpid.broker.defaultPreferenceStoreAttributes", "{\"type\": \"Noop\"}");
    brokerOptions.setConfigProperty("qpid.vhost", environment.getProperty("spring.rabbitmq.virtual-host"));
    brokerOptions.setConfigurationStoreType("Memory");
    brokerOptions.setStartupLoggedToSystemOut(false);
    broker.startup(brokerOptions);
  }
}

initial-config.json 添加为资源:

{
  "name": "Embedded Test Broker",
  "modelVersion": "6.1",
  "authenticationproviders" : [{
    "name": "password",
    "type": "Plain",
    "secureOnlyMechanisms": [],
    "users": [{"name": "guest", "password": "guest", "type": "managed"}]
  }],
  "ports": [{
    "name": "AMQP",
    "port": "${qpid.amqp_port}",
    "authenticationProvider": "password",
    "protocols": [ "AMQP_0_9_1" ],
    "transports": [ "TCP" ],
    "virtualhostaliases": [{
      "name": "${qpid.vhost}",
      "type": "nameAlias"
    }]
  }],
  "virtualhostnodes" : [{
    "name": "${qpid.vhost}",
    "type": "Memory",
    "virtualHostInitialConfiguration": "{ \"type\": \"Memory\" }"
  }]
}

这里是OrangeDog提出的适配Qpid Broker的方案7.x,灵感来自here

添加 qpid 7.x 作为测试依赖项。在 7.x 中,这些已在核心 + 插件中分开,具体取决于您的需要。对于 RabbitMQ AMQP 版本,您需要 qpid-broker-plugins-amqp-0-8-protocol,对于 运行 in-memory(足以进行集成测试),请使用 qpid-broker-plugins-memory-store.

pom.xml:

...
<properties>
    ...
    <qpid-broker.version>7.0.2</qpid-broker.version>
</properties>

<dependencies>
    ...
    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-broker-core</artifactId>
        <version>${qpid-broker.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-broker-plugins-amqp-0-8-protocol</artifactId>
        <version>${qpid-broker.version}</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.qpid</groupId>
        <artifactId>qpid-broker-plugins-memory-store</artifactId>
        <version>${qpid-broker.version}</version>
        <scope>test</scope>
    </dependency>
</dependecies>
...

添加带有 hard-coded user/password 和默认 in-memory 虚拟主机映射到默认端口 (5672) 的代理配置:

qpid-config.json:

{
  "name": "EmbeddedBroker",
  "modelVersion": "7.0",
  "authenticationproviders": [
    {
      "name": "password",
      "type": "Plain",
      "secureOnlyMechanisms": [],
      "users": [{"name": "guest", "password": "guest", "type": "managed"}]
    }
  ],
  "ports": [
    {
      "name": "AMQP",
      "port": "${qpid.amqp_port}",
      "authenticationProvider": "password",
      "virtualhostaliases": [
        {
          "name": "defaultAlias",
          "type": "defaultAlias"
        }
      ]
    }
  ],
  "virtualhostnodes": [
    {
      "name": "default",
      "defaultVirtualHostNode": "true",
      "type": "Memory",
      "virtualHostInitialConfiguration": "{\"type\": \"Memory\" }"
    }
  ]
}

定义 junit ExternalResource 并声明为 ClassRule(或在您的 IT @BeforeClass@AfterClass 方法中启动和关闭嵌入式代理):

EmbeddedAMQPBroker.java:

public class EmbeddedAMQPBroker extends ExternalResource {

    private final SystemLauncher broker = new SystemLauncher();

    @Override
    protected void before() throws Throwable {
        startQpidBroker();
        //createExchange();
    }

    @Override
    protected void after() {
        broker.shutdown();
    }

    private void startQpidBroker() throws Exception {
        Map<String, Object> attributes = new HashMap<>();
        attributes.put("type", "Memory");
        attributes.put("initialConfigurationLocation", findResourcePath("qpid-config.json"));
        broker.startup(attributes);
    }

    private String findResourcePath(final String fileName) {
        return EmbeddedAMQPBroker.class.getClassLoader().getResource(fileName).toExternalForm();
    }
}

集成测试:

public class MessagingIT{
    @ClassRule
    public static EmbeddedAMQPBroker embeddedAMQPBroker = new EmbeddedAMQPBroker();

    ...
}