Is there a possibility to batch a select-query with SPARQL and RDF4J?
I'm working with a fairly large dataset (around 500 million triples) stored in GraphDB Free, running on my local development machine.
I want to perform some operations on the dataset with RDF4J and have to SELECT more or less the whole of it. For testing, I SELECT only the tuples I need. The code works fine for the first million tuples, but after that it becomes really slow, because GraphDB keeps allocating more RAM.
Is there a way to run a SELECT query against a very large dataset and fetch the results in batches?
Basically I just want to "iterate" over some selected triples, so GraphDB shouldn't need that much RAM. I can see that RDF4J is already receiving data before the query has finished, because the crash (HeapSpaceError) only happens after roughly 1.4 million tuples have been read. Unfortunately, GraphDB does not seem to free the memory for tuples that have already been read. Am I missing something?
Thanks a lot for your help.
P.S. I have already set GraphDB's usable heap space to 20 GB.
The RDF4J (Java) code looks like this:
package ch.test;

import org.eclipse.rdf4j.query.*;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.http.HTTPRepository;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class RDF2RDF {

    public static void main(String[] args) {
        System.out.println("Running RDF2RDF");

        HTTPRepository sourceRepo = new HTTPRepository("http://localhost:7200/repositories/datatraining");
        try {
            String path = new File("").getAbsolutePath();
            String sparqlCommand = Files.readString(
                    Paths.get(path + "/src/main/resources/sparql/select.sparql"),
                    StandardCharsets.ISO_8859_1);

            int chunkSize = 10000;
            int positionInChunk = 0;
            long loadedTuples = 0;

            // try-with-resources so the connection is closed even on failure
            try (RepositoryConnection sourceConnection = sourceRepo.getConnection()) {
                TupleQuery query = sourceConnection.prepareTupleQuery(sparqlCommand);
                try (TupleQueryResult result = query.evaluate()) {
                    for (BindingSet solution : result) {
                        loadedTuples++;
                        positionInChunk++;
                        // log progress once per chunk
                        if (positionInChunk >= chunkSize) {
                            System.out.println("Got " + loadedTuples + " Tuples");
                            positionInChunk = 0;
                        }
                    }
                }
            }
        } catch (IOException err) {
            err.printStackTrace();
        }
    }
}
select.sparql:
PREFIX XXX_meta_schema: <http://schema.XXX.ch/meta/>
PREFIX XXX_post_schema: <http://schema.XXX.ch/post/>
PREFIX XXX_post_tech_schema: <http://schema.XXX.ch/post/tech/>
PREFIX XXX_geo_schema: <http://schema.XXX.ch/geo/>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX XXX_raw_schema: <http://schema.XXX.ch/raw/>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
SELECT * WHERE {
  BIND(<http://data.XXX.ch/raw/Table/XXX.csv> AS ?table) .
  ?row XXX_raw_schema:isDefinedBy ?table .
  ?cellStreetAdress XXX_raw_schema:isDefinedBy ?row ;
    XXX_raw_schema:ofColumn <http://data.XXX.ch/raw/Column/Objektadresse> ;
    rdf:value ?valueStreetAdress .
  ?cellOrt XXX_raw_schema:isDefinedBy ?row ;
    XXX_raw_schema:ofColumn <http://XXX.mobi.ch/raw/Column/Ort> ;
    rdf:value ?valueOrt .
  ?cellPlz XXX_raw_schema:isDefinedBy ?row ;
    XXX_raw_schema:ofColumn <http://XXX.mobi.ch/raw/Column/PLZ> ;
    rdf:value ?valuePLZ .
  BIND(URI(CONCAT("http://data.XXX.ch/post/tech/Adress/", MD5(STR(?cellStreetAdress)))) AS ?iri_tech_Adress) .
}
My solution:
Use a sub-select that first fetches all the "rows".
PREFIX mobi_post_schema: <http://schema.mobi.ch/post/>
PREFIX mobi_post_tech_schema: <http://schema.mobi.ch/post/tech/>
PREFIX mobi_geo_schema: <http://schema.mobi.ch/geo/>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX mobi_raw_schema: <http://schema.mobi.ch/raw/>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
SELECT * WHERE {
  {
    SELECT ?row WHERE {
      BIND(<http://data.mobi.ch/raw/Table/Gebaeudeobjekte_August2020_ARA_Post.csv> AS ?table) .
      ?row mobi_raw_schema:isDefinedBy ?table .
    }
  }
  ?cellStreetAdress mobi_raw_schema:isDefinedBy ?row ;
    mobi_raw_schema:ofColumn <http://data.mobi.ch/raw/Column/Objektadresse> ;
    rdf:value ?valueStreetAdress .
  ?cellOrt mobi_raw_schema:isDefinedBy ?row ;
    mobi_raw_schema:ofColumn <http://data.mobi.ch/raw/Column/Ort> ;
    rdf:value ?valueOrt .
  ?cellPlz mobi_raw_schema:isDefinedBy ?row ;
    mobi_raw_schema:ofColumn <http://data.mobi.ch/raw/Column/PLZ> ;
    rdf:value ?valuePLZ .
  BIND(URI(CONCAT("http://data.mobi.ch/post/tech/Adress/", MD5(STR(?cellStreetAdress)))) AS ?iri_tech_Adress) .
}
I don't know why this particular query is so expensive memory-wise for GraphDB Free to execute, but generally a lot depends on the shape and size of the dataset. That said, a query that essentially retrieves the entire database is rarely a wise idea in the first place.
Still, there are a few things you can try. One approach is to use LIMIT and OFFSET as a paging mechanism.
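For example, a minimal sketch reusing the row pattern from the question; the ORDER BY is an addition needed to make the pages deterministic, since SPARQL otherwise guarantees no stable result order across queries:

PREFIX XXX_raw_schema: <http://schema.XXX.ch/raw/>

SELECT ?row WHERE {
  ?row XXX_raw_schema:isDefinedBy <http://data.XXX.ch/raw/Table/XXX.csv> .
}
ORDER BY ?row
LIMIT 10000
OFFSET 20000

You then fetch successive batches by increasing the OFFSET by the page size until a query returns fewer than LIMIT results. Be aware that large OFFSET values can themselves become slow, since the server still has to compute and skip all the preceding results.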
Another option is to split the query in two: one query retrieves all identifiers of the resources you're interested in, and you then iterate over those and execute a separate query for each, retrieving the details (attributes and relations) of that particular resource.
In your example you could split on ?row, so you would first run a query to get all rows of the given table:
SELECT ?row WHERE {
  VALUES ?table { <http://data.XXX.ch/raw/Table/XXX.csv> }
  ?row XXX_raw_schema:isDefinedBy ?table .
}
and then iterate over that result, injecting each returned value of ?row into the query that retrieves the details:
SELECT * WHERE {
  VALUES ?row { <http://data.XXX.ch/raw/Table/XXX.csv#row1> }
  ?cellStreetAdress XXX_raw_schema:isDefinedBy ?row ;
    XXX_raw_schema:ofColumn <http://data.XXX.ch/raw/Column/Objektadresse> ;
    rdf:value ?valueStreetAdress .
  ?cellOrt XXX_raw_schema:isDefinedBy ?row ;
    XXX_raw_schema:ofColumn <http://XXX.mobi.ch/raw/Column/Ort> ;
    rdf:value ?valueOrt .
  ?cellPlz XXX_raw_schema:isDefinedBy ?row ;
    XXX_raw_schema:ofColumn <http://XXX.mobi.ch/raw/Column/PLZ> ;
    rdf:value ?valuePLZ .
  BIND(URI(CONCAT("http://data.XXX.ch/post/tech/Adress/", MD5(STR(?cellStreetAdress)))) AS ?iri_tech_Adress) .
}
In Java code, something like this:
String sparqlCommand1 = // the query for all rows of the table

// query for the details of each row; the value of ?row will be injected via the API
// (prefix declarations omitted here for brevity)
String sparqlCommand2 = "SELECT * WHERE { \n"
        + "  ?cellStreetAdress XXX_raw_schema:isDefinedBy ?row ;\n"
        + "    XXX_raw_schema:ofColumn <http://data.XXX.ch/raw/Column/Objektadresse> ;\n"
        + "    rdf:value ?valueStreetAdress .\n"
        + "  ?cellOrt XXX_raw_schema:isDefinedBy ?row ;\n"
        + "    XXX_raw_schema:ofColumn <http://XXX.mobi.ch/raw/Column/Ort> ;\n"
        + "    rdf:value ?valueOrt .\n"
        + "  ?cellPlz XXX_raw_schema:isDefinedBy ?row ;\n"
        + "    XXX_raw_schema:ofColumn <http://XXX.mobi.ch/raw/Column/PLZ> ;\n"
        + "    rdf:value ?valuePLZ .\n"
        + "  BIND(URI(CONCAT(\"http://data.XXX.ch/post/tech/Adress/\", MD5(STR(?cellStreetAdress)))) AS ?iri_tech_Adress) .\n"
        + "}";
try (RepositoryConnection sourceConnection = sourceRepo.getConnection()) {
    TupleQuery rowQuery = sourceConnection.prepareTupleQuery(sparqlCommand1);
    TupleQuery detailsQuery = sourceConnection.prepareTupleQuery(sparqlCommand2);

    try (TupleQueryResult result = rowQuery.evaluate()) {
        for (BindingSet solution : result) {
            // inject the current row identifier
            detailsQuery.setBinding("row", solution.getValue("row"));
            // execute the details query for the row and do something with the result
            detailsQuery.evaluate().forEach(System.out::println);
        }
    }
}
Of course, you are executing a lot more queries this way (N+1, where N is the number of rows), but each individual query result is only a small chunk, which is easier for GraphDB Free (and your own application) to manage.
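If even the list of row identifiers is too large to fetch in one go, the two suggestions can be combined: page the row query with LIMIT/OFFSET and run the details query per row within each page. A minimal sketch, assuming the same repository setup as above (the class and method names, the page size, and the rowQueryBase parameter are illustrative, not part of the original answer):

import org.eclipse.rdf4j.query.BindingSet;
import org.eclipse.rdf4j.query.TupleQuery;
import org.eclipse.rdf4j.query.TupleQueryResult;
import org.eclipse.rdf4j.repository.RepositoryConnection;

public class PagedRowProcessing {

    // Sketch: fetch the row identifiers page by page instead of all at once.
    // rowQueryBase is assumed to be a SELECT ?row query without LIMIT/OFFSET.
    static void processRowsPaged(RepositoryConnection conn, String rowQueryBase, TupleQuery detailsQuery) {
        int pageSize = 10000; // illustrative page size
        for (int offset = 0; ; offset += pageSize) {
            // ORDER BY gives a stable ordering, so pages neither overlap nor skip rows
            String paged = rowQueryBase + " ORDER BY ?row LIMIT " + pageSize + " OFFSET " + offset;
            TupleQuery rowQuery = conn.prepareTupleQuery(paged);

            int rowsInPage = 0;
            try (TupleQueryResult page = rowQuery.evaluate()) {
                for (BindingSet solution : page) {
                    rowsInPage++;
                    // inject the current row and fetch its details, as in the answer above
                    detailsQuery.setBinding("row", solution.getValue("row"));
                    detailsQuery.evaluate().forEach(System.out::println);
                }
            }
            if (rowsInPage < pageSize) {
                break; // a short page means this was the last batch
            }
        }
    }
}

This keeps the memory footprint of each round trip bounded, at the cost of one extra row query per page.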