在 @MessageDriven bean 中使用 amazon sqs - 池化/并行处理

using amazon sqs in a @MessageDriven bean - pooling / parallel processing

我们需要在 Java EE 应用程序中使用队列,并且由于它是基于云的应用程序(部署在 OpenShift Online 上),我们喜欢使用 amazon sqs。

如果我正确理解 JMS / Java EE 的接收部分的理论,@MessageDriven bean 由 Java EE 容器管理,因此很多 bean 实例如果传入消息的数量很多,则并行创建(根据最大池大小)。这当然是处理高负载的一大好处。

但是,我看不出如何在 Java EE 应用程序中以这种方式集成 aws sqs。我知道来自 http://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-java-message-service-jms-client.html:

的异步接收器示例
class MyListener implements MessageListener {

    @Override
    public void onMessage(Message message) {
        try {
            // Cast the received message as TextMessage and print the text to screen.
            if (message != null) {
                System.out.println("Received: " + ((TextMessage) message).getText());
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

然后:

// Create a consumer for the 'TestQueue'.
MessageConsumer consumer = session.createConsumer(queue);

// Instantiate and set the message listener for the consumer.
consumer.setMessageListener(new MyListener());

// Start receiving incoming messages.
connection.start();

这是官方的异步接收器示例 - 它不是 @MessageDriven bean。很明显,我们需要在某个地方使用凭据进行身份验证(通过创建 SQSConnectionFactory,然后是连接,然后是会话 - 这在示例中也有很好的描述)。
但我强烈认为此示例不会并行处理消息 - 即只有一个 bean 实例正在处理队列,这对于可扩展、高负载的应用程序来说不是一个好的解决方案。

a) 我们如何使用 Amazon SQS 实现真正的 Java EE 方式? 我刚找到一大堆 Spring 个例子。但必须是 Java EE 7。

b) 我们使用 Wildfly(目前为 8.2.1)。是否也可以让 Wildfly 在内部管理与 AWS 和应用程序的连接,我们可以使用队列,就好像它是应用程序服务器管理的队列一样(与数据库访问的数据源相同的方法)?

得到stdunbar答案后的结论:
在'proper way',我喜欢做的事情似乎是不可能的。所以我该怎么做?实现一个ManagedExecutorService as stdunbar描述到'wrap'的队列? - 然而,这意味着也有一个本地队列,这对于应用程序来说不是一个好情况,它应该是可扩展的?! 什么是替代品?我们是 运行 OpenShift Online 上的应用程序。用例如实例化自己的装备可能会更好。 ApacheMQ Cartridge...当然有很多缺点,比如成本,我们负责 'infrastructure'.

老实说,在这种情况下我对AWS真的很失望...

根据一些较旧的 docs I found

A container allows many instances of a message-driven bean class to be running concurrently, thus allowing for the concurrent processing of a stream of messages.

通过使用 Amazon JMS 集成,结合 声明式 MDB,您应该可以开始了。我不会使用 setMessageListener 接口。您可以使用 JMS 的声明式版本,因为您使用的是 Wildfly 8.x / EE7:

@MessageDriven(activationConfig = { /* your config - i.e. queue name, etc */ })
public class MyListener implements MessageListener {
    @Override
    public void onMessage(Message message) {
    }
}

这允许容器根据需要创建尽可能多的实例。请注意,在 Wildfly 中可能需要对 JMS 参数进行一些调整。

附带说明一下,让 Amazon 库负责读取 SQS 队列。我已经开始推出自己的 reader,认为我可以穿线它。但我发现,您不能将 AWS Java 库与从队列中读取的多个线程一起使用,因为您几乎每次都会获得重复读取。我有 4 个线程读取 SQS 队列,并且会得到 4 个相同的消息。我最终更改为单个 reader 将消息放入 LinkedBlockingDeque 以供其他一些线程使用。

我展示的一切都是纯粹的Java/EE。

编辑
在试用了 Amazon SQS/JMS 集成后,我觉得如果您使用它是在浪费时间。它仅适用于 JMS 1.1,因此它使用旧的 JMS 语法,同时也加入了 JNDI。此外,它仅适用于队列,不适用于主题。

我强烈建议您创建自己的实现。运行队列 reader 线程的 ManagedExecutorService,SQS 读取超时较短。每次循环都会从 SQS 队列中读取消息并将消息放入 JMS 队列或主题中。

很抱歉让您对此寄予厚望 - 亚马逊的维护不够值得。

我认为我的解决方案不合适 JAVA EE,但在我的情况下它有效。

配置:

@Singleton
public class SqsMessageManager
{
    private Integer numberOfReceivers = 3;

    public static SQSConnection connection = null;
    public static Queue queue = null;

    @Inject
    SqsMessageReceiver sqsMessageReceiver;

    public void init()
    {
        try
        {
            SQSConnectionFactory connectionFactory =
                    SQSConnectionFactory.builder()
                            .withRegion(Region.getRegion(Regions.EU_WEST_1))
                            .withAWSCredentialsProvider(new EnvironmentVariableCredentialsProvider())
                            .build();

            connection = connectionFactory.createConnection();

            queue = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createQueue("myQueue");

            for (int i = 0; i < numberOfReceivers; i++)
                connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue).setMessageListener(sqsMessageReceiver);

            connection.start();
        }
        catch (JMSException e)
        {
            e.getStackTrace();
        }
    }
}

那么发件人:

@Dependent
public class SqsMessageSender
{
    MessageProducer producer = null;
    Session senderSession = null;

    @PostConstruct
    public void createProducer(){
        try
        {
            // open new session and message producer
            senderSession = SqsMessageManager.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            producer = senderSession.createProducer(SqsMessageManager.queue);
        }catch(JMSException | NullPointerException e){
            ;
        }
    }

    @PreDestroy
    public void destroy(){
        try
        {
            // close session
            producer.close();
            senderSession.close();
        }catch(JMSException e){

        }
    }

    // sends a message to aws sqs queue
    public void sendMessage(String txt)
    {
        try
        {
            TextMessage textMessage = senderSession.createTextMessage(txt);
            producer.send(textMessage);
        }
        catch (JMSException e)
        {
            e.getStackTrace();
        }
    }
}

接收者:

@Dependent
public class SqsMessageReceiver implements MessageListener
{
    public void onMessage(Message inMessage) {
        ...
    }
}

Payara Cloud Connectors 似乎很新,但看起来很有前途。不知道这是否也适用于其他容器。据我所知,它基于 JCA 适配器。

通常,要使 MDB“连接”到远程 JMS Q,您需要资源适配器 (RA)。理论上,基于 JMS 规范实现的此类 RA 应该可以与任何符合规范的 JMS 提供程序一起使用,因此理论上您应该能够重用,例如 this implementation.

然而,正如上述项目的自述文件所说,您应该使用由特定 JMS 提供程序提供的 RA 而不是通用的 RA。不幸的是亚马逊不提供一个:(

但是最近,some awesome guy created an unofficial open-source one。我只是在评估它,稍后会根据我的经验更新这个答案。 (非常欢迎本 RA 其他用户的评论)