使用 Spring 批处理创建一个 @JobScope @Service 来保存作业的共享内存?
Create a @JobScope @Service with Spring Batch that holds shared memory for a job?
基本上,我想做的是创建一个 @Service 或组件,把数据库表中的一些数据加载到内存中,这些数据在整个作业执行过程中都会被引用。
package com.squareup.se.bridge.batchworker.components.context;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.squareup.se.bridge.batchworker.repositories.BridgeBatchJobParametersRepository;
import com.squareup.se.bridge.batchworker.util.JobParameterKeys;
import com.squareup.se.bridge.core.api.services.batchworker.FatalSyncException;
import com.squareup.se.bridge.core.integration.util.logger.JobExecutionLoggerFactory;
import java.io.IOException;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.stereotype.Component;
/**
 * Job-scoped provider of the {@link BridgeIntegrationJobContext} for the running job.
 *
 * <p>The provider is a {@link JobExecutionListener}: {@link #beforeJob(JobExecution)} reads the
 * {@code SYNC_ID} job parameter, looks up the stored job parameters through the repository and
 * deserializes their integration context, which {@link #get()} then hands out.
 */
@JobScope @Component public class BridgeBatchIntegrationJobContextProvider
    implements JobExecutionListener {
  // Constructor-injected collaborators; never reassigned.
  private final ObjectMapper mapper;
  private final BridgeBatchJobParametersRepository bridgeBatchJobParametersRepository;
  // Both are assigned in beforeJob, so they cannot be final.
  private Logger logger;
  private BridgeIntegrationJobContext context;

  /**
   * @param mapper mapper used to deserialize the stored integration context
   * @param bridgeBatchJobParametersRepository repository queried for the job's parameters
   */
  public BridgeBatchIntegrationJobContextProvider(ObjectMapper mapper,
      BridgeBatchJobParametersRepository bridgeBatchJobParametersRepository) {
    this.mapper = mapper;
    this.bridgeBatchJobParametersRepository = bridgeBatchJobParametersRepository;
  }

  /**
   * Creates the job logger and loads the integration context for the job identified by the
   * {@code SYNC_ID} job parameter.
   *
   * @throws FatalSyncException if the parameter is missing, the stored parameters are absent,
   *     ambiguous or empty, or the integration context cannot be deserialized
   */
  @Override public void beforeJob(JobExecution jobExecution) {
    var jobId = jobExecution.getJobParameters().getString(JobParameterKeys.SYNC_ID);
    // Fail with an explicit message instead of passing a null id on to the logger factory and
    // the repository lookup.
    if (Strings.isNullOrEmpty(jobId)) {
      throw new FatalSyncException(
          String.format("Job parameter `%s` is missing or empty", JobParameterKeys.SYNC_ID));
    }
    this.logger = JobExecutionLoggerFactory.getLogger(
        BridgeBatchIntegrationJobContextProvider.class, jobId);
    this.context = deserializeJobParameters(jobId);
  }

  /**
   * Returns the context loaded by {@link #beforeJob(JobExecution)}.
   *
   * @throws IllegalStateException if no context has been loaded yet
   */
  @NotNull public BridgeIntegrationJobContext get() {
    if (context == null) {
      throw new IllegalStateException("Expected context to exist before calling this method");
    }
    return context;
  }

  /** No-op; nothing is done after the job finishes. */
  @Override public void afterJob(JobExecution jobExecution) { }

  /**
   * Fetches the serialized integration context stored for {@code jobId}.
   *
   * @throws FatalSyncException unless exactly one parameter entry with a non-empty integration
   *     context exists for the job
   */
  @NotNull private String getParameters(String jobId) {
    var jobParams = bridgeBatchJobParametersRepository.find(jobId);
    if (jobParams == null || jobParams.isEmpty()) {
      throw new FatalSyncException(String.format("No job parameters for job `%s` exists", jobId));
    }
    if (jobParams.size() > 1) {
      throw new FatalSyncException(String.format("Multiple parameter entries exist for job `%s`",
          jobId));
    }
    // Read the value once so the string that was validated is the string that is returned.
    String integrationContext = jobParams.get(0).getIntegrationContext();
    if (Strings.isNullOrEmpty(integrationContext)) {
      throw new FatalSyncException(String.format("Job parameters for job `%s` is empty", jobId));
    }
    return integrationContext;
  }

  /**
   * Deserializes the stored integration context of {@code jobId}.
   *
   * @throws FatalSyncException wrapping the {@link IOException} if deserialization fails
   */
  @NotNull private BridgeIntegrationJobContext deserializeJobParameters(String jobId) {
    try {
      return mapper.readValue(getParameters(jobId),
          BridgeIntegrationJobContext.class);
    } catch (IOException e) {
      //TODO page on this
      // This failure aborts the job, so log it at ERROR with the job id for correlation.
      logger.error("Failed to deserialize integration context for job `{}`", jobId, e);
      throw new FatalSyncException(e);
    }
  }
}
我配置了这样的作业:
// Builds the job named by CUSTOMERS_BATCH_JOB_NAME: loadFromOriginStep runs first, followed by
// retryFailuresFromOriginStep.
return jobBuilderFactory.get(CUSTOMERS_BATCH_JOB_NAME)
.incrementer(new RunIdIncrementer())
.start(loadFromOriginStep)
.next(retryFailuresFromOriginStep)
// The context provider is registered ahead of jobListener.
// NOTE(review): presumably so its beforeJob runs first — confirm listener invocation order.
.listener(bridgeBatchIntegrationJobContextProvider)
.listener(jobListener)
.build();
构造函数依赖于其他 bean,包括 jackson 对象映射器和 JPA 存储库。我遇到了一些问题:
- 构造函数没有被 Spring 调用,因此我要绑定的实例变量不存在
如果我从组件中删除@JobScope,Spring 构造组件实例。
我没看出你的代码哪里需要 @JobScope;根据你的需求,你并不需要它。
如果您想使用侦听器向作业执行上下文中加载一些数据,可以在 beforeJob 方法中调用 jobExecution.getExecutionContext().put("key", "value");。
也就是说,不建议在执行上下文中加载大量数据,因为它会在步骤之间持久化。
因此,除非您只在执行上下文中加载少量数据,否则需要另寻他法(例如使用单独的缓存)。
基本上,我想做的是创建一个 @Service 或组件,把数据库表中的一些数据加载到内存中,这些数据在整个作业执行过程中都会被引用。
package com.squareup.se.bridge.batchworker.components.context;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.squareup.se.bridge.batchworker.repositories.BridgeBatchJobParametersRepository;
import com.squareup.se.bridge.batchworker.util.JobParameterKeys;
import com.squareup.se.bridge.core.api.services.batchworker.FatalSyncException;
import com.squareup.se.bridge.core.integration.util.logger.JobExecutionLoggerFactory;
import java.io.IOException;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.stereotype.Component;
/**
 * Job-scoped provider of the {@link BridgeIntegrationJobContext} for the running job.
 *
 * <p>The provider is a {@link JobExecutionListener}: {@link #beforeJob(JobExecution)} reads the
 * {@code SYNC_ID} job parameter, looks up the stored job parameters through the repository and
 * deserializes their integration context, which {@link #get()} then hands out.
 */
@JobScope @Component public class BridgeBatchIntegrationJobContextProvider
    implements JobExecutionListener {
  // Constructor-injected collaborators; never reassigned.
  private final ObjectMapper mapper;
  private final BridgeBatchJobParametersRepository bridgeBatchJobParametersRepository;
  // Both are assigned in beforeJob, so they cannot be final.
  private Logger logger;
  private BridgeIntegrationJobContext context;

  /**
   * @param mapper mapper used to deserialize the stored integration context
   * @param bridgeBatchJobParametersRepository repository queried for the job's parameters
   */
  public BridgeBatchIntegrationJobContextProvider(ObjectMapper mapper,
      BridgeBatchJobParametersRepository bridgeBatchJobParametersRepository) {
    this.mapper = mapper;
    this.bridgeBatchJobParametersRepository = bridgeBatchJobParametersRepository;
  }

  /**
   * Creates the job logger and loads the integration context for the job identified by the
   * {@code SYNC_ID} job parameter.
   *
   * @throws FatalSyncException if the parameter is missing, the stored parameters are absent,
   *     ambiguous or empty, or the integration context cannot be deserialized
   */
  @Override public void beforeJob(JobExecution jobExecution) {
    var jobId = jobExecution.getJobParameters().getString(JobParameterKeys.SYNC_ID);
    // Fail with an explicit message instead of passing a null id on to the logger factory and
    // the repository lookup.
    if (Strings.isNullOrEmpty(jobId)) {
      throw new FatalSyncException(
          String.format("Job parameter `%s` is missing or empty", JobParameterKeys.SYNC_ID));
    }
    this.logger = JobExecutionLoggerFactory.getLogger(
        BridgeBatchIntegrationJobContextProvider.class, jobId);
    this.context = deserializeJobParameters(jobId);
  }

  /**
   * Returns the context loaded by {@link #beforeJob(JobExecution)}.
   *
   * @throws IllegalStateException if no context has been loaded yet
   */
  @NotNull public BridgeIntegrationJobContext get() {
    if (context == null) {
      throw new IllegalStateException("Expected context to exist before calling this method");
    }
    return context;
  }

  /** No-op; nothing is done after the job finishes. */
  @Override public void afterJob(JobExecution jobExecution) { }

  /**
   * Fetches the serialized integration context stored for {@code jobId}.
   *
   * @throws FatalSyncException unless exactly one parameter entry with a non-empty integration
   *     context exists for the job
   */
  @NotNull private String getParameters(String jobId) {
    var jobParams = bridgeBatchJobParametersRepository.find(jobId);
    if (jobParams == null || jobParams.isEmpty()) {
      throw new FatalSyncException(String.format("No job parameters for job `%s` exists", jobId));
    }
    if (jobParams.size() > 1) {
      throw new FatalSyncException(String.format("Multiple parameter entries exist for job `%s`",
          jobId));
    }
    // Read the value once so the string that was validated is the string that is returned.
    String integrationContext = jobParams.get(0).getIntegrationContext();
    if (Strings.isNullOrEmpty(integrationContext)) {
      throw new FatalSyncException(String.format("Job parameters for job `%s` is empty", jobId));
    }
    return integrationContext;
  }

  /**
   * Deserializes the stored integration context of {@code jobId}.
   *
   * @throws FatalSyncException wrapping the {@link IOException} if deserialization fails
   */
  @NotNull private BridgeIntegrationJobContext deserializeJobParameters(String jobId) {
    try {
      return mapper.readValue(getParameters(jobId),
          BridgeIntegrationJobContext.class);
    } catch (IOException e) {
      //TODO page on this
      // This failure aborts the job, so log it at ERROR with the job id for correlation.
      logger.error("Failed to deserialize integration context for job `{}`", jobId, e);
      throw new FatalSyncException(e);
    }
  }
}
我配置了这样的作业:
// Builds the job named by CUSTOMERS_BATCH_JOB_NAME: loadFromOriginStep runs first, followed by
// retryFailuresFromOriginStep.
return jobBuilderFactory.get(CUSTOMERS_BATCH_JOB_NAME)
.incrementer(new RunIdIncrementer())
.start(loadFromOriginStep)
.next(retryFailuresFromOriginStep)
// The context provider is registered ahead of jobListener.
// NOTE(review): presumably so its beforeJob runs first — confirm listener invocation order.
.listener(bridgeBatchIntegrationJobContextProvider)
.listener(jobListener)
.build();
构造函数依赖于其他 bean,包括 jackson 对象映射器和 JPA 存储库。我遇到了一些问题:
- 构造函数没有被 Spring 调用,因此我要绑定的实例变量不存在
如果我从组件中删除@JobScope,Spring 构造组件实例。
我没看出你的代码哪里需要 @JobScope;根据你的需求,你并不需要它。
如果您想使用侦听器向作业执行上下文中加载一些数据,可以在 beforeJob 方法中调用 jobExecution.getExecutionContext().put("key", "value");。
也就是说,不建议在执行上下文中加载大量数据,因为它会在步骤之间持久化。
因此,除非您在执行上下文中加载少量数据,否则您需要找到另一种方法(例如使用单独的缓存,请参阅