在 Spring 批处理中使用 StaxEventItemWriter 时,ZipFileInputStream 读取方法延迟了编组过程

ZipFileInputStream read method is delaying the process of marshalling when using StaxEventItemWriter in Spring Batch

前言

我在 spring-boot 应用程序中使用 spring batch。 Spring 引导版本是 2.3.3.RELEASE

我有一个 multi-step 工作,它在第一步验证 xml 文件头,然后在 chunk oriented step 中验证 read transaction,对每个做一些 business logic事务,然后 write 返回到 xml 文件。在第三步也是最后一步中,我 delete the input file。我的 Chunk size 当前设置为 500 笔交易。

问题

一切正常,除了 thread is lockedRUNNABLE 状态几乎 10 minutesbetween write and next read。我拿了 thread dump 但没能从中得到多少,即使我观察到 thread stack trace 一直是 same

"europeanGateway-3" #90 prio=5 os_prio=0 tid=0x0000023b7d347800 nid=0x6bd0 runnable [0x000000c228cfb000]
   java.lang.Thread.State: RUNNABLE
        at java.util.zip.ZipFile.read(Native Method)
        at java.util.zip.ZipFile.access00(ZipFile.java:60)
        at java.util.zip.ZipFile$ZipFileInputStream.read(ZipFile.java:735)
        - locked <0x00000005c037d9e8> (a sun.net.www.protocol.jar.URLJarFile)
        at java.util.zip.ZipFile$ZipFileInputStream.read(ZipFile.java:750)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.DataInputStream.readByte(DataInputStream.java:265)
        at com.sun.xml.internal.bind.v2.bytecode.ClassTailor.tailor(ClassTailor.java:119)
        at com.sun.xml.internal.bind.v2.runtime.reflect.opt.AccessorInjector.tailor(AccessorInjector.java:107)
        at com.sun.xml.internal.bind.v2.runtime.reflect.opt.AccessorInjector.prepare(AccessorInjector.java:68)
        at com.sun.xml.internal.bind.v2.runtime.reflect.opt.OptimizedAccessorFactory.get(OptimizedAccessorFactory.java:164)
        at com.sun.xml.internal.bind.v2.runtime.reflect.Accessor$FieldReflection.optimize(Accessor.java:271)
        at com.sun.xml.internal.bind.v2.runtime.property.SingleElementLeafProperty.<init>(SingleElementLeafProperty.java:77)
        at sun.reflect.GeneratedConstructorAccessor73.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
        at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
        at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
        at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:507)
        at com.sun.xml.internal.bind.v2.runtime.property.SingleElementNodeProperty.<init>(SingleElementNodeProperty.java:90)
        at sun.reflect.GeneratedConstructorAccessor74.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
        at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
        at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
        at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:507)
        at com.sun.xml.internal.bind.v2.runtime.property.SingleElementNodeProperty.<init>(SingleElementNodeProperty.java:90)
        at sun.reflect.GeneratedConstructorAccessor74.newInstance(Unknown Source)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at com.sun.xml.internal.bind.v2.runtime.property.PropertyFactory.create(PropertyFactory.java:113)
        at com.sun.xml.internal.bind.v2.runtime.ClassBeanInfoImpl.<init>(ClassBeanInfoImpl.java:166)
        at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.getOrCreate(JAXBContextImpl.java:488)
        at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.<init>(JAXBContextImpl.java:305)
        at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl.<init>(JAXBContextImpl.java:124)
        at com.sun.xml.internal.bind.v2.runtime.JAXBContextImpl$JAXBContextBuilder.build(JAXBContextImpl.java:1123)
        at com.sun.xml.internal.bind.v2.ContextFactory.createContext(ContextFactory.java:147)
        at sun.reflect.GeneratedMethodAccessor375.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at javax.xml.bind.ContextFinder.newInstance(ContextFinder.java:247)
        at javax.xml.bind.ContextFinder.newInstance(ContextFinder.java:234)
        at javax.xml.bind.ContextFinder.find(ContextFinder.java:462)
        at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:641)
        at javax.xml.bind.JAXBContext.newInstance(JAXBContext.java:584)
        at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlJaxb2Marshaller.marshalStaxResult(OmegaXmlJaxb2Marshaller.java:50)
        at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlJaxb2Marshaller.marshal(OmegaXmlJaxb2Marshaller.java:29)
        at org.springframework.oxm.jaxb.Jaxb2Marshaller.marshal(Jaxb2Marshaller.java:695)
        at org.springframework.batch.item.xml.StaxEventItemWriter.write(StaxEventItemWriter.java:770)
        at org.springframework.batch.item.xml.StaxEventItemWriter$$FastClassBySpringCGLIB$$d105dd1.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
        at org.springframework.batch.item.xml.StaxEventItemWriter$$EnhancerBySpringCGLIB$be73fd.write(<generated>)
        at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter.write(OmegaXmlFileWriter.java:39)
        at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter$$FastClassBySpringCGLIB$e7f3da.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
        at com.trax.europeangateway.itemwriter.omegaxml.OmegaXmlFileWriter$$EnhancerBySpringCGLIB$$f046e646.write(<generated>)
        at org.springframework.batch.item.support.CompositeItemWriter.write(CompositeItemWriter.java:59)
        at org.springframework.batch.item.support.CompositeItemWriter$$FastClassBySpringCGLIB$$fd4a76ae.invoke(<generated>)
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:136)
        at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:124)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749)
        at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:691)
        at org.springframework.batch.item.support.CompositeItemWriter$$EnhancerBySpringCGLIB$f2661a.write(<generated>)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.writeItems(SimpleChunkProcessor.java:193)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.doWrite(SimpleChunkProcessor.java:159)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.write(SimpleChunkProcessor.java:294)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:217)
        at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:77)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140)
        at org.springframework.batch.core.step.tasklet.TaskletStep.doInChunkContext(TaskletStep.java:273)
        at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82)
        at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375)
        at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
        at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145)
        at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258)
        at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208)
        at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
        at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:410)
        at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:136)
        at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319)
        at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:147)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

   Locked ownable synchronizers:
        - <0x00000005c2106ac8> (a java.util.concurrent.ThreadPoolExecutor$Worker)

StaxEventItemWriter 配置

@JobScope
    @Bean(name = "staxTransactionWriter", destroyMethod="")
    public StaxEventItemWriter<TransactionPositionReport> staxTransactionItemWriter(OmegaXmlHeaderCallBack omegaXmlHeaderCallBack, 
            @Value("#{jobParameters['file.path']}") String path, @Value("#{jobParameters['submission.account']}") String submissionAccount) {
        Resource exportFileResource = new FileSystemResource(fileUtils.getFilePath(path, submissionAccount, Directory.TEMP, true));
 
        OmegaXmlJaxb2Marshaller marshaller = new OmegaXmlJaxb2Marshaller();
        marshaller.setContextPath("com.trax.europeangateway.model.dto.xsd.omega");
        marshaller.setSupportJaxbElementClass(true);
        marshaller.setCheckForXmlRootElement(false);
        
        return new StaxEventItemWriterBuilder<TransactionPositionReport>()
                .name("transactionWriter")
                .resource(exportFileResource)
                .marshaller(marshaller)
                .rootTagName("{http://deutsche-boerse.com/DBRegHub}reportFile")
                .headerCallback(omegaXmlHeaderCallBack)
                .footerCallback(getOmegaXmlFooterCallBack())
                .build();
    }

Jaxb2Marshaller 配置

import javax.xml.XMLConstants;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBElement;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.namespace.QName;
import javax.xml.stream.XMLEventWriter;
import javax.xml.stream.XMLStreamWriter;
import javax.xml.transform.Result;

import org.springframework.lang.Nullable;
import org.springframework.oxm.XmlMappingException;
import org.springframework.oxm.jaxb.Jaxb2Marshaller;
import org.springframework.oxm.mime.MimeContainer;
import org.springframework.util.xml.StaxUtils;

import com.trax.europeangateway.model.dto.xsd.omega.ObjectFactory;
import com.trax.europeangateway.model.dto.xsd.omega.TransactionPositionReport;

public class OmegaXmlJaxb2Marshaller extends Jaxb2Marshaller {
    
    @Override
    public void marshal(Object graph, Result result, @Nullable MimeContainer mimeContainer) throws XmlMappingException {
        try {
            Marshaller marshaller = createMarshaller();
            if (StaxUtils.isStaxResult(result)) {
                marshalStaxResult(marshaller, graph, result);
            }
        }
        catch (JAXBException ex) {
            throw convertJaxbException(ex);
        }
    }
    
    private void marshalStaxResult(Marshaller jaxbMarshaller, Object graph, Result staxResult) throws JAXBException {
        XMLStreamWriter streamWriter = StaxUtils.getXMLStreamWriter(staxResult);
        if (streamWriter != null) {
            jaxbMarshaller.marshal(graph, streamWriter);
        }
        else {
            XMLEventWriter eventWriter = StaxUtils.getXMLEventWriter(staxResult);
            if (eventWriter != null) {
                ObjectFactory fact = new ObjectFactory();
                TransactionPositionReport transaction = fact.createTransactionPositionReport();
                transaction.setConfigurableFields(((TransactionPositionReport)graph).getConfigurableFields());
                transaction.setMifir(((TransactionPositionReport)graph).getMifir());
                transaction.setProcessingDetails(((TransactionPositionReport)graph).getProcessingDetails());
                JAXBContext jaxbContext = JAXBContext.newInstance(TransactionPositionReport.class);
                Marshaller marshaller = jaxbContext.createMarshaller();
                marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
                
                QName qName = new QName(XMLConstants.NULL_NS_URI, "transaction", XMLConstants.DEFAULT_NS_PREFIX);
                JAXBElement<TransactionPositionReport> element = new JAXBElement(qName, TransactionPositionReport.class, TransactionPositionReport.class, transaction);

                marshaller.marshal(element, eventWriter);
            }
            else {
                throw new IllegalArgumentException("StAX Result contains neither XMLStreamWriter nor XMLEventConsumer");
            }
        }
    }

}

观察

如果我reduce500100chunk batch size,等待的是reduced to 2 minutes(approx)

作者需要的时间与块大小成比例,这表明作者在编写每个项目时确实效率低下。

如果您每次检查时的堆栈跟踪都相似,那么根本原因是 OmegaXmlJaxb2Marshaller::marshalStaxResultJAXBContext::newInstance 的调用。为每个要写入的项目创建一个新的编组器开销太大。

你应该移动代码块

JAXBContext jaxbContext = JAXBContext.newInstance(TransactionPositionReport.class);
Marshaller marshaller = jaxbContext.createMarshaller();
marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);

OmegaXmlJaxb2Marshaller 的构造函数并使 marshaller 成为实例变量。

那么,编组器只会创建一次,编写器应该会快得多。