Titan index issues with Cassandra storage backend

I am populating a single Titan 1.0.0 instance with a moderately sized graph in order to test its query performance. I am using Cassandra 2.0.17 as the storage backend.

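The problem is that I cannot get the vertex indexes created, so I cannot optimize my queries. I have read the docs and tried to follow them carefully, but without much success. I am using the following Groovy script for schema definition, data population and index creation: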

import com.thinkaurelius.titan.core.*;
import com.thinkaurelius.titan.core.schema.*;
import com.thinkaurelius.titan.graphdb.database.management.ManagementSystem;
import java.time.temporal.ChronoUnit;

graph = TitanFactory.open('conf/my-titan.properties');
mgmt = graph.openManagement();

// Build graph schema
//        Node properties
idProp = mgmt.containsPropertyKey('userId') ?
  mgmt.getPropertyKey('userId') : mgmt.makePropertyKey('id').dataType(String.class).cardinality(Cardinality.SINGLE);
isPublicProp = mgmt.containsPropertyKey('isPublic') ?
  mgmt.getPropertyKey('isPublic') : mgmt.makePropertyKey('isPublic').dataType(Boolean.class).cardinality(Cardinality.SINGLE);
completionPercentageProp = mgmt.containsPropertyKey('completionPercentage') ?
  mgmt.getPropertyKey('completionPercentage') : mgmt.makePropertyKey('completionPercentage').dataType(Integer.class).cardinality(Cardinality.SINGLE);
genderProp = mgmt.containsPropertyKey('gender') ?
 mgmt.getPropertyKey('gender') : mgmt.makePropertyKey('gender').dataType(String.class).cardinality(Cardinality.SINGLE);
regionProp = mgmt.containsPropertyKey('region') ?
 mgmt.getPropertyKey('region') : mgmt.makePropertyKey('region').dataType(String.class).cardinality(Cardinality.SINGLE);
lastLoginProp = mgmt.containsPropertyKey('lastLogin') ?
 mgmt.getPropertyKey('lastLogin') : mgmt.makePropertyKey('lastLogin').dataType(String.class).cardinality(Cardinality.SINGLE);
registrationProp = mgmt.containsPropertyKey('registration') ?
 mgmt.getPropertyKey('registration') : mgmt.makePropertyKey('registration').dataType(String.class).cardinality(Cardinality.SINGLE);
ageProp = mgmt.containsPropertyKey('age') ?  mgmt.getPropertyKey('age') : mgmt.makePropertyKey('age').dataType(Integer.class).cardinality(Cardinality.SINGLE);
mgmt.commit();

nUsers = 0
println 'Starting nodes population...';
// Load users
new File('/home/jarandaf/soc-pokec-profiles.txt').eachLine {
  try {
    fields = it.split('\t').take(8);
    userId = fields[0];
    isPublic = fields[1] == '1' ? true : false;
    completionPercentage = fields[2]
    gender = fields[3] == '1' ? 'male' : 'female';
    region = fields[4];
    lastLogin = fields[5];
    registration = fields[6];
    age = fields[7] as int;
    graph.addVertex('userId', userId, 'isPublic', isPublic, 'completionPercentage', completionPercentage, 'gender', gender, 'region', region, 'lastLogin', lastLogin, 'registration', registration, 'age', age);
  } catch (Exception e) {
    // Silently skip...
  }
  nUsers += 1
  if (nUsers % 100000 == 0) println String.valueOf(nUsers) + ' loaded...';
};
graph.tx().commit();
println 'Nodes population finished';

// Index users by userId, gender and age
println 'Getting node properties...';
mgmt = graph.openManagement();
userId = mgmt.getPropertyKey('userId');
gender = mgmt.getPropertyKey('gender');
age = mgmt.getPropertyKey('age');

println 'Building byUserId index...';
if (mgmt.getGraphIndex('byUserId') == null) mgmt.buildIndex('byUserId', Vertex.class).addKey(userId).buildCompositeIndex();
println 'Building byGender index...';
if (mgmt.getGraphIndex('byGender') == null) mgmt.buildIndex('byGender', Vertex.class).addKey(gender).buildCompositeIndex();
println 'Building byAge index...';
if (mgmt.getGraphIndex('byAge') == null) mgmt.buildIndex('byAge', Vertex.class).addKey(age).buildCompositeIndex();
mgmt.commit();

// Wait for the indexes to become available
println 'Awaiting byUserId graph index status...';
ManagementSystem.awaitGraphIndexStatus(graph, 'byUserId')
  .status(SchemaStatus.REGISTERED)
  .timeout(10, ChronoUnit.MINUTES)
  .call();
println 'Awaiting byGender graph index status...';
ManagementSystem.awaitGraphIndexStatus(graph, 'byGender')
  .status(SchemaStatus.REGISTERED)
  .timeout(10, ChronoUnit.MINUTES)
  .call();

println 'Awaiting byAge graph index status...';
ManagementSystem.awaitGraphIndexStatus(graph, 'byAge')
  .status(SchemaStatus.REGISTERED)
  .timeout(10, ChronoUnit.MINUTES)
  .call();

// Reindex the existing data
mgmt = graph.openManagement();
println 'Reindexing data by byUserId index...';
mgmt.updateIndex(mgmt.getGraphIndex('byUserId'), SchemaAction.REINDEX).get();
println 'Reindexing data by byGender index...';
mgmt.updateIndex(mgmt.getGraphIndex('byGender'), SchemaAction.REINDEX).get();
println 'Reindexing data by byAge index...';
mgmt.updateIndex(mgmt.getGraphIndex('byAge'), SchemaAction.REINDEX).get();
mgmt.commit();

// Enable indexes
println 'Enabling byUserId index...'
mgmt.awaitGraphIndexStatus(graph, 'byUserId').status(SchemaStatus.ENABLED).call();
println 'Enabling byGender index...'
mgmt.awaitGraphIndexStatus(graph, 'byGender').status(SchemaStatus.ENABLED).call();
println 'Enabling byAge index...'
mgmt.awaitGraphIndexStatus(graph, 'byAge').status(SchemaStatus.ENABLED).call();

graph.close();

The errors I get are shown below and are related to the reindexing phase:

08:24:26 ERROR com.thinkaurelius.titan.graphdb.database.management.ManagementLogger  - Evicted [2@0ac717511509-mybox] from cache but waiting too long for transactions to close. Stale transaction alert on: [standardtitantx[0x4b8696a4], standardtitantx[0x2d39f30a], standardtitantx[0x0da9172d], standardtitantx[0x7c6c7909], standardtitantx[0x79dd0a38], standardtitantx[0x5999c49e], standardtitantx[0x5aaba4a7]]
08:24:26 ERROR com.thinkaurelius.titan.graphdb.database.management.ManagementLogger  - Evicted [3@0ac717511509-mybox] from cache but waiting too long for transactions to close. Stale transaction alert on: [standardtitantx[0x4b8696a4], standardtitantx[0x2d39f30a], standardtitantx[0x0da9172d], standardtitantx[0x7c6c7909], standardtitantx[0x79dd0a38], standardtitantx[0x5999c49e], standardtitantx[0x5aaba4a7]]
08:24:26 ERROR com.thinkaurelius.titan.graphdb.database.management.ManagementLogger  - Evicted [4@0ac717511509-mybox] from cache but waiting too long for transactions to close. Stale transaction alert on: [standardtitantx[0x4b8696a4], standardtitantx[0x2d39f30a], standardtitantx[0x0da9172d], standardtitantx[0x7c6c7909], standardtitantx[0x79dd0a38], standardtitantx[0x5999c49e], standardtitantx[0x5aaba4a7]]

Any hints would be appreciated.

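The error you are getting indicates that there are open transactions while you are trying to modify the schema. Titan has to wait for all transactions to complete before it can modify the schema. For details, see the answer from Matthias Broecheler on the mailing list.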
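As a rough sketch of how you could clear leftover transactions before touching the schema: roll back the thread-bound transaction, then roll back anything else the instance still holds open. Note the assumptions: the in-memory backend is used here only so the snippet runs without Cassandra (replace it with your `conf/my-titan.properties`), and `getOpenTransactions()` comes from Titan's `StandardTitanGraph` implementation class rather than the public `TitanGraph` interface, so it may differ between versions.

```groovy
import com.thinkaurelius.titan.core.TitanFactory

// Assumption: in-memory backend so the sketch runs standalone.
// For the setup in the question use TitanFactory.open('conf/my-titan.properties').
graph = TitanFactory.build().set('storage.backend', 'inmemory').open()

// Close the transaction bound to the current thread (e.g. left over from loading).
graph.tx().rollback()

// Roll back any other transactions this instance still has open.
// The set is copied first in case rollback() changes it during iteration.
stale = new ArrayList(graph.getOpenTransactions())
stale.each { it.rollback() }

// List the Titan instances registered with the storage backend.
// Instances left behind by a crashed run also have to acknowledge schema changes;
// mgmt.forceCloseInstance(id) can remove such dead entries (not called here).
mgmt = graph.openManagement()
openInstances = mgmt.getOpenInstances()
println "Open instances: ${openInstances}"
mgmt.rollback()

graph.close()
```

Only force-close an instance if you are sure the process behind it is really gone.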

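In general, you should avoid reindexing whenever possible, because it requires Titan to iterate over all vertices to check whether they need to be added to the index being updated. The documentation contains more information on this process.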

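For your use case, you can simply create all indexes before loading any data. If you add the data after all indexes are in place, it is simply added to the indexes as it is written. That way you should be able to use the indexes immediately.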

A minimal example of schema creation in Groovy (it should be essentially the same in Java):

import com.thinkaurelius.titan.core.TitanFactory;
import com.thinkaurelius.titan.core.Cardinality;
import org.apache.tinkerpop.gremlin.structure.Vertex;

graph = TitanFactory.open('conf/my-titan.properties')

mgmt = graph.openManagement()

// make() actually creates the property key; addKey() below needs the resulting PropertyKey
id = mgmt.makePropertyKey('id').dataType(String.class).cardinality(Cardinality.SINGLE).make()

// some other properties that will not be indexed
mgmt.makePropertyKey('isPublic').dataType(Boolean.class).cardinality(Cardinality.SINGLE).make()
mgmt.makePropertyKey('completionPercentage').dataType(Integer.class).cardinality(Cardinality.SINGLE).make()

// I prefer to use vertex labels to differentiate between different 'types' of vertices but this isn't necessary
user = mgmt.makeVertexLabel('User').make()

// composite index on id, restricted to vertices labeled 'User'
mgmt.buildIndex('UserById', Vertex.class).addKey(id).indexOnly(user).buildCompositeIndex()

mgmt.commit()

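For simplicity I removed all the checks for already existing schema elements, but you can of course add them back. Once the schema is created, you can add the data just as before.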
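To illustrate the loading step, here is a small sketch that creates the index on an empty graph, loads vertices with periodic commits, and then queries through the index. Several things are assumptions for illustration only: the in-memory backend (so it runs without Cassandra), the property name `userId` (taken from the `addVertex` call in the question), the generated `rows` standing in for the parsed profile file, and the `batchSize` value. Because the index is restricted to the `User` label, the vertices are created with that label and the query includes it as well.

```groovy
import com.thinkaurelius.titan.core.Cardinality
import com.thinkaurelius.titan.core.TitanFactory
import org.apache.tinkerpop.gremlin.structure.T
import org.apache.tinkerpop.gremlin.structure.Vertex

// Assumption: in-memory backend; swap in TitanFactory.open('conf/my-titan.properties').
graph = TitanFactory.build().set('storage.backend', 'inmemory').open()

// Schema and index first, while the graph is still empty.
mgmt = graph.openManagement()
userId = mgmt.makePropertyKey('userId').dataType(String.class).cardinality(Cardinality.SINGLE).make()
user = mgmt.makeVertexLabel('User').make()
mgmt.buildIndex('UserById', Vertex.class).addKey(userId).indexOnly(user).buildCompositeIndex()
mgmt.commit()

// Hypothetical stand-in for the rows parsed from the profiles file.
rows = (1..2500).collect { [userId: "u${it}".toString()] }

batchSize = 1000   // illustrative value, tune it for your data
loaded = 0
rows.each { row ->
    // The label is needed because the index only covers 'User' vertices.
    graph.addVertex(T.label, 'User', 'userId', row.userId)
    loaded++
    // Commit periodically so that no single transaction grows too large.
    if (loaded % batchSize == 0) graph.tx().commit()
}
graph.tx().commit()   // commit the final partial batch

// Look up one vertex; label plus indexed key lets Titan use the composite index.
g = graph.traversal()
match = g.V().hasLabel('User').has('userId', 'u42').toList()
println "Vertices found for u42: ${match.size()}"

graph.close()
```

Committing in batches also means that no long-running transaction is still open at the end of the load.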

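One last note on index management: always try to define the property keys to be indexed in the same transaction that creates the index. Otherwise Titan cannot know whether data already exists that would need to be added to the new index, and a full scan over all data is required again. This may mean choosing different property names. For example, when you add a new vertex label post, you may want to use a new name such as postId rather than reusing the property id, so that you avoid a scan over all existing data.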
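A short sketch of that last point, assuming an in-memory backend so it runs standalone and using the hypothetical index name `PostById`: the property key, the label and the index are all created in one management transaction, and the index status is read back afterwards. According to the Titan documentation, an index defined together with its key should be enabled right away, without a reindex.

```groovy
import com.thinkaurelius.titan.core.Cardinality
import com.thinkaurelius.titan.core.TitanFactory
import com.thinkaurelius.titan.core.schema.SchemaStatus
import org.apache.tinkerpop.gremlin.structure.Vertex

// Assumption: in-memory backend; swap in TitanFactory.open('conf/my-titan.properties').
graph = TitanFactory.build().set('storage.backend', 'inmemory').open()

// New key, new label and new index, all in the same management transaction.
mgmt = graph.openManagement()
postId = mgmt.makePropertyKey('postId').dataType(String.class).cardinality(Cardinality.SINGLE).make()
post = mgmt.makeVertexLabel('post').make()
mgmt.buildIndex('PostById', Vertex.class).addKey(postId).indexOnly(post).buildCompositeIndex()
mgmt.commit()

// Read the status back in a fresh management transaction.
mgmt = graph.openManagement()
status = mgmt.getGraphIndex('PostById').getIndexStatus(mgmt.getPropertyKey('postId'))
println "PostById status: ${status}"
mgmt.rollback()

graph.close()
```

If the status is anything other than `ENABLED`, the key most likely existed before the index was built and the reindex procedure from the documentation applies.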