Spring 使用 JMS 进行批处理并行处理
Spring Batch Parallel processing with JMS
我实施了一个 spring 从 weblogic Jms 队列读取的批处理项目(自定义项 Reader 不是消息驱动),然后将 Jms 消息数据传递给项目编写器(块 = 1)我在哪里调用一些 API 并写入数据库。
但是,我正在尝试实现并行 Jms 处理,读取并行 Jms 消息并将它们传递给编写器,而无需等待前面的进程完成。
我在之前的项目中使用了 DefaultMessageListenerContainer,它提供了 jms 消息的并行消费,但在这个项目中我必须使用 spring 批处理框架。
- 我尝试使用最简单的解决方案(多线程步骤)但是它
没有工作,JmsException:“无效的阻塞接收时另一个
接收正在进行中”,这可能意味着我的 reader 是
有状态。
- 我想过使用远程分区,但后来我不得不阅读所有
消息并在调用之前将数据放入步骤执行上下文中
奴隶的步骤,如果处理大的,这不是很有效
消息数。
- 我稍微研究了远程分块,我知道它通过队列通道传递数据,但我似乎无法找到从 Jms 读取并将消息放入本地队列以供从属工作人员使用的实用程序。
我该如何解决这个问题?
我的代码:
@Bean
Step step1() {
return steps.get("step1").<Message, DetectionIncoherenceLiqJmsOut>chunk(1)
.reader(reader()).processor(processor()).writer(writer())
.listener(stepListener()).build();
}
@Bean
Job job(@Qualifier("step1") Step step1) {
return jobs.get("job").start(step1).build();
}
Jms 代码:
@Override
public void initQueueConnection() throws NamingException, JMSException {
Hashtable<String, String> properties = new Hashtable<String, String>();
properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_RECEIVE));
InitialContext vInitialContext = new InitialContext(properties);
QueueConnectionFactory vQueueConnectionFactory = (QueueConnectionFactory) vInitialContext
.lookup(env.getProperty(WebLogicConstant.JMS_FACTORY_RECEIVE));
vQueueConnection = vQueueConnectionFactory.createQueueConnection();
vQueueConnection.start();
vQueueSession = vQueueConnection.createQueueSession(false, 0);
Queue vQueue = (Queue) vInitialContext.lookup(env.getProperty(WebLogicConstant.JMS_QUEUE_RECEIVE));
consumer = vQueueSession.createConsumer(vQueue, "JMSCorrelationID IS NOT NULL");
}
@Override
public Message receiveMessages() throws NamingException, JMSException {
return consumer.receive(20000);
}
项目reader:
@Override
public Message read() throws Exception {
return jmsServiceReceiver.receiveMessages();
}
谢谢!我会很感激你的帮助:)
spring-batch-infrastructure-tests
子项目中有一个 BatchMessageListenerContainer
。
Message listener container adapted for intercepting the message reception with advice provided through configuration.
To enable batching of messages in a single transaction, use the TransactionInterceptor and the RepeatOperationsInterceptor in the advice chain (with or without a transaction manager set in the base class). Instead of receiving a single message and processing it, the container will then use a RepeatOperations to receive multiple messages in the same thread. Use with a RepeatOperations and a transaction interceptor. If the transaction interceptor uses XA then use an XA connection factory, or else the TransactionAwareConnectionFactoryProxy to synchronize the JMS session with the ongoing transaction (opening up the possibility of duplicate messages after a failure). In the latter case you will not need to provide a transaction manager in the base class - it only gets on the way and prevents the JMS session from synchronizing with the database transaction.
也许您可以根据您的用例对其进行调整。
我能够通过多线程步骤做到这一点:
// Jobs et Steps
@Bean
Step stepDetectionIncoherencesLiq(@Autowired StepBuilderFactory steps) {
int threadSize = Integer.parseInt(env.getProperty(PropertyConstant.THREAD_POOL_SIZE));
return steps.get("stepDetectionIncoherencesLiq").<Message, DetectionIncoherenceLiqJmsOut>chunk(1)
.reader(reader()).processor(processor()).writer(writer())
.readerIsTransactionalQueue()
.faultTolerant()
.taskExecutor(taskExecutor())
.throttleLimit(threadSize)
.listener(stepListener())
.build();
}
还有一个带有 jmsTemplate 的 jmsItemReader,而不是显式创建会话和连接,它管理连接,所以我不再有 jms 异常:( JmsException : “当另一个接收正在进行时无效阻止接收”)
@Bean
public JmsItemReader<Message> reader() {
JmsItemReader<Message> itemReader = new JmsItemReader<>();
itemReader.setItemType(Message.class);
itemReader.setJmsTemplate(jmsTemplate());
return itemReader;
}
我实施了一个 spring 从 weblogic Jms 队列读取的批处理项目(自定义项 Reader 不是消息驱动),然后将 Jms 消息数据传递给项目编写器(块 = 1)我在哪里调用一些 API 并写入数据库。
但是,我正在尝试实现并行 Jms 处理,读取并行 Jms 消息并将它们传递给编写器,而无需等待前面的进程完成。
我在之前的项目中使用了 DefaultMessageListenerContainer,它提供了 jms 消息的并行消费,但在这个项目中我必须使用 spring 批处理框架。
- 我尝试使用最简单的解决方案(多线程步骤)但是它 没有工作,JmsException:“无效的阻塞接收时另一个 接收正在进行中”,这可能意味着我的 reader 是 有状态。
- 我想过使用远程分区,但后来我不得不阅读所有 消息并在调用之前将数据放入步骤执行上下文中 奴隶的步骤,如果处理大的,这不是很有效 消息数。
- 我稍微研究了远程分块,我知道它通过队列通道传递数据,但我似乎无法找到从 Jms 读取并将消息放入本地队列以供从属工作人员使用的实用程序。
我该如何解决这个问题?
我的代码:
@Bean
Step step1() {
return steps.get("step1").<Message, DetectionIncoherenceLiqJmsOut>chunk(1)
.reader(reader()).processor(processor()).writer(writer())
.listener(stepListener()).build();
}
@Bean
Job job(@Qualifier("step1") Step step1) {
return jobs.get("job").start(step1).build();
}
Jms 代码:
@Override
public void initQueueConnection() throws NamingException, JMSException {
Hashtable<String, String> properties = new Hashtable<String, String>();
properties.put(Context.INITIAL_CONTEXT_FACTORY, env.getProperty(WebLogicConstant.JNDI_FACTORY));
properties.put(Context.PROVIDER_URL, env.getProperty(WebLogicConstant.JMS_WEBLOGIC_URL_RECEIVE));
InitialContext vInitialContext = new InitialContext(properties);
QueueConnectionFactory vQueueConnectionFactory = (QueueConnectionFactory) vInitialContext
.lookup(env.getProperty(WebLogicConstant.JMS_FACTORY_RECEIVE));
vQueueConnection = vQueueConnectionFactory.createQueueConnection();
vQueueConnection.start();
vQueueSession = vQueueConnection.createQueueSession(false, 0);
Queue vQueue = (Queue) vInitialContext.lookup(env.getProperty(WebLogicConstant.JMS_QUEUE_RECEIVE));
consumer = vQueueSession.createConsumer(vQueue, "JMSCorrelationID IS NOT NULL");
}
@Override
public Message receiveMessages() throws NamingException, JMSException {
return consumer.receive(20000);
}
项目reader:
@Override
public Message read() throws Exception {
return jmsServiceReceiver.receiveMessages();
}
谢谢!我会很感激你的帮助:)
spring-batch-infrastructure-tests
子项目中有一个 BatchMessageListenerContainer
。
Message listener container adapted for intercepting the message reception with advice provided through configuration. To enable batching of messages in a single transaction, use the TransactionInterceptor and the RepeatOperationsInterceptor in the advice chain (with or without a transaction manager set in the base class). Instead of receiving a single message and processing it, the container will then use a RepeatOperations to receive multiple messages in the same thread. Use with a RepeatOperations and a transaction interceptor. If the transaction interceptor uses XA then use an XA connection factory, or else the TransactionAwareConnectionFactoryProxy to synchronize the JMS session with the ongoing transaction (opening up the possibility of duplicate messages after a failure). In the latter case you will not need to provide a transaction manager in the base class - it only gets on the way and prevents the JMS session from synchronizing with the database transaction.
也许您可以根据您的用例对其进行调整。
我能够通过多线程步骤做到这一点:
// Jobs et Steps
@Bean
Step stepDetectionIncoherencesLiq(@Autowired StepBuilderFactory steps) {
int threadSize = Integer.parseInt(env.getProperty(PropertyConstant.THREAD_POOL_SIZE));
return steps.get("stepDetectionIncoherencesLiq").<Message, DetectionIncoherenceLiqJmsOut>chunk(1)
.reader(reader()).processor(processor()).writer(writer())
.readerIsTransactionalQueue()
.faultTolerant()
.taskExecutor(taskExecutor())
.throttleLimit(threadSize)
.listener(stepListener())
.build();
}
还有一个带有 jmsTemplate 的 jmsItemReader,而不是显式创建会话和连接,它管理连接,所以我不再有 jms 异常:( JmsException : “当另一个接收正在进行时无效阻止接收”)
@Bean
public JmsItemReader<Message> reader() {
JmsItemReader<Message> itemReader = new JmsItemReader<>();
itemReader.setItemType(Message.class);
itemReader.setJmsTemplate(jmsTemplate());
return itemReader;
}