我是否必须为 Cassandra 中的每次写入打开一个新的数据库连接?

Do i have to open a new DB connection for every write in Cassandra?

我正在尝试编写一个简单的 JAVA 程序来生成一些数据(只是一个 POJO),这些数据会在 Kafka 主题中发布。从该主题中,订阅者获取数据并将其写入 Cassandra 数据库。

生产和获取工作正常,但在将数据写入 Cassandra DB 时,有些事情让我感到疑惑。

当我尝试写入数据时,我总是必须打开一个到数据库的新连接。看着很不爽。

 @Override
  public void run() {

    setRunning(true);


    try {
      konsument.subscribe(Collections.singletonList(ServerKonfiguration.TOPIC));

      while (running) {

        ConsumerRecords<Long, SensorDaten> sensorDaten = konsument.poll(Long.MAX_VALUE);
        sensorDaten.forEach(
                datum -> {
                  CassandraConnector cassandraConnector = new CassandraConnector();

                  cassandraConnector.schreibeSensorDaten(datum.key(), datum.value());
                  System.out.printf(
                          "Consumer Record:(%d, %s, %d, %d)\n",
                          datum.key(), datum.value(), datum.partition(), datum.offset());
                });
      }
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      konsument.close();
    }
  }

上面的代码片段有效,但正如我所提到的,对于每次写入,我都必须创建一个新连接。

当我在循环外初始化 cassandraConnector 时,我进行了一次成功写入,然后我得到 "No hosts available" 个异常。

CassandraConnector class:

 public class CassandraConnector {

  private final String KEYSPACE = "ba2";
  private final String SERVER_IP = "127.0.0.1";
  private Cluster cluster;
  private Session session;

  public CassandraConnector() {
    cluster = Cluster.builder().addContactPoint(SERVER_IP).build();
    session = cluster.connect(KEYSPACE);
  }

  public void schreibeSensorDaten(Long key, SensorDaten datum) {

    try {

      session.execute(
          "INSERT INTO.....

不,您需要重新使用 cluster/session 实例 - 它们在初始化方面相当重量级...

最好使用 prepared statements 进行数据插入 - 创建会话后,执行以下操作:

PreparedStatement pStmt = session.prepare("INSERT INTO ... VALUES (?, ?)");

然后循环

session.execute(pStmt.bind(datum.key(), datum.value()));

关于报错,请查看Cassandra端的日志。

    public class SensorDatenKonsument implements Runnable {

  /** Kafka Konsument */
  private final KafkaConsumer<Long, SensorDaten> konsument;

  /** Einrichtung der Verbindung zu Cassandra */
  private final Cluster cluster =
      Cluster.builder().addContactPoint(TestKonfiguration.CASS_SERVER_IP).build();

  private final Session session = cluster.connect(TestKonfiguration.KEYSPACE);

  public SensorDatenKonsument(String groupId) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, TestKonfiguration.BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(
        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SensorDatenDeserializer.class.getName());
    this.konsument = new KafkaConsumer<>(props);
  }

  @Override
  public void run() {


    try {
      konsument.subscribe(Collections.singletonList(TestKonfiguration.TOPIC));
      PreparedStatement prepStmt =
          session.prepare(
              "INSERT INTO wetterdaten (id, date_time, air_temp, std_air_temp, humidity, std_humidity,"
                  + "IR_temp, std_IR_temp, air_pressure, std_pressure, wind_speed, std_wind_speed, light_A,"
                  + "std_light_A, light_B, std_light_B, distance, std_distance, counter, roll, pitch,"
                  + "X_accel, std_X_accel, Y_accel, std_Y_accel, Z_accel, std_Z_accel, battery, error,"
                  + "WDT_trace, crc3) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)");

      while (true) {
        ConsumerRecords<Long, SensorDaten> kafkaRecord = konsument.poll(Long.MAX_VALUE);
        System.out.println("*** Poll ***");

        kafkaRecord.forEach(
            datum -> {
              session.execute(
                  prepStmt.bind(...