PoolingOptions 和 Cassandra - java
PoolingOptions and Cassandra - java
我正在使用 datastax 驱动程序将 Cassandra 用作 Apache Flink 的一些数据流的接收器:
我在执行我的应用程序时遇到问题,在运行时引发有关队列的错误,该队列在几秒钟后变满。我发现默认值是 256,这对我的负载来说可能太低了,所以我按照这里的建议使用 poolingOptions 设置 maxRequestsPerConnection 提高了它:http://docs.datastax.com/en/developer/java-driver/3.1/manual/pooling/.
不幸的是,使用以下代码,我在启动时遇到以下错误:
The implementation of the ClusterBuilder is not serializable.
The object probably contains or references non serializable fields.
我的代码:
PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions
.setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
.setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);
ClusterBuilder cassandraBuilder = new ClusterBuilder() {
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint(CASSANDRA_ADDRESS)
.withPort(CASSANDRA_PORT)
.withPoolingOptions(poolingOptions)
.build();
}
};
sinkBuilderNormalStream
.setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
+ " (user, sensor, timestamp, rdf_stream, observed_value, value)"
+ " VALUES (?, ?, ?, ?, ?, ?);")
.setClusterBuilder(cassandraBuilder)
.build();
我该如何处理?
您必须在 ClusterBuilder#buildCluster 中定义 PoolingOptions。
我正在使用 datastax 驱动程序将 Cassandra 用作 Apache Flink 的一些数据流的接收器: 我在执行我的应用程序时遇到问题,在运行时引发有关队列的错误,该队列在几秒钟后变满。我发现默认值是 256,这对我的负载来说可能太低了,所以我按照这里的建议使用 poolingOptions 设置 maxRequestsPerConnection 提高了它:http://docs.datastax.com/en/developer/java-driver/3.1/manual/pooling/.
不幸的是,使用以下代码,我在启动时遇到以下错误:
The implementation of the ClusterBuilder is not serializable.
The object probably contains or references non serializable fields.
我的代码:
PoolingOptions poolingOptions = new PoolingOptions();
poolingOptions
.setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
.setMaxRequestsPerConnection(HostDistance.REMOTE, 10000);
ClusterBuilder cassandraBuilder = new ClusterBuilder() {
@Override
public Cluster buildCluster(Cluster.Builder builder) {
return builder.addContactPoint(CASSANDRA_ADDRESS)
.withPort(CASSANDRA_PORT)
.withPoolingOptions(poolingOptions)
.build();
}
};
sinkBuilderNormalStream
.setQuery("INSERT INTO keyspace_local.values_by_sensors_users"
+ " (user, sensor, timestamp, rdf_stream, observed_value, value)"
+ " VALUES (?, ?, ?, ?, ?, ?);")
.setClusterBuilder(cassandraBuilder)
.build();
我该如何处理?
您必须在 ClusterBuilder#buildCluster 中定义 PoolingOptions。