How to write to Amazon Managed Cassandra Service using the Flink Cassandra Connector?
I am able to connect to AWS Managed Cassandra Service (MCS) using the code snippet below.
CassandraSink.addSink(cassandraEntityStream)
    .setClusterBuilder(
        new ClusterBuilder() {
          private static final long serialVersionUID = 2793938419775311824L;

          @Override
          public Cluster buildCluster(Cluster.Builder builder) {
            return builder
                .addContactPoint("cassandra.ap-northeast-1.amazonaws.com")
                .withPort(9142)
                .withSSL()
                .withCredentials("username", "password")
                .withLoadBalancingPolicy(
                    DCAwareRoundRobinPolicy.builder()
                        .withLocalDc("ap-northeast-1")
                        .build())
                //.withQueryOptions(option)
                .build();
          }
        })
    .setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
    .build()
    .name("Write to Cassandra")
    .uid("cassandra_sink");
I am getting the following exception while writing the stream POJOs to Cassandra:
com.datastax.driver.core.exceptions.InvalidQueryException: Consistency
level LOCAL_ONE is not supported for this operation. Supported
consistency levels are: LOCAL_QUORUM
I was able to solve this issue in another (non-Flink) project by setting ConsistencyLevel = LOCAL_QUORUM, using the snippet below.
QueryOptions option = new QueryOptions();
option.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

final Cluster cluster =
    Cluster.builder()
        .addContactPoint("cassandra.ap-northeast-1.amazonaws.com")
        .withPort(9142)
        .withSSL()
        .withQueryOptions(option) // NOTE
        .withAuthProvider(new PlainTextAuthProvider("username", "password"))
        .withLoadBalancingPolicy(
            DCAwareRoundRobinPolicy.builder().withLocalDc("ap-northeast-1").build())
        .build();
final Session session = cluster.connect("test");
When I try the same in Flink, I get the following error:
Exception in thread "main"
org.apache.flink.api.common.InvalidProgramException:
com.datastax.driver.core.QueryOptions@130161f7 is not serializable.
The object probably contains or references non serializable fields.
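This error comes from Flink's closure check: the ClusterBuilder passed to the sink is shipped to the task managers via plain Java serialization, so every field it captures (here the QueryOptions instance) must implement java.io.Serializable. A minimal JDK-only sketch of the same failure mode, using a hypothetical PlainOptions class as a stand-in for the real driver class:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

// Hypothetical stand-in for QueryOptions: a plain class without Serializable.
class PlainOptions {
}

public class NotSerializableDemo {
    public static void main(String[] args) throws IOException {
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        try {
            // The same serializability requirement Flink enforces on the
            // ClusterBuilder and everything it references.
            out.writeObject(new PlainOptions());
        } catch (NotSerializableException e) {
            System.out.println("caught: " + e.getMessage()); // prints "caught: PlainOptions"
        }
    }
}
```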
Am I missing something? Please elaborate on how to connect/write to MCS using the Flink Cassandra connector.
PS:
- I created the keyspace using the command below.
CREATE KEYSPACE "test"
WITH
REPLICATION = {'class': 'SingleRegionStrategy'}
- I am not using AmazonRootCA1.pem in my code.
- I am not using cassandra_truststore.jks in my code or environment.
- I installed the temp_file.der certificate, which was created following these steps.
- I am using Flink 1.8.2, since that is the runtime version available in Kinesis Data Analytics.
Update 07-04-2020
I was able to solve the serialization issue by creating a serializable wrapper for QueryOptions. Please find the code snippet below:
import com.datastax.driver.core.QueryOptions;
import java.io.Serializable;
public class QueryOptionsSerializable extends QueryOptions implements Serializable {
private static final long serialVersionUID = 2793938419775311824L;
}
With this solution I was able to set the consistency level to LOCAL_QUORUM in code, and the job ran without any exception.
// Setting consistency level
QueryOptionsSerializable option = new QueryOptionsSerializable();
option.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);

CassandraSink.addSink(entityStream)
    .setClusterBuilder(
        new ClusterBuilder() {
          private static final long serialVersionUID = 2793938419775311824L;

          @Override
          public Cluster buildCluster(Cluster.Builder builder) {
            Cluster.Builder tempBuilder = builder.addContactPoint(host).withPort(port);
            if (isSSLEnabled) {
              // Enable SSL if the isSSLEnabled flag is on.
              tempBuilder.withSSL();
            }
            if (username != null && password != null) {
              // If a username & password are provided, use them for the connection.
              tempBuilder.withCredentials(username, password);
            }
            tempBuilder.withQueryOptions(option);
            return tempBuilder.build();
          }
        })
    .setMapperOptions(() -> new Mapper.Option[] {Mapper.Option.saveNullFields(true)})
    .setDefaultKeyspace(keyspace)
    .build()
    .name("Write to Cassandra")
    .uid("cassandra_sink");
However, while writing to MCS I got the same error:
com.datastax.driver.core.exceptions.InvalidQueryException: Consistency
level LOCAL_ONE is not supported for this operation. Supported
consistency levels are: LOCAL_QUORUM
Any help would be appreciated!
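One plausible reason the serializable wrapper compiled yet the consistency level still did not take effect (my reading, not confirmed anywhere in the question): Java serialization skips the fields of a non-serializable superclass and re-initializes them via its no-arg constructor on deserialization. Since all of QueryOptions' state lives in that non-serializable superclass, the LOCAL_QUORUM setting would silently revert to the driver default when Flink ships the ClusterBuilder to the task managers. A JDK-only sketch with a hypothetical FakeQueryOptions stand-in:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Stand-in for com.datastax.driver.core.QueryOptions: NOT Serializable,
// mutable state, and a no-arg constructor (as the real class has).
class FakeQueryOptions {
    private String consistency = "LOCAL_ONE"; // driver default
    public void setConsistencyLevel(String level) { consistency = level; }
    public String getConsistencyLevel() { return consistency; }
}

// The wrapper pattern from the update above, applied to the stand-in.
class FakeQueryOptionsSerializable extends FakeQueryOptions implements Serializable {
    private static final long serialVersionUID = 1L;
}

public class WrapperStateLossDemo {
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T obj) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        new ObjectOutputStream(bos).writeObject(obj);
        return (T) new ObjectInputStream(
            new ByteArrayInputStream(bos.toByteArray())).readObject();
    }

    public static void main(String[] args) throws Exception {
        FakeQueryOptionsSerializable option = new FakeQueryOptionsSerializable();
        option.setConsistencyLevel("LOCAL_QUORUM");
        FakeQueryOptions restored = roundTrip(option);
        // Fields of the non-serializable superclass are not written out; they
        // are rebuilt by its no-arg constructor on deserialization.
        System.out.println(restored.getConsistencyLevel()); // prints "LOCAL_ONE"
    }
}
```

If that is what happens, any fix that carries the consistency level inside something Flink serializes faithfully, such as the @Table annotation in the answer below, would work where the wrapper does not.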
Finally figured it out. It was about setting the consistency via the @Table annotation. Code snippet below:
@Table(name = "report", readConsistency = "LOCAL_QUORUM", writeConsistency = "LOCAL_QUORUM")
public class SampleEntity {

  @Column(name = "user_id")
  @PartitionKey(0)
  private String userId;

  @Column(name = "join_date")
  @PartitionKey(1)
  private String joinDate;
}