Trying to implement batch processing while invoking a transform module
I want to batch process a set of documents using the MarkLogic Java Client API. I followed the documentation to invoke a JavaScript module:
import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.datamovement.ApplyTransformListener;
import com.marklogic.client.datamovement.ApplyTransformListener.ApplyResult;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.JobTicket;
import com.marklogic.client.datamovement.QueryBatcher;
import com.marklogic.client.datamovement.WriteBatcher;
import com.marklogic.client.document.JSONDocumentManager;
import com.marklogic.client.document.ServerTransform;
import com.marklogic.client.io.DOMHandle;
import com.marklogic.client.query.StructuredQueryBuilder;
public class rest {
    public static void main(String[] args) {
        // "port" is a placeholder for the REST app server port
        DatabaseClient client = DatabaseClientFactory.newClient(
            "localhost", port, "x", "x", DatabaseClientFactory.Authentication.DIGEST);
        final DataMovementManager manager = client.newDataMovementManager();

        // Build query
        final StructuredQueryBuilder query = client
            .newQueryManager()
            .newStructuredQueryBuilder();

        // Specify a server-side transformation module (stored procedure) by name
        ServerTransform transform = new ServerTransform("restone-tsm");
        ApplyTransformListener transformListener = new ApplyTransformListener()
            .withTransform(transform)
            .withApplyResult(ApplyResult.REPLACE) // Transform in-place, i.e. rewrite
            .onSuccess(batch -> {})
            .onSkipped(batch -> {})
            .onBatchFailure((batch, throwable) -> {});

        // Apply the transformation to only the documents that match a query.
        // In this case, those in the "accounts" collection.
        final QueryBatcher batcher = manager
            .newQueryBatcher(query.collection("accounts"));
        batcher
            .withBatchSize(1000)
            .withThreadCount(16)
            .onUrisReady(transformListener)
            .onQueryFailure(exception -> exception.printStackTrace());
        final JobTicket ticket = manager.startJob(batcher);
        batcher.awaitCompletion();
        manager.stopJob(ticket);
    }
}
I changed my transform module (i.e. restone-tsm) as you suggested:
function harmonize(context, params, content)
{
    var transformed = {};
    transformed.Metadata = { "Source" : "International" };
    transformed.Canonical = { "Future" : "Element" };
    transformed.Source = content;
    xdmp.documentInsert(fn.concat("/transformed/", fn.baseUri(content)), transformed, {collections : "transform"});
};
exports.transform = harmonize;
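It executes successfully. However, you suggested in the comments applying cts.uris in the query batcher. I looked for that in StructuredQueryBuilder but could not find any such function. The code above works fine, though.
Any help is appreciated.
Thanks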
Instead of a separate ServerEvaluationCall, use an ApplyTransformListener with your batcher, as described in Applying an In-Database Transformation:
public static void main(String[] args) {
    // "port" is a placeholder for the REST app server port
    DatabaseClient client = DatabaseClientFactory.newClient(
        "localhost", port, "x", "x", DatabaseClientFactory.Authentication.DIGEST);
    ServerTransform txform = new ServerTransform("tsm");
    QueryManager qm = client.newQueryManager();
    StructuredQueryBuilder sqb = qm.newStructuredQueryBuilder();
    // Select the documents to transform; "accounts" is the collection used in the question.
    // StructuredQueryDefinition lives in com.marklogic.client.query.
    StructuredQueryDefinition query = sqb.collection("accounts");
    DataMovementManager dmm = client.newDataMovementManager();
    QueryBatcher batcher = dmm.newQueryBatcher(query);
    batcher.withBatchSize(5)
        .withThreadCount(3)
        .withConsistentSnapshot()
        .onUrisReady(
            new ApplyTransformListener()
                .withTransform(txform)
                // success and failure callbacks belong to the listener, not the QueryBatcher
                .onSuccess(batch -> {
                    System.out.println(
                        batch.getTimestamp().getTime() +
                        " URIs processed so far: " +
                        batch.getJobResultsSoFar());
                })
                .onBatchFailure((batch, throwable) -> {
                    throwable.printStackTrace();
                }))
        .onQueryFailure(exception -> exception.printStackTrace());
    // start the job and feed input to the batcher
    dmm.startJob(batcher);
    batcher.awaitCompletion();
    dmm.stopJob(batcher);
    client.release();
}
You need to make sure that your transform module has a function that implements the required interface, exports it with the name transform, and is installed on the server.
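Adjust the logic in the transform so that it no longer performs the URI query (the QueryBatcher handles that) and simply transforms the content it is passed.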
function harmonize(context, params, content)
{
    var transformed = {};
    transformed.Metadata = { "Source" : "International" };
    transformed.Canonical = { "Future" : "Element" };
    transformed.Source = content;
    xdmp.documentInsert(fn.concat("/transformed", fn.baseUri(content)), transformed, {collections : "transform"});
};
exports.transform = harmonize;
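One detail to watch is the target URI: the module in the question concatenates "/transformed/" with the base URI, while the one above uses "/transformed". If the document URIs already start with a slash (an assumption about your data), the first form produces a double slash. Below is a minimal sketch of how the envelope and the URI could be built by small pure functions; targetUri and buildEnvelope are hypothetical helper names, not part of any MarkLogic API.

```javascript
// Hypothetical helpers: plain JavaScript with no MarkLogic globals,
// so they can be tried outside the server as well.

// Join a prefix and a document URI with exactly one "/" between them.
function targetUri(prefix, uri) {
  const left = String(prefix).replace(/\/+$/, "");  // drop trailing slashes
  const right = String(uri).replace(/^\/+/, "");    // drop leading slashes
  return left + "/" + right;
}

// Wrap the original content in the envelope used by the transform above.
function buildEnvelope(source) {
  return {
    Metadata: { Source: "International" },
    Canonical: { Future: "Element" },
    Source: source
  };
}

// Inside the transform, the insert call would then read roughly:
//   xdmp.documentInsert(
//     targetUri("/transformed", fn.baseUri(content)),
//     buildEnvelope(content),
//     { collections: "transform" });

console.log(targetUri("/transformed/", "/accounts/1.json"));
// prints "/transformed/accounts/1.json"
```

With this, both "/transformed" and "/transformed/" yield the same target URI, whether or not the source URI has a leading slash.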