为 JMS 消息创建分配并行流处理

Distributing ParallelStream Processing for JMS Message Creation

我正在尝试扩展下面的示例程序,该程序创建一个简单的 JMS 消息并将其同时推送到同一个队列。该示例程序目前在单个四核主机上大约需要 20 秒。任何人都可以建议对以下内容进行一些更改以提高性能吗? 20 秒测量仅适用于以下并行流处理线:

test.parallelStream().forEach(e -> sender.sendMessage(e));

我能想到的一种方法是将我的集合 ("test") 分散到几台主机,然后这些主机通过将每个线程配置为拥有自己的线程池来以块的形式并发处理集合。这样做的一个缺点是容错,并且必须放置适当的结构以确保每个线程池不会处理相同的消息。

另一种方法是使用更 performant/concurrent 的代理,例如 Kafka

请注意 无论我采用何种异步线程方法,我都需要能够控制从这些线程发送到另一个应用程序的消息数量,因为它们只支持同时设置金额。还有其他想法吗?

完整来源:

import java.util.ArrayList;
import java.util.List;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
    private static ConnectionFactory factory = null;
    private static Connection connection = null;
    private static Session session = null;
    private static Destination destination = null;
    private static MessageProducer producer = null;

    public Sender() {}

    public void sendMessage(String test) {
        try {
            TextMessage message = session.createTextMessage();
            message.setText(test);
            producer.send(message);
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws JMSException {
            factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
    ((ActiveMQConnectionFactory)factory).setUseAsyncSend(true);
    ((ActiveMQConnectionFactory)factory).setOptimizeAcknowledge(true);
    connection = factory.createConnection();
    connection.start();
    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    destination = session.createQueue("SAMPLEQUEUE");
    producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

        List<String> test = new ArrayList<String>();
        for (int i = 0; i <= 100000; i++) {
            test.add(Integer.toString(i));
        }
        Sender sender = new Sender();
        test.parallelStream().forEach(e -> sender.sendMessage(e));
    }
}

正如所指出的,MessagProducer 不能保证是线程安全的,但 Session 也不是。

总之,正题:

  1. 我强烈建议不要对 IO 操作使用默认流 api,因为它对所有操作使用一个线程池,并且线程池受内核数量限制,而且您不能指定暂停。您应该改用 Executor:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html#newCachedThreadPool--

    ExecutorService executor = Executors.newFixedThreadPool(2);
    for (String msg: msgs) {
        executor.execute(() -> send(msg)); 
    }

  1. 如果您不必保证消息传递,您可以将消息传递模式更改为 NON_PERSISTENT,这会加快发送速度,因为消息存储和保证传递没有开销。
  2. 实际上,您不需要自己进行线程管理,因为 ActiveMQ 具有异步支持,这意味着您的消息可以在单独的线程上得到确认,只要您不必保证交付即可:http://activemq.apache.org/async-sends.html
  3. 如果您确实需要保证传递,那么请在 JMS 事务中使用 PERSISTENT 消息传递模式(默认)来批量发送您的消息(一种全有或全无的方法)。这大大提高了性能,因为您只在提交期间进行实际发送。 https://docs.oracle.com/javaee/6/api/javax/jms/Connection.html#createSession(boolean, int)
  4. 在 ActiveMQ 中启用 optimizeAcknowledge 以加速消息代理本身:http://activemq.apache.org/optimized-acknowledgement.html
  5. 最后,您可以将会话确认模式设置为Session.DUPS_OK_ACKNOWLEDGE,以便在后台延迟确认消息,但这会产生消息的重复发送,因此消费者端需要使用唯一的 id 或为此做好准备相似

当然,您不应该同时使用所有这些方法,只需根据常识判断适合的方法即可。

除了上面说的,我还要重点说一下你是怎么消费消息的。如果您的接收端应用程序可以同时处理有限数量的消息,那么您同时发送多少条消息并不重要,因为它们都会在队列中等待,直到您的接收应用程序准备好使用它们.因此,假设您的接收应用程序可以同时处理 10 个请求。我会为您的队列设置 10 个消费者,每个消费者仅在处理完之前处理的请求后才从队列中读取传入请求。这样你接收消息的速度就不那么重要了(除了担心队列溢出)。而且您的应用始终不会同时处理超过 10 个请求。