使用 Java 客户端 Api(DMSDK) 从 marklogic 创建 CSV 文件
Create a CSV file from marklogic using Java Client Api(DMSDK)
我想为我的 marklogic 数据库中的 130 万条记录创建一个 csv 文件。我尝试为此使用 CORB,但它花费的时间比我预期的要多。
我的数据是这样的
{
"One": {
"Name": "One",
"Country": "US"
},
"Two": {
"State": "kentucky"
},
"Three": {
"Element1": "value1",
"Element2": "value2",
"Element3": "value3",
"Element4": "value4",
so on ...
}
}
下面是我的 Corb 模块
Selector.xqy
var total = cts.uris("", null, cts.collectionQuery("data"));
fn.insertBefore(total,0,fn.count(total))
Transform.xqy(我将所有元素保存在一个数组中)
var name = fn.tokenize(URI, ";");
const node = cts.doc(name);
var a= node.xpath("/One/*");
var b= node.xpath("/Two/*");
var c= node.xpath("/Three/*");
fn.stringJoin([a, b, c,name], " , ")
我的属性文件
THREAD-COUNT=16
BATCH-SIZE=1000
URIS-MODULE=selector.sjs|ADHOC
PROCESS-MODULE=transform.sjs|ADHOC
PROCESS-TASK=com.marklogic.developer.corb.ExportBatchToFileTask
EXPORT-FILE-NAME=Report.csv
PRE-BATCH-TASK=com.marklogic.developer.corb.PreBatchUpdateFileTask
EXPORT-FILE-TOP-CONTENT=Col1,col2,....col16 -- i have 16 columns
创建一个 csv 文件用了 1 个多小时。而且为了在集群中尝试,我需要先配置一个负载均衡器。而 Java 客户端 api 将在没有任何负载均衡器的情况下在所有节点之间分配工作。
我如何在 Java 客户端 APi 中实现相同的功能,我知道我可以使用 ServerTransform
和 ApplyTransformListener
触发转换模块。
public static void main(String[] args) {
// TODO Auto-generated method stub
DatabaseClient client = DatabaseClientFactory.newClient
("localhost", pwd, "x", "x", DatabaseClientFactory.Authentication.DIGEST);
ServerTransform txform = new ServerTransform("tsm"); -- Here i am implementing same logic of above `tranform module` .
QueryManager qm = client.newQueryManager();
StructuredQueryBuilder query = qm.newStructuredQueryBuilder();
query.collection();
DataMovementManager dmm = client.newDataMovementManager();
QueryBatcher batcher = dmm.newQueryBatcher(query.collections("data"));
batcher.withBatchSize(2000)
.withThreadCount(16)
.withConsistentSnapshot()
.onUrisReady(
new ApplyTransformListener().withTransform(txform))
.onBatchSuccess(batch-> {
System.out.println(
batch.getTimestamp().getTime() +
" documents written: " +
batch.getJobWritesSoFar());
})
.onBatchFailure((batch,throwable) -> {
throwable.printStackTrace();
});
// start the job and feed input to the batcher
dmm.startJob(batcher);
batcher.awaitCompletion();
dmm.stopJob(batcher);
client.release();
}
但是我怎样才能像 CORB 中那样发送 csv 文件头(即 EXPORT-FILE-TOP-CONTENT)。是否有实施 CSV
文件的文档?哪个 class 会实施?
感谢任何帮助
谢谢
可能最简单的选择是 ml-gradle Exporting data to CSV,它在后台使用 Java 客户端 API 和 DMSDK。
请注意,您可能需要安装 server-side REST 转换以仅提取 CSV 输出中所需的数据,而不是下载整个文档内容然后在 Java 上提取边.
有关使用 DMSDK 和创建聚合 CSV(所有记录一个 CSV)所需代码的工作示例,请参阅 ExporToWriterListenerTest.testMassExportToWriter。为了 SO,这里是关键代码片段(有一些小的简化更改,包括编写列 headers(未经测试的代码)):
try (FileWriter writer = new FileWriter(outputFile)) {
writer.write("uri,collection,contents");
writer.flush();
ExportToWriterListener exportListener = new ExportToWriterListener(writer)
.withRecordSuffix("\n")
.withMetadataCategory(DocumentManager.Metadata.COLLECTIONS)
.onGenerateOutput(
record -> {
String uri = record.getUri();
String collection = record.getMetadata(new DocumentMetadataHandle()).getCollections().iterator().next();
String contents = record.getContentAs(String.class);
return uri + "," + collection + "," + contents;
}
);
QueryBatcher queryJob =
moveMgr.newQueryBatcher(query)
.withThreadCount(5)
.withBatchSize(10)
.onUrisReady(exportListener)
.onQueryFailure( throwable -> throwable.printStackTrace() );
moveMgr.startJob( queryJob );
queryJob.awaitCompletion();
moveMgr.stopJob( queryJob );
}
但是,除非您知道您的内容没有双引号、换行符或 non-ascii 个字符,否则建议使用 CSV 库以确保您的输出被正确转义。要使用 CSV 库,您当然可以使用适用于您的库的任何教程。您无需担心线程安全,因为 ExportToWriterListener 在同步块中运行您的侦听器以防止对编写器的重叠写入。这里是 an example of using one CSV library, Jackson CsvMapper.
请注意,您不必使用 ExportToWriterListener 。 . .您可以将其用作编写自己的侦听器的起点。特别是,由于您主要关心的是性能,您可能希望让您的侦听器每个线程写入一个文件,然后 post-process 将它们组合在一起。由你决定。
我想为我的 marklogic 数据库中的 130 万条记录创建一个 csv 文件。我尝试为此使用 CORB,但它花费的时间比我预期的要多。 我的数据是这样的
{
"One": {
"Name": "One",
"Country": "US"
},
"Two": {
"State": "kentucky"
},
"Three": {
"Element1": "value1",
"Element2": "value2",
"Element3": "value3",
"Element4": "value4",
so on ...
}
}
下面是我的 Corb 模块
Selector.xqy
var total = cts.uris("", null, cts.collectionQuery("data"));
fn.insertBefore(total,0,fn.count(total))
Transform.xqy(我将所有元素保存在一个数组中)
var name = fn.tokenize(URI, ";");
const node = cts.doc(name);
var a= node.xpath("/One/*");
var b= node.xpath("/Two/*");
var c= node.xpath("/Three/*");
fn.stringJoin([a, b, c,name], " , ")
我的属性文件
THREAD-COUNT=16
BATCH-SIZE=1000
URIS-MODULE=selector.sjs|ADHOC
PROCESS-MODULE=transform.sjs|ADHOC
PROCESS-TASK=com.marklogic.developer.corb.ExportBatchToFileTask
EXPORT-FILE-NAME=Report.csv
PRE-BATCH-TASK=com.marklogic.developer.corb.PreBatchUpdateFileTask
EXPORT-FILE-TOP-CONTENT=Col1,col2,....col16 -- i have 16 columns
创建一个 csv 文件用了 1 个多小时。而且为了在集群中尝试,我需要先配置一个负载均衡器。而 Java 客户端 api 将在没有任何负载均衡器的情况下在所有节点之间分配工作。
我如何在 Java 客户端 APi 中实现相同的功能,我知道我可以使用 ServerTransform
和 ApplyTransformListener
触发转换模块。
public static void main(String[] args) {
// TODO Auto-generated method stub
DatabaseClient client = DatabaseClientFactory.newClient
("localhost", pwd, "x", "x", DatabaseClientFactory.Authentication.DIGEST);
ServerTransform txform = new ServerTransform("tsm"); -- Here i am implementing same logic of above `tranform module` .
QueryManager qm = client.newQueryManager();
StructuredQueryBuilder query = qm.newStructuredQueryBuilder();
query.collection();
DataMovementManager dmm = client.newDataMovementManager();
QueryBatcher batcher = dmm.newQueryBatcher(query.collections("data"));
batcher.withBatchSize(2000)
.withThreadCount(16)
.withConsistentSnapshot()
.onUrisReady(
new ApplyTransformListener().withTransform(txform))
.onBatchSuccess(batch-> {
System.out.println(
batch.getTimestamp().getTime() +
" documents written: " +
batch.getJobWritesSoFar());
})
.onBatchFailure((batch,throwable) -> {
throwable.printStackTrace();
});
// start the job and feed input to the batcher
dmm.startJob(batcher);
batcher.awaitCompletion();
dmm.stopJob(batcher);
client.release();
}
但是我怎样才能像 CORB 中那样发送 csv 文件头(即 EXPORT-FILE-TOP-CONTENT)。是否有实施 CSV
文件的文档?哪个 class 会实施?
感谢任何帮助
谢谢
可能最简单的选择是 ml-gradle Exporting data to CSV,它在后台使用 Java 客户端 API 和 DMSDK。
请注意,您可能需要安装 server-side REST 转换以仅提取 CSV 输出中所需的数据,而不是下载整个文档内容然后在 Java 上提取边.
有关使用 DMSDK 和创建聚合 CSV(所有记录一个 CSV)所需代码的工作示例,请参阅 ExporToWriterListenerTest.testMassExportToWriter。为了 SO,这里是关键代码片段(有一些小的简化更改,包括编写列 headers(未经测试的代码)):
try (FileWriter writer = new FileWriter(outputFile)) {
writer.write("uri,collection,contents");
writer.flush();
ExportToWriterListener exportListener = new ExportToWriterListener(writer)
.withRecordSuffix("\n")
.withMetadataCategory(DocumentManager.Metadata.COLLECTIONS)
.onGenerateOutput(
record -> {
String uri = record.getUri();
String collection = record.getMetadata(new DocumentMetadataHandle()).getCollections().iterator().next();
String contents = record.getContentAs(String.class);
return uri + "," + collection + "," + contents;
}
);
QueryBatcher queryJob =
moveMgr.newQueryBatcher(query)
.withThreadCount(5)
.withBatchSize(10)
.onUrisReady(exportListener)
.onQueryFailure( throwable -> throwable.printStackTrace() );
moveMgr.startJob( queryJob );
queryJob.awaitCompletion();
moveMgr.stopJob( queryJob );
}
但是,除非您知道您的内容没有双引号、换行符或 non-ascii 个字符,否则建议使用 CSV 库以确保您的输出被正确转义。要使用 CSV 库,您当然可以使用适用于您的库的任何教程。您无需担心线程安全,因为 ExportToWriterListener 在同步块中运行您的侦听器以防止对编写器的重叠写入。这里是 an example of using one CSV library, Jackson CsvMapper.
请注意,您不必使用 ExportToWriterListener 。 . .您可以将其用作编写自己的侦听器的起点。特别是,由于您主要关心的是性能,您可能希望让您的侦听器每个线程写入一个文件,然后 post-process 将它们组合在一起。由你决定。