Do I have to open a new DB connection for every write in Cassandra?
I am trying to write a simple Java program that produces some data (just a POJO) and publishes it to a Kafka topic. A subscriber then fetches the data from that topic and writes it to a Cassandra database.

Producing and consuming work fine, but something about writing the data to Cassandra puzzles me: every time I want to write data, I have to open a new connection to the database. That looks wrong.
@Override
public void run() {
  setRunning(true);
  try {
    konsument.subscribe(Collections.singletonList(ServerKonfiguration.TOPIC));
    while (running) {
      ConsumerRecords<Long, SensorDaten> sensorDaten = konsument.poll(Long.MAX_VALUE);
      sensorDaten.forEach(
          datum -> {
            CassandraConnector cassandraConnector = new CassandraConnector();
            cassandraConnector.schreibeSensorDaten(datum.key(), datum.value());
            System.out.printf(
                "Consumer Record:(%d, %s, %d, %d)\n",
                datum.key(), datum.value(), datum.partition(), datum.offset());
          });
    }
  } catch (Exception e) {
    e.printStackTrace();
  } finally {
    konsument.close();
  }
}
The snippet above works, but as mentioned, I have to create a new connection for every single write.

When I initialize cassandraConnector outside the loop instead, I get one successful write and then a "No hosts available" exception.
The CassandraConnector class:
public class CassandraConnector {
  private final String KEYSPACE = "ba2";
  private final String SERVER_IP = "127.0.0.1";

  private Cluster cluster;
  private Session session;

  public CassandraConnector() {
    cluster = Cluster.builder().addContactPoint(SERVER_IP).build();
    session = cluster.connect(KEYSPACE);
  }

  public void schreibeSensorDaten(Long key, SensorDaten datum) {
    try {
      session.execute(
          "INSERT INTO.....
No, you need to reuse the cluster/session instances – they are quite heavyweight to initialize...

It is also better to use prepared statements for inserting data – after creating the session, do:

PreparedStatement pStmt = session.prepare("INSERT INTO ... VALUES (?, ?)");

and then, in the loop:

session.execute(pStmt.bind(datum.key(), datum.value()));

Regarding the error, check the logs on the Cassandra side.
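The pattern described above can be sketched as a small wrapper class: build the Cluster and connect the Session once, prepare the statement once, and reuse all three for every write. This is a minimal sketch against the DataStax 3.x driver API the question already uses; the table and column names here are hypothetical placeholders, not the asker's schema, and it assumes a reachable Cassandra node.

```java
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

public class ReusedSessionWriter implements AutoCloseable {
  private final Cluster cluster;
  private final Session session;
  private final PreparedStatement insert;

  public ReusedSessionWriter(String contactPoint, String keyspace) {
    // Heavyweight setup: do this exactly once, not per write.
    cluster = Cluster.builder().addContactPoint(contactPoint).build();
    session = cluster.connect(keyspace);
    // Prepare once; binding values per write is cheap.
    insert = session.prepare("INSERT INTO sensor_daten (id, wert) VALUES (?, ?)");
  }

  public void write(Long key, String value) {
    // Reuses the long-lived session and the prepared statement.
    session.execute(insert.bind(key, value));
  }

  @Override
  public void close() {
    // Closing the Cluster also closes the Sessions it created.
    cluster.close();
  }
}
```

Making the writer AutoCloseable lets the consumer close the cluster in the same finally block that already closes the Kafka consumer, instead of leaking connections.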
Following the answer, the consumer now holds one Cluster/Session and a prepared statement:

public class SensorDatenKonsument implements Runnable {
  /** Kafka Konsument */
  private final KafkaConsumer<Long, SensorDaten> konsument;

  /** Einrichtung der Verbindung zu Cassandra */
  private final Cluster cluster =
      Cluster.builder().addContactPoint(TestKonfiguration.CASS_SERVER_IP).build();
  private final Session session = cluster.connect(TestKonfiguration.KEYSPACE);

  public SensorDatenKonsument(String groupId) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, TestKonfiguration.BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SensorDatenDeserializer.class.getName());
    this.konsument = new KafkaConsumer<>(props);
  }

  @Override
  public void run() {
    try {
      konsument.subscribe(Collections.singletonList(TestKonfiguration.TOPIC));
      PreparedStatement prepStmt =
          session.prepare(
              "INSERT INTO wetterdaten (id, date_time, air_temp, std_air_temp, humidity, std_humidity,"
                  + "IR_temp, std_IR_temp, air_pressure, std_pressure, wind_speed, std_wind_speed, light_A,"
                  + "std_light_A, light_B, std_light_B, distance, std_distance, counter, roll, pitch,"
                  + "X_accel, std_X_accel, Y_accel, std_Y_accel, Z_accel, std_Z_accel, battery, error,"
                  + "WDT_trace, crc3) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");
      while (true) {
        ConsumerRecords<Long, SensorDaten> kafkaRecord = konsument.poll(Long.MAX_VALUE);
        System.out.println("*** Poll ***");
        kafkaRecord.forEach(
            datum -> {
              session.execute(
                  prepStmt.bind(...