Tried to implement batch processing while invoking a transform module?

I want to batch process a set of documents using the MarkLogic Java Client API. I followed the documentation below to invoke a JavaScript module.

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.datamovement.ApplyTransformListener;
import com.marklogic.client.datamovement.ApplyTransformListener.ApplyResult;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.JobTicket;
import com.marklogic.client.datamovement.QueryBatcher;
import com.marklogic.client.datamovement.WriteBatcher;
import com.marklogic.client.document.JSONDocumentManager;
import com.marklogic.client.document.ServerTransform;
import com.marklogic.client.io.DOMHandle;
import com.marklogic.client.query.StructuredQueryBuilder;

public class rest {

    public static void main(String[] args) {
        // "port", "x", and "x" are placeholders for the REST app server port and credentials
        DatabaseClient client = DatabaseClientFactory.newClient
                ("localhost", port, "x", "x",  DatabaseClientFactory.Authentication.DIGEST);

        final DataMovementManager manager = client.newDataMovementManager();

        // Build query
        final StructuredQueryBuilder query = client
          .newQueryManager()
          .newStructuredQueryBuilder();

        // Specify a server-side transformation module (stored procedure) by name
        ServerTransform transform = new ServerTransform("restone-tsm");
        ApplyTransformListener transformListener = new ApplyTransformListener()
          .withTransform(transform)
          .withApplyResult(ApplyResult.REPLACE) // Transform in-place, i.e. rewrite
          .onSuccess(batch -> {})
          .onSkipped(batch -> {})
          .onBatchFailure((batch, throwable) -> {});

        // Apply the transformation to only the documents that match a query.
        // In this case, those in the "accounts" collection.
        final QueryBatcher batcher = manager
          .newQueryBatcher(query.collection("accounts"));
        batcher
          .withBatchSize(1000)
          .withThreadCount(16)
          .onUrisReady(transformListener)
          .onQueryFailure(exception -> exception.printStackTrace());
        final JobTicket ticket = manager.startJob(batcher);
        batcher.awaitCompletion();
        manager.stopJob(ticket);
        client.release();
    }
}

I changed my transform module (i.e. restone-tsm) as per your suggestion:

function harmonize(context, params, content)
{ 
  var transformed = {};
  transformed.Metadata = { "Source" : "International"};
  transformed.Canonical= {"Future" : "Element"};
  transformed.Source = content;
  xdmp.documentInsert(fn.concat("/transformed/", fn.baseUri(content)), transformed, {collections : "transform"});
};
exports.transform = harmonize;

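It executed successfully. However, you suggested in the comments that I apply cts.uris in the query batcher. I looked for that functionality in StructuredQueryBuilder but did not find anything. The code above works fine, though.

Any help is appreciated.

Thanks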

Instead of a separate ServerEvaluationCall, use an ApplyTransformListener with your batcher, as described in Applying an In-Database Transformation:

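// Besides the imports in the question, this needs
// com.marklogic.client.query.QueryManager and
// com.marklogic.client.query.StructuredQueryDefinition.
public static void main(String[] args) {
  // "port", "x", and "x" are placeholders for the REST app server port and credentials
  DatabaseClient client = DatabaseClientFactory.newClient
            ("localhost", port, "x", "x",  DatabaseClientFactory.Authentication.DIGEST);

  ServerTransform txform = new ServerTransform("tsm");

  // Select the documents to transform; "accounts" is the collection from the question
  QueryManager qm = client.newQueryManager();
  StructuredQueryBuilder sqb = qm.newStructuredQueryBuilder();
  StructuredQueryDefinition query = sqb.collection("accounts");

  DataMovementManager dmm = client.newDataMovementManager();
  QueryBatcher batcher = dmm.newQueryBatcher(query);
  batcher.withBatchSize(5)
         .withThreadCount(3)
         .withConsistentSnapshot()
         .onUrisReady(
           new ApplyTransformListener()
             .withTransform(txform)
             // Success and failure callbacks belong to the listener, not the QueryBatcher
             .onSuccess(batch -> {
                   System.out.println(
                       batch.getTimestamp().getTime() +
                       " documents transformed: " +
                       batch.getJobResultsSoFar());
             })
             .onBatchFailure((batch, throwable) -> {
               throwable.printStackTrace();
             }))
         .onQueryFailure(exception -> exception.printStackTrace());

  // start the job; the batcher runs the query and feeds matching URIs to the listener
  dmm.startJob(batcher);

  batcher.awaitCompletion();
  dmm.stopJob(batcher);
  client.release();
}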

You need to ensure that your transform module has a function that implements the required interface, exports it with the name transform, and is installed on the server.

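Adjust the logic in your transform so that it does not perform a URI query (the QueryBatcher handles that) and instead expects to transform the content it is given.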

function harmonize(context, params, content)
{ 
  var transformed = {};
  transformed.Metadata = { "Source" : "International"};
  transformed.Canonical= {"Future" : "Element"};
  transformed.Source = content;
  xdmp.documentInsert(fn.concat("/transformed", fn.baseUri(content)), transformed, {collections : "transform"});
};
exports.transform = harmonize;
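One detail worth checking is the target URI: the question's module concatenates "/transformed/" with the base URI, while the version above uses "/transformed". If the original URI already begins with "/", the first form produces a double slash. The sketch below factors the pure parts of the transform into plain functions so they can be tried outside MarkLogic. The names buildEnvelope and targetUri and the sample URI are hypothetical, and the slash handling is one possible choice rather than anything the API requires.

```javascript
// Hypothetical helpers that mirror the pure parts of the transform above.
// They use no MarkLogic built-ins, so they run in any JavaScript engine.

// Build the envelope that wraps the original document.
function buildEnvelope(source) {
  return {
    Metadata: { Source: "International" },
    Canonical: { Future: "Element" },
    Source: source
  };
}

// Compute the URI for the transformed copy, adding a separator only when
// the original URI does not already start with "/".
function targetUri(originalUri) {
  const prefix = "/transformed";
  return originalUri.startsWith("/")
    ? prefix + originalUri
    : prefix + "/" + originalUri;
}

const envelope = buildEnvelope({ id: 1 });
console.log(JSON.stringify(envelope));
console.log(targetUri("/accounts/1.json")); // prints "/transformed/accounts/1.json"
```

Inside the real module, the result of targetUri would take the place of the fn.concat call, and the envelope would be passed to xdmp.documentInsert as before.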