MarkLogic 9 Java 客户端在使用 batcher(WriteBatcher)时出错

Error in MarkLogic 9 Java Client with the batcher

我正在使用 MarkLogic Java 客户端加载大量数据。一段时间后我收到以下错误,然后重复出现。

    23-May-2017 15:09:11.199 WARNING [localhost-startStop-2] org.apache.catalina.loader.WebappClassLoaderBase.clearReferencesThreads The web application [easymetahub] appears to have started a thread named [pool-20-thread-12] but has failed to stop it. This is very likely to create a memory leak. Stack trace of thread:
 sun.misc.Unsafe.park(Native Method)
 java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
 java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
 java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
 java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 java.lang.Thread.run(Thread.java:745)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:105)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:506)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79)
    at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:620)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:1078)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:760)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1524)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:745)
java.lang.IllegalStateException: This instance has been stopped
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.requireNotStopped(WriteBatcherImpl.java:347)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:283)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:267)
    at com.easymetahub.HarvestJDBCData.doRoot(HarvestJDBCData.java:486)
    at com.easymetahub.HarvestJDBCData.doSomething(HarvestJDBCData.java:243)
    at com.easymetahub.HarvestJDBCData.processBatchSegment(HarvestJDBCData.java:146)
    at com.easymetahub.HarvestJDBCDataServlet.doPost(HarvestJDBCDataServlet.java:33)
    at com.easymetahub.HarvestJDBCDataServlet.doGet(HarvestJDBCDataServlet.java:45)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:622)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:729)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:232)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:165)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:199)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:105)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:506)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79)
    at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:620)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:343)
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:1078)
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66)
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:760)
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1524)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:745)
java.lang.IllegalStateException: This instance has been stopped
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.requireNotStopped(WriteBatcherImpl.java:347)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:283)
    at com.marklogic.client.datamovement.impl.WriteBatcherImpl.add(WriteBatcherImpl.java:267)
    at com.easymetahub.HarvestJDBCData.doRoot(HarvestJDBCData.java:486)
    at com.easymetahub.HarvestJDBCData.doSomething(HarvestJDBCData.java:243)
    at com.easymetahub.HarvestJDBCData.processBatchSegment(HarvestJDBCData.java:146)
    at com.easymetahub.HarvestJDBCDataServlet.doPost(HarvestJDBCDataServlet.java:33)
    at com.easymetahub.HarvestJDBCDataServlet.doGet(HarvestJDBCDataServlet.java:45)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:622)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:729)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:232)

我从示例中复制并修改了以下代码:

        // Open a client for the "emh-entity-manager-content" database on localhost:8000
        // and run one write job that loads everything doSomething(...) adds to the batcher.
        // NOTE(review): client and batcher are assigned to variables declared outside this
        // snippet; if this code runs once per request, consider sharing a single
        // DatabaseClient for the lifetime of the webapp, and make sure nothing calls
        // batcher.add(...) after the job has been stopped.
        client = DatabaseClientFactory.newClient("localhost", 8000, "emh-entity-manager-content", new DatabaseClientFactory.DigestAuthContext("admin", "mladmin"));
        DataMovementManager dmm = client.newDataMovementManager();
        batcher = dmm.newWriteBatcher();
        batcher.withBatchSize(5000)
                .withThreadCount(30)
                // Report progress after every successfully written batch.
                .onBatchSuccess(batch -> {
                    System.out.println(
                            batch.getTimestamp().getTime() +
                                    " documents written: " +
                                    batch.getJobWritesSoFar() +
                                    " \t[" + sourceName + "] [" + start + "]");
                })
                // Report which source failed and dump the cause.
                .onBatchFailure((batch, throwable) -> {
                    System.out.println("Failure on " + sourceName);
                    throwable.printStackTrace();
                });

        dmm.startJob(batcher);
        try {
            doSomething(harvestNode);
            // Start any partial batches waiting for more input, then wait
            // for all batches to complete. This call will block.
            batcher.flushAndWait();
        } finally {
            // Always stop the job, even when doSomething(...) or flushAndWait() throws,
            // so a started job is never left running.
            dmm.stopJob(batcher);
        }

其中 doSomething 内部会调用:

                // Queue the document produced from outDoc for writing under entityPath.
                // Per the stack trace above, add(...) throws IllegalStateException
                // ("This instance has been stopped") once the batcher has been stopped.
                batcher.add(entityPath, getStringFromDocument(outDoc));

如何避免这个错误?

我没有发现您的代码示例中有什么问题,我猜您贴出的堆栈跟踪来自另外一段代码。通常不应该在 HarvestJDBCDataServlet.doPost 这样的方法中创建新的 DatabaseClient:DatabaseClient 实例内部持有共享连接池,应当在 webapp 的整个生命周期内共享同一个实例。出现该错误的唯一可能,是在调用 dmm.stopJob(batcher) 之后又调用了 batcher.add。这是日志中唯一的错误,还是在此之前还有其他错误?无论如何,如果您认为自己遇到了 bug,请在 GitHub 上提交 issue。