Spring Batch parallel processing with JMS

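I implemented a Spring Batch project that reads from a WebLogic JMS queue (with a custom item reader, not message-driven) and then passes the JMS message data to an item writer (chunk size = 1), where I call some APIs and write to the database.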

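However, I am trying to implement parallel JMS processing: reading JMS messages in parallel and passing them to the writer without waiting for the previous ones to finish processing.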

I used a DefaultMessageListenerContainer in a previous project, which provides parallel consumption of JMS messages, but in this project I have to use the Spring Batch framework.

How can I solve this?

My code:

    @Bean
    Step step1() {
        return steps.get("step1").<Message, DetectionIncoherenceLiqJmsOut>chunk(1)
                .reader(reader()).processor(processor()).writer(writer())
                .listener(stepListener()).build();
    }

    @Bean
    Job job(@Qualifier("step1") Step step1) {
        return jobs.get("job").start(step1).build();
    }

JMS code:

    @Override
    public void initQueueConnection() throws NamingException, JMSException {

        Hashtable<String, String> properties = new Hashtable<String, String>();
        properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
        properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_RECEIVE));
        InitialContext vInitialContext = new InitialContext(properties);

        QueueConnectionFactory vQueueConnectionFactory = (QueueConnectionFactory) vInitialContext
                .lookup(env.getProperty(WebLogicConstant.JMS_FACTORY_RECEIVE));

        vQueueConnection = vQueueConnectionFactory.createQueueConnection();
        vQueueConnection.start();

        vQueueSession = vQueueConnection.createQueueSession(false, 0);
        Queue vQueue = (Queue) vInitialContext.lookup(env.getProperty(WebLogicConstant.JMS_QUEUE_RECEIVE));
        consumer = vQueueSession.createConsumer(vQueue, "JMSCorrelationID IS NOT NULL");

    }

    @Override
    public Message receiveMessages() throws NamingException, JMSException {

        return consumer.receive(20000);

    }

Item reader:

    @Override
    public Message read() throws Exception {

        return jmsServiceReceiver.receiveMessages();
    }

Thanks! I would appreciate your help :)

There is a BatchMessageListenerContainer in the spring-batch-infrastructure-tests subproject:

https://github.com/spring-projects/spring-batch/blob/d8fc58338d3b059b67b5f777adc132d2564d7402/spring-batch-infrastructure-tests/src/main/java/org/springframework/batch/container/jms/BatchMessageListenerContainer.java

Message listener container adapted for intercepting the message reception with advice provided through configuration. To enable batching of messages in a single transaction, use the TransactionInterceptor and the RepeatOperationsInterceptor in the advice chain (with or without a transaction manager set in the base class). Instead of receiving a single message and processing it, the container will then use a RepeatOperations to receive multiple messages in the same thread. Use with a RepeatOperations and a transaction interceptor. If the transaction interceptor uses XA then use an XA connection factory, or else the TransactionAwareConnectionFactoryProxy to synchronize the JMS session with the ongoing transaction (opening up the possibility of duplicate messages after a failure). In the latter case you will not need to provide a transaction manager in the base class - it only gets on the way and prevents the JMS session from synchronizing with the database transaction.

Maybe you can adapt it to your use case.

I was able to do this with a multi-threaded step:

    // Jobs and Steps
    @Bean
    Step stepDetectionIncoherencesLiq(@Autowired StepBuilderFactory steps) {

        int threadSize = Integer.parseInt(env.getProperty(PropertyConstant.THREAD_POOL_SIZE));

        return steps.get("stepDetectionIncoherencesLiq").<Message, DetectionIncoherenceLiqJmsOut>chunk(1)
                .reader(reader()).processor(processor()).writer(writer())
                // the reader is a transactional queue, so items are re-presented by the queue after a rollback
                .readerIsTransactionalQueue()
                .faultTolerant()
                // run chunks concurrently on the executor's threads
                .taskExecutor(taskExecutor())
                // cap the number of chunks executing at the same time
                .throttleLimit(threadSize)
                .listener(stepListener())
                .build();
    }

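I also switched to a JmsItemReader backed by a JmsTemplate instead of creating the session and connection explicitly. The template manages the connections, so I no longer get the JMS exception (JmsException: "Invalid blocking receive when another receive is in progress"):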

    @Bean
    public JmsItemReader<Message> reader() {
        JmsItemReader<Message> itemReader = new JmsItemReader<>();
        itemReader.setItemType(Message.class);
        itemReader.setJmsTemplate(jmsTemplate());
        return itemReader;
    }
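
To illustrate why this works, here is a rough plain-Java sketch of what a multi-threaded step with chunk size 1 does, with no Spring or JMS involved. Everything in it is a stand-in that I made up for illustration: `ParallelChunkSketch`, `read` and `run` are hypothetical names, a `LinkedBlockingQueue` plays the role of the JMS queue, and `toUpperCase()` plays the role of the processor and writer. It ignores transactions, rollback and retry completely, so treat it as a mental model rather than as how Spring Batch is implemented. The point is that every worker thread calls a reader that is safe to use concurrently, and a `null` result (the receive timed out) tells that worker there is nothing left to read.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a thread pool in which each worker reads, processes
// and writes one item at a time, similar in spirit to chunk(1) with a task executor.
class ParallelChunkSketch {

    // Stand-in for a thread-safe item reader with a receive timeout.
    // Returns null when nothing arrives in time, which ends the worker's loop.
    static String read(BlockingQueue<String> queue, long timeoutMillis) throws InterruptedException {
        return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Drains the given messages with the given number of worker threads
    // and returns whatever the workers "wrote" (order is not guaranteed).
    static List<String> run(List<String> messages, int threads) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(messages);
        Queue<String> written = new ConcurrentLinkedQueue<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);

        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    String item;
                    // read -> process -> write, one item per iteration
                    while ((item = read(queue, 100)) != null) {
                        written.add(item.toUpperCase());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(written);
    }

    public static void main(String[] args) {
        List<String> out = run(List.of("m1", "m2", "m3", "m4", "m5"), 3);
        System.out.println(out.size() + " messages written: " + out);
    }
}
```

My original reader shared a single JMS consumer between threads, which is what triggered the exception once the step became multi-threaded. One thing I would double check if you copy this approach: as far as I can tell, JmsItemReader calls the template's plain receive, so a message selector like the "JMSCorrelationID IS NOT NULL" one in my first version has to be handled some other way.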