在 Spring 批处理中使用 StaxEventItemWriter 时,ZipFileInputStream 读取方法延迟了编组过程
ZipFileInputStream read method is delaying the process of marshalling when using StaxEventItemWriter in Spring Batch
前言
我在 spring-boot
应用程序中使用 spring batch
。 Spring 引导版本是 2.3.3.RELEASE
。
我有一个 multi-step
工作,它在第一步验证 xml 文件头,然后在 chunk oriented step
中验证 read transaction
,对每个做一些 business logic
事务,然后 write
返回到 xml
文件。在第三步也是最后一步中,我 delete the input file
。我的 Chunk size
当前设置为 500
笔交易。
问题
一切正常,除了 thread is locked
在 RUNNABLE
状态几乎 10 minutes
在 between write and next read
。我拿了 thread dump
但没能从中得到多少,即使我观察到 thread stack trace
一直是 same
。
"europeanGateway-3" #90 prio=5 os_prio=0 tid=0x0000023b7d347800 nid=0x6bd0 runnable [0x000000c228cfb000]
java.lang.Thread.State: RUNNABLE
at java.util.zip.ZipFile.read(Native Method)
at java.util.zip.ZipFile.access00(ZipFile.java:60)
at java.util.zip.ZipFile$ZipFileInputStream.read(ZipFile.java:735)
- locked <0x00000005c037d9e8> (a sun.net.www.protocol.jar.URLJarFile)
at java.util.zip.ZipFile$ZipFileInputStream.read(ZipFile.java:750)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at java.io.DataInputStream.readByte(DataInputStream.java:265)
at com.sun.xml.internal.bind.v2.bytecode.ClassTailor.tailor(ClassTailor.java:119)
at com.sun.xml.internal.bind.v2.runtime.reflect.opt.AccessorInjector.tailor(AccessorInjector.java:107)
at com.sun.xml.internal.bind.v2.runtime.reflect.opt.AccessorInjector.prepare(AccessorInjector.java:68)
at com.sun.xml.internal.bind.v2.runtime.reflect.opt.OptimizedAccessorFactory.get(OptimizedAccessorFactory.java:164)
at com.sun.xml.internal.bind.v2.runtime.reflect.Accessor$FieldReflection.optimize(Accessor.java:271)
at com.sun.xml.internal.bind.v2.runtime.property.SingleElementLeafProperty.<init>(SingleElementLeafProperty.java:77)
at sun.reflect.GeneratedConstructorAccessor73.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:507)
at com.sun.xml.internal.bind.v2.runtime.property.SingleElementNodeProperty.<init>(SingleElementNodeProperty.java:90)
at sun.reflect.GeneratedConstructorAccessor74.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:507)
at com.sun.xml.internal.bind.v2.runtime.property.SingleElementNodeProperty.<init>(SingleElementNodeProperty.java:90)
at sun.reflect.GeneratedConstructorAccessor74.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.<init>(JAXBContextImpl.java:305)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.<init>(JAXBContextImpl.java:124)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl$JAXBContextBuilder.build(JAXBContextImpl.java:1123)
at com.sun.xml.internal.bind.v2.ContextFactory.createContext(ContextFactory.java:147)
at sun.reflect.GeneratedMethodAccessor375.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.xml.bind.ContextFinder.newInstance(ContextFinder.java:247)
at javax.xml.bind.ContextFinder.newInstance(ContextFinder.java:234)
at javax.xml.bind.ContextFinder.find(ContextFinder.java:462)
at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:641)
at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:584)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlJaxb2Marshaller.marshalStaxResult(OmegaXmlJaxb2Marshaller.java:50)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlJaxb2Marshaller.marshal(OmegaXmlJaxb2Marshaller.java:29)
at org.springframework.oxm.jaxb.Jaxb2Marshaller.marshal(Jaxb2Marshaller.java:695)
at org.springframework.batch.item.xml.StaxEventItemWriter.write(StaxEventItemWriter.java:770)
at org.springframework.batch.item.xml.StaxEventItemWriter$$FastClassBySpringCGLIB$$d105dd1.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at org.springframework.batch.item.xml.StaxEventItemWriter$$EnhancerBySpringCGLIB$be73fd.write(<generated>)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter.write(OmegaXmlFileWriter.java:39)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter$$FastClassBySpringCGLIB$e7f3da.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter$$EnhancerBySpringCGLIB$$f046e646.write(<generated>)
at org.springframework.batch.item.support.CompositeItemWriter.write(CompositeItemWriter.java:59)
at org.springframework.batch.item.support.CompositeItemWriter$$FastClassBySpringCGLIB$$fd4a76ae.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at org.springframework.batch.item.support.CompositeItemWriter$$EnhancerBySpringCGLIB$f2661a.write(<generated>)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.writeItems(SimpleChunkProcessor.java:193)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.doWrite(SimpleChunkProcessor.java:159)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.write(SimpleChunkProcessor.java:294)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:217)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.springframework.batch.core.step.tasklet.TaskletStep.doInChunkContext(TaskletStep.java:273)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:147)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- <0x00000005c2106ac8> (a java.util.concurrent.ThreadPoolExecutor$Worker)
StaxEventItemWriter 配置
@JobScope
@Bean(name = "staxTransactionWriter", destroyMethod="")
public StaxEventItemWriter<TransactionPositionReport> staxTransactionItemWriter(OmegaXmlHeaderCallBack omegaXmlHeaderCallBack,
@Value("#{jobParameters['file.path']}") String path, @Value("#{jobParameters['submission.account']}") String submissionAccount) {
Resource exportFileResource = new FileSystemResource(fileUtils.getFilePath(path, submissionAccount, Directory.TEMP, true));
OmegaXmlJaxb2Marshaller marshaller = new OmegaXmlJaxb2Marshaller();
marshaller.setContextPath("com.trax.europeangateway.model.dto.xsd.omega");
marshaller.setSupportJaxbElementClass(true);
marshaller.setCheckForXmlRootElement(false);
return new StaxEventItemWriterBuilder<TransactionPositionReport>()
.name("transactionWriter")
.resource(exportFileResource)
.marshaller(marshaller)
.rootTagName("{http://deutsche-boerse.com/DBRegHub}reportFile")
.headerCallback(omegaXmlHeaderCallBack)
.footerCallback(getOmegaXmlFooterCallBack())
.build();
}
Jaxb2Marshaller 配置
import javax.xml.XMLConstants;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLEventWriter;
import javax.xml.stream.XMLStreamWriter;
import javax.xml.transform.Result;
import org.springframework.lang.Nullable;
import org.springframework.oxm.XmlMappingException;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.oxm.mime.MimeContainer;
import org.springframework.util.xml.StaxUtils;
import com.trax.europeangateway.model.dto.xsd.omega.ObjectFactory;
import com.trax.europeangateway.model.dto.xsd.omega.TransactionPositionReport;
public class OmegaXmlJaxb2Marshaller extends Jaxb2Marshaller {
@Override
public void marshal(Object graph, Result result, @Nullable MimeContainer mimeContainer) throws XmlMappingException {
try {
Marshaller marshaller = createMarshaller();
if (StaxUtils.isStaxResult(result)) {
marshalStaxResult(marshaller, graph, result);
}
}
catch (JAXBException ex) {
throw convertJaxbException(ex);
}
}
private void marshalStaxResult(Marshaller jaxbMarshaller, Object graph, Result staxResult) throws JAXBException {
XMLStreamWriter streamWriter = StaxUtils.getXMLStreamWriter(staxResult);
if (streamWriter != null) {
jaxbMarshaller.marshal(graph, streamWriter);
}
else {
XMLEventWriter eventWriter = StaxUtils.getXMLEventWriter(staxResult);
if (eventWriter != null) {
ObjectFactory fact = new ObjectFactory();
TransactionPositionReport transaction = fact.createTransactionPositionReport();
transaction.setConfigurableFields(((TransactionPositionReport)graph).getConfigurableFields());
transaction.setMifir(((TransactionPositionReport)graph).getMifir());
transaction.setProcessingDetails(((TransactionPositionReport)graph).getProcessingDetails());
JAXBContext jaxbContext = JAXBContext.newInstance(TransactionPositionReport.class);
Marshaller marshaller = jaxbContext.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
QName qName = new QName(XMLConstants.NULL_NS_URI, "transaction", XMLConstants.DEFAULT_NS_PREFIX);
JAXBElement<TransactionPositionReport> element = new JAXBElement(qName, TransactionPositionReport.class, TransactionPositionReport.class, transaction);
marshaller.marshal(element, eventWriter);
}
else {
throw new IllegalArgumentException("StAX Result contains neither XMLStreamWriter nor XMLEventConsumer");
}
}
}
}
观察
如果我reduce
从500
到100
的chunk batch size
,等待的是reduced to 2 minutes(approx)
作者需要的时间与块大小成比例,这表明作者在编写每个项目时确实效率低下。
如果您每次检查时的堆栈跟踪都相似,那么根本原因是 OmegaXmlJaxb2Marshaller::marshalStaxResult
中 JAXBContext::newInstance
的调用。为每个要写入的项目创建一个新的编组器开销太大。
你应该移动代码块
JAXBContext jaxbContext = JAXBContext.newInstance(TransactionPositionReport.class);
Marshaller marshaller = jaxbContext.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
到 OmegaXmlJaxb2Marshaller
的构造函数并使 marshaller
成为实例变量。
那么,编组器只会创建一次,编写器应该会快得多。
前言
我在 spring-boot
应用程序中使用 spring batch
。 Spring 引导版本是 2.3.3.RELEASE
。
我有一个 multi-step
工作,它在第一步验证 xml 文件头,然后在 chunk oriented step
中验证 read transaction
,对每个做一些 business logic
事务,然后 write
返回到 xml
文件。在第三步也是最后一步中,我 delete the input file
。我的 Chunk size
当前设置为 500
笔交易。
问题
一切正常,除了 thread is locked
在 RUNNABLE
状态几乎 10 minutes
在 between write and next read
。我拿了 thread dump
但没能从中得到多少,即使我观察到 thread stack trace
一直是 same
。
"europeanGateway-3" #90 prio=5 os_prio=0 tid=0x0000023b7d347800 nid=0x6bd0 runnable [0x000000c228cfb000]
java.lang.Thread.State: RUNNABLE
at java.util.zip.ZipFile.read(Native Method)
at java.util.zip.ZipFile.access00(ZipFile.java:60)
at java.util.zip.ZipFile$ZipFileInputStream.read(ZipFile.java:735)
- locked <0x00000005c037d9e8> (a sun.net.www.protocol.jar.URLJarFile)
at java.util.zip.ZipFile$ZipFileInputStream.read(ZipFile.java:750)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at java.io.DataInputStream.readByte(DataInputStream.java:265)
at com.sun.xml.internal.bind.v2.bytecode.ClassTailor.tailor(ClassTailor.java:119)
at com.sun.xml.internal.bind.v2.runtime.reflect.opt.AccessorInjector.tailor(AccessorInjector.java:107)
at com.sun.xml.internal.bind.v2.runtime.reflect.opt.AccessorInjector.prepare(AccessorInjector.java:68)
at com.sun.xml.internal.bind.v2.runtime.reflect.opt.OptimizedAccessorFactory.get(OptimizedAccessorFactory.java:164)
at com.sun.xml.internal.bind.v2.runtime.reflect.Accessor$FieldReflection.optimize(Accessor.java:271)
at com.sun.xml.internal.bind.v2.runtime.property.SingleElementLeafProperty.<init>(SingleElementLeafProperty.java:77)
at sun.reflect.GeneratedConstructorAccessor73.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:507)
at com.sun.xml.internal.bind.v2.runtime.property.SingleElementNodeProperty.<init>(SingleElementNodeProperty.java:90)
at sun.reflect.GeneratedConstructorAccessor74.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:507)
at com.sun.xml.internal.bind.v2.runtime.property.SingleElementNodeProperty.<init>(SingleElementNodeProperty.java:90)
at sun.reflect.GeneratedConstructorAccessor74.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.<init>(JAXBContextImpl.java:305)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.<init>(JAXBContextImpl.java:124)
at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl$JAXBContextBuilder.build(JAXBContextImpl.java:1123)
at com.sun.xml.internal.bind.v2.ContextFactory.createContext(ContextFactory.java:147)
at sun.reflect.GeneratedMethodAccessor375.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at javax.xml.bind.ContextFinder.newInstance(ContextFinder.java:247)
at javax.xml.bind.ContextFinder.newInstance(ContextFinder.java:234)
at javax.xml.bind.ContextFinder.find(ContextFinder.java:462)
at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:641)
at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:584)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlJaxb2Marshaller.marshalStaxResult(OmegaXmlJaxb2Marshaller.java:50)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlJaxb2Marshaller.marshal(OmegaXmlJaxb2Marshaller.java:29)
at org.springframework.oxm.jaxb.Jaxb2Marshaller.marshal(Jaxb2Marshaller.java:695)
at org.springframework.batch.item.xml.StaxEventItemWriter.write(StaxEventItemWriter.java:770)
at org.springframework.batch.item.xml.StaxEventItemWriter$$FastClassBySpringCGLIB$$d105dd1.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at org.springframework.batch.item.xml.StaxEventItemWriter$$EnhancerBySpringCGLIB$be73fd.write(<generated>)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter.write(OmegaXmlFileWriter.java:39)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter$$FastClassBySpringCGLIB$e7f3da.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter$$EnhancerBySpringCGLIB$$f046e646.write(<generated>)
at org.springframework.batch.item.support.CompositeItemWriter.write(CompositeItemWriter.java:59)
at org.springframework.batch.item.support.CompositeItemWriter$$FastClassBySpringCGLIB$$fd4a76ae.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
at org.springframework.batch.item.support.CompositeItemWriter$$EnhancerBySpringCGLIB$f2661a.write(<generated>)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.writeItems(SimpleChunkProcessor.java:193)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.doWrite(SimpleChunkProcessor.java:159)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.write(SimpleChunkProcessor.java:294)
at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:217)
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
at org.springframework.batch.core.step.tasklet.TaskletStep.doInChunkContext(TaskletStep.java:273)
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:147)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Locked ownable synchronizers:
- <0x00000005c2106ac8> (a java.util.concurrent.ThreadPoolExecutor$Worker)
StaxEventItemWriter 配置
@JobScope
@Bean(name = "staxTransactionWriter", destroyMethod="")
public StaxEventItemWriter<TransactionPositionReport> staxTransactionItemWriter(OmegaXmlHeaderCallBack omegaXmlHeaderCallBack,
@Value("#{jobParameters['file.path']}") String path, @Value("#{jobParameters['submission.account']}") String submissionAccount) {
Resource exportFileResource = new FileSystemResource(fileUtils.getFilePath(path, submissionAccount, Directory.TEMP, true));
OmegaXmlJaxb2Marshaller marshaller = new OmegaXmlJaxb2Marshaller();
marshaller.setContextPath("com.trax.europeangateway.model.dto.xsd.omega");
marshaller.setSupportJaxbElementClass(true);
marshaller.setCheckForXmlRootElement(false);
return new StaxEventItemWriterBuilder<TransactionPositionReport>()
.name("transactionWriter")
.resource(exportFileResource)
.marshaller(marshaller)
.rootTagName("{http://deutsche-boerse.com/DBRegHub}reportFile")
.headerCallback(omegaXmlHeaderCallBack)
.footerCallback(getOmegaXmlFooterCallBack())
.build();
}
Jaxb2Marshaller 配置
import javax.xml.XMLConstants;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLEventWriter;
import javax.xml.stream.XMLStreamWriter;
import javax.xml.transform.Result;
import org.springframework.lang.Nullable;
import org.springframework.oxm.XmlMappingException;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.oxm.mime.MimeContainer;
import org.springframework.util.xml.StaxUtils;
import com.trax.europeangateway.model.dto.xsd.omega.ObjectFactory;
import com.trax.europeangateway.model.dto.xsd.omega.TransactionPositionReport;
public class OmegaXmlJaxb2Marshaller extends Jaxb2Marshaller {
@Override
public void marshal(Object graph, Result result, @Nullable MimeContainer mimeContainer) throws XmlMappingException {
try {
Marshaller marshaller = createMarshaller();
if (StaxUtils.isStaxResult(result)) {
marshalStaxResult(marshaller, graph, result);
}
}
catch (JAXBException ex) {
throw convertJaxbException(ex);
}
}
private void marshalStaxResult(Marshaller jaxbMarshaller, Object graph, Result staxResult) throws JAXBException {
XMLStreamWriter streamWriter = StaxUtils.getXMLStreamWriter(staxResult);
if (streamWriter != null) {
jaxbMarshaller.marshal(graph, streamWriter);
}
else {
XMLEventWriter eventWriter = StaxUtils.getXMLEventWriter(staxResult);
if (eventWriter != null) {
ObjectFactory fact = new ObjectFactory();
TransactionPositionReport transaction = fact.createTransactionPositionReport();
transaction.setConfigurableFields(((TransactionPositionReport)graph).getConfigurableFields());
transaction.setMifir(((TransactionPositionReport)graph).getMifir());
transaction.setProcessingDetails(((TransactionPositionReport)graph).getProcessingDetails());
JAXBContext jaxbContext = JAXBContext.newInstance(TransactionPositionReport.class);
Marshaller marshaller = jaxbContext.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
QName qName = new QName(XMLConstants.NULL_NS_URI, "transaction", XMLConstants.DEFAULT_NS_PREFIX);
JAXBElement<TransactionPositionReport> element = new JAXBElement(qName, TransactionPositionReport.class, TransactionPositionReport.class, transaction);
marshaller.marshal(element, eventWriter);
}
else {
throw new IllegalArgumentException("StAX Result contains neither XMLStreamWriter nor XMLEventConsumer");
}
}
}
}
观察
如果我reduce
从500
到100
的chunk batch size
,等待的是reduced to 2 minutes(approx)
作者需要的时间与块大小成比例,这表明作者在编写每个项目时确实效率低下。
如果您每次检查时的堆栈跟踪都相似,那么根本原因是 OmegaXmlJaxb2Marshaller::marshalStaxResult
中 JAXBContext::newInstance
的调用。为每个要写入的项目创建一个新的编组器开销太大。
你应该移动代码块
JAXBContext jaxbContext = JAXBContext.newInstance(TransactionPositionReport.class);
Marshaller marshaller = jaxbContext.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
到 OmegaXmlJaxb2Marshaller
的构造函数并使 marshaller
成为实例变量。
那么,编组器只会创建一次,编写器应该会快得多。