Gremlin Driver blocks while initializing ConnectionPool with multiple endpoints

We are running a Neptune database in AWS. We have one writer and 3 reader instances. A few weeks ago we found out that load balancing does not work as expected: our software instance connects to only one reader and keeps this connection until EOL, so the other reader instances are never used. Consider the following link: https://docs.aws.amazon.com/neptune/latest/userguide/feature-overview-endpoints.html. It describes that Neptune load balancing has to be done on the client side, and that one precondition is disabling the DNS cache. The client-side implementation is described at https://docs.amazonaws.cn/en_us/neptune/latest/userguide/best-practices-gremlin-java-multiple.html and https://docs.aws.amazon.com/neptune/latest/userguide/best-practices-gremlin-java-separate.html respectively, because we handle the writer and reader clusters separately. Our software is written in Java, so we implemented the described approach as follows:

Disabling the DNS cache in the JVM:

java.security.Security.setProperty("networkaddress.cache.ttl", "0");
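
As a minimal sketch of the DNS cache setting above, the call can be wrapped in a small helper that runs at application startup. The class name `DnsCacheConfig` is hypothetical and used only for illustration. As far as I know, the JVM reads this property when its address cache policy is first initialized, so the assumption here is that the helper runs before the first hostname lookup.

```java
import java.security.Security;

// Hypothetical helper, not part of the Gremlin driver or any AWS library.
class DnsCacheConfig {

    // Sets the TTL for successful DNS lookups to 0 seconds (no caching).
    // Assumption: this is called before the JVM performs its first lookup.
    static void disableDnsCache() {
        Security.setProperty("networkaddress.cache.ttl", "0");
    }

    public static void main(String[] args) {
        disableDnsCache();
        // Read the value back to confirm that it was applied.
        System.out.println(Security.getProperty("networkaddress.cache.ttl")); // prints "0"
    }
}
```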

The pom.xml looks like:

<properties>
    <gremlin.version>3.4.10</gremlin.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.tinkerpop</groupId>
        <artifactId>gremlin-driver</artifactId>
        <version>${gremlin.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.tinkerpop</groupId>
        <artifactId>tinkergraph-gremlin</artifactId>
        <version>${gremlin.version}</version>
    </dependency>
    <dependency>
        <!-- aws neptune db -->
        <groupId>org.apache.tinkerpop</groupId>
        <artifactId>gremlin-core</artifactId>
        <version>${gremlin.version}</version>
    </dependency>
</dependencies>

Connecting to the database via the gremlin driver:

    Cluster.Builder writer = Cluster.build().port(8182)
            .maxInProcessPerConnection(32).maxSimultaneousUsagePerConnection(32).maxContentLength(4 * 1024 * 1024)
            .serializer(Serializers.GRAPHBINARY_V1D0)
            .addContactPoint("some aws instance endpoint -- 1 --");

    Cluster.Builder reader = Cluster.build().port(8182)
            .maxInProcessPerConnection(32).maxSimultaneousUsagePerConnection(32).maxContentLength(4 * 1024 * 1024)
            .serializer(Serializers.GRAPHBINARY_V1D0)
            .addContactPoint("some aws instance endpoint -- 2 --")
            .addContactPoint("some aws instance endpoint -- 3 --");

    final Cluster writerCluster = writer.create();
    final Cluster readerCluster = reader.create();

    DriverRemoteConnection writerConn = DriverRemoteConnection.using(writerCluster);
    DriverRemoteConnection readerConn = DriverRemoteConnection.using(readerCluster);

    gWriter = AnonymousTraversalSource.traversal().withRemote(writerConn);
    gReader = AnonymousTraversalSource.traversal().withRemote(readerConn);

    for(int i = 0; i < 10; i++){
        NeptuneAdapter.getInstance().setGraph(gWriter);
        System.out.println(gWriter.addV("TestVertex" + (i + 1)).iterate());
        System.out.println("Vertex added, now: " + gWriter.V().count().next().toString());
        NeptuneAdapter.getInstance().setGraph(gReader);
        System.out.println(gReader.V().count().next().toString());
        System.out.println(gReader.V().count().next().toString());
        System.out.println(gReader.V().count().next().toString());
        System.out.println(gReader.V().count().next().toString());
        System.out.println(gReader.V().count().next().toString());
        System.out.println(gReader.V().count().next().toString());
        Thread.sleep(1000);
    }

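The problem is that when running this code, nothing happens the first time the graph is fetched. After some debugging we found that the blocking code is in the constructor of ConnectionPool. There, depending on minPoolSize, a CompletableFuture is created for each Connection, and inside it the Connection is established via the Host. While these futures are executed through the Cluster Manager's ScheduledExecutor, the ConnectionPool constructor joins all of them. According to the description we found, the implementation seems to be correct, but something must be blocking. After checking out the gremlin driver, commenting out the line that joins the futures and putting a simple Thread.sleep() in its place, the code does work as expected, and load balancing now works too. After adding some output, the output of the code above looks like this: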

CONNECTION_POOL --- constructor --- poolLabel: {address=endpoint -- 1 -- /IP:PORT}
Opening connection pool
LoadBalancingStrategy adding host: Host{address=endpoint -- 1 -- /IP:PORT} host size is now 1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 1 -- /IP:PORT} for next Query
[RemoteStep(DriverServerConnection-address=endpoint -- 1 -- /IP:PORT [graph=g])]
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 1 -- /IP:PORT} for next Query
Vertex added, now: 1
CONNECTION_POOL --- constructor --- poolLabel: {address=endpoint -- 2 -- /IP:PORT}
CONNECTION_POOL --- constructor --- poolLabel: {address=endpoint -- 3 -- /IP:PORT}
Opening connection pool
LoadBalancingStrategy adding host: Host{address=endpoint -- 2 -- /IP:PORT} host size is now 1
Opening connection pool
LoadBalancingStrategy adding host: Host{address=endpoint -- 3 -- /IP:PORT} host size is now 2
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 2 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 3 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 2 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 3 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 2 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 3 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 1 -- /IP:PORT} for next Query
[RemoteStep(DriverServerConnection-address=endpoint -- 1 -- /IP:PORT [graph=g])]
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 1 -- /IP:PORT} for next Query
Vertex added, now: 2
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 2 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 3 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 2 -- /IP:PORT} for next Query
1
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 3 -- /IP:PORT} for next Query
2
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 2 -- /IP:PORT} for next Query
2
CONNECTION_POOL --- borrowConnection --- host: Host{address=endpoint -- 3 -- /IP:PORT} for next Query
2
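
The blocking pattern described before the log output (a constructor that joins futures whose work runs on the same shared executor) can be reproduced in isolation with plain JDK classes. This is a sketch under assumptions, not a confirmed diagnosis of the driver: the class and method names are hypothetical, the pool sizes are made up, and the latch exists only to make the demo deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; it does not use or reproduce any Gremlin driver code.
class PoolInitStarvationDemo {

    // Simulates "initialize one host": schedule the connection tasks on the
    // shared executor, then block until all of them have finished.
    static void initHost(ExecutorService shared, int connections) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < connections; i++) {
            futures.add(CompletableFuture.runAsync(() -> { /* pretend to connect */ }, shared));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }

    // Returns true if all hosts finish initializing within the timeout.
    // Assumes hosts <= threads, because every host task waits for the others to start.
    static boolean initCompletes(int threads, int hosts, long timeoutMillis) {
        // Daemon threads, so that stuck workers cannot keep the JVM alive.
        ExecutorService shared = Executors.newFixedThreadPool(threads, r -> {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        });
        // Ensures every host task occupies a worker before any connection task is queued.
        CountDownLatch allStarted = new CountDownLatch(hosts);
        try {
            List<CompletableFuture<Void>> hostInits = new ArrayList<>();
            for (int h = 0; h < hosts; h++) {
                hostInits.add(CompletableFuture.runAsync(() -> {
                    allStarted.countDown();
                    try {
                        allStarted.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    initHost(shared, 2);
                }, shared));
            }
            CompletableFuture.allOf(hostInits.toArray(new CompletableFuture[0]))
                    .get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false; // workers are all blocked in join(), nobody runs the queued tasks
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            shared.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("1 host, 2 threads:  " + initCompletes(2, 1, 2000)); // true
        System.out.println("2 hosts, 2 threads: " + initCompletes(2, 2, 500));  // false
        System.out.println("2 hosts, 3 threads: " + initCompletes(3, 2, 2000)); // true
    }
}
```

In this sketch a single host initializes fine, while two hosts on a two-thread pool hang because both workers wait on tasks that only those same workers could run. That would be consistent with a single contact point working and two contact points blocking, but whether the driver's executor is actually exhausted in this way would need to be checked against its source.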

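The question now is: are we using the gremlin driver in the wrong way, or is this a bug for which we should open an issue in the tinkerpop-master repository? Or is there some other magic we do not understand?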

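The original TinkerPop driver was built for an architecture that differs from the load-balanced read replica architecture that Amazon Neptune uses. Because WebSocket connections are sticky, all requests can end up being funneled to a single read instance. I recommend taking a look at this client: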

https://github.com/awslabs/amazon-neptune-tools/tree/master/neptune-gremlin-client

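It was built to work better with the multi-reader nature of Neptune, to handle situations such as failovers and restarts, and to take care of security concerns that applications need handled, such as IAM authentication.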
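
To illustrate what client-side load balancing across readers means in the simplest case, here is a minimal round-robin sketch in plain Java. The class `RoundRobinEndpoints` and the endpoint names are hypothetical; this is not how neptune-gremlin-client is implemented, only the general idea of rotating requests across a list of reader addresses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper for illustration only.
class RoundRobinEndpoints {
    private final List<String> endpoints;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinEndpoints(List<String> endpoints) {
        if (endpoints.isEmpty()) {
            throw new IllegalArgumentException("at least one endpoint is required");
        }
        // Defensive copy so that later changes to the caller's list have no effect.
        this.endpoints = Collections.unmodifiableList(new ArrayList<>(endpoints));
    }

    // Returns the endpoints in turn; floorMod keeps the index valid
    // even after the counter overflows to a negative value.
    String pick() {
        int index = Math.floorMod(next.getAndIncrement(), endpoints.size());
        return endpoints.get(index);
    }

    public static void main(String[] args) {
        RoundRobinEndpoints readers =
                new RoundRobinEndpoints(Arrays.asList("reader-1", "reader-2"));
        for (int i = 0; i < 4; i++) {
            System.out.println(readers.pick()); // reader-1, reader-2, reader-1, reader-2
        }
    }
}
```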

We have run into this problem with Neptune load balancing across reader nodes in the past. We solved it by using:

https://github.com/awslabs/amazon-neptune-tools/tree/master/neptune-gremlin-client/gremlin-client

We had to tweak the reader client a little so that load balancing is handled on the client side.

The updated method for creating the reader client looks like this:

GremlinClient client;
GremlinCluster cluster;
ClusterEndpointsRefreshAgent clusterEndpointRefreshAgent;
String clusterId = "<your_cluster_id>";

private void createReaderClient(boolean isIAMAuthEnabled) {
    EndpointsSelector endpointsSelector = EndpointsType.ReadReplicas;
    clusterEndpointRefreshAgent = new ClusterEndpointsRefreshAgent(clusterId, endpointsSelector);
    Collection<String> addresses = clusterEndpointRefreshAgent.getAddresses().get(endpointsSelector);
    if (isIAMAuthEnabled) {
        cluster = createNeptuneGremlinClusterBuilder(addresses);
    } else {
        cluster = createGremlinClusterBuilder(addresses);
    }

    client = cluster.connect();
    // Refresh the list of reader endpoints every 300 seconds
    clusterEndpointRefreshAgent.startPollingNeptuneAPI(
        addrs -> client.refreshEndpoints(addrs.get(endpointsSelector)), 300,
        TimeUnit.SECONDS);
}

private GremlinCluster createGremlinClusterBuilder(Collection<String> addresses) {
    GremlinClusterBuilder builder = GremlinClusterBuilder.build().port(8182)
        .addContactPoints(addresses).enableSsl(true);
    // set other required properties of GremlinCluster
    return builder.create();
}

private GremlinCluster createNeptuneGremlinClusterBuilder(Collection<String> addresses) {
    NeptuneGremlinClusterBuilder builder = NeptuneGremlinClusterBuilder.build()
        .port(8182).addContactPoints(addresses)
        .enableSsl(true).enableIamAuth(true);
    // set other required properties of NeptuneGremlinClusterBuilder
    return builder.create();
}

This reader client can then be created before the GraphTraversalSource is created, like this:

    GraphTraversalSource g;
    GraphTraversalSource getGraphTraversalSource(boolean isIAMAuthEnabled) {
        if (g == null) {
            createReaderClient(isIAMAuthEnabled);
            g = AnonymousTraversalSource.traversal().withRemote(DriverRemoteConnection.using(client));
        }
        return g;
    }
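
The periodic endpoint refresh used in createReaderClient follows a general pattern: poll a source of addresses on a schedule and swap in the new list. Below is a rough JDK-only sketch of that pattern, assuming a hypothetical `EndpointRefresher` class and a made-up lookup function. It does not call the Neptune API and is not the implementation of ClusterEndpointsRefreshAgent.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper for illustration only.
class EndpointRefresher {
    private final AtomicReference<List<String>> endpoints =
            new AtomicReference<>(Collections.emptyList());
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r);
                t.setDaemon(true); // do not keep the JVM alive just for polling
                return t;
            });

    // Atomically replaces the current endpoint list with an immutable snapshot.
    void refresh(Collection<String> addresses) {
        endpoints.set(Collections.unmodifiableList(new ArrayList<>(addresses)));
    }

    List<String> current() {
        return endpoints.get();
    }

    // Calls the lookup at a fixed delay and applies its result.
    void startPolling(Supplier<Collection<String>> lookup, long period, TimeUnit unit) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                refresh(lookup.get());
            } catch (RuntimeException e) {
                // Keep the previous list; an exception escaping this task
                // would suppress all further scheduled runs.
            }
        }, period, period, unit);
    }

    void stop() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        EndpointRefresher refresher = new EndpointRefresher();
        refresher.refresh(Arrays.asList("reader-1"));
        // The lookup below is a stand-in for a real call that lists reader instances.
        refresher.startPolling(() -> Arrays.asList("reader-1", "reader-2"),
                100, TimeUnit.MILLISECONDS);
        Thread.sleep(300);
        System.out.println(refresher.current());
        refresher.stop();
    }
}
```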