ConfigurationException:触发器 class 'org.apache.cassandra.triggers.AuditTrigger' 不存在

ConfigurationException: Trigger class 'org.apache.cassandra.triggers.AuditTrigger' doesn't exist

我正在使用 here 创建触发器,但它根本不起作用,我收到 ConfigurationException:触发器 class 'org.apache.cassandra.triggers.AuditTrigger' 不存在。

我创建触发器的步骤:

1:我使用

编译了我的 java 文件

javac -cp /CassandraTriggerExample/lib/cassandra-all-3.6.jar AuditTrigger.Java

2:Jar 创建:

jar -cvf 触发器-example.jar AuditTrigger.class

3: 我检查了我的 jar 文件的内容:

"unzip -l trigger-example.jar"

4: 将此 jar 文件复制到: cassandra_home/conf/triggers

5: 复制 AuditTrigger.properties 到: cassandra_home/conf

6: 重新启动 cassandra 服务器

7: ./nodetool -h localhost reloadtriggers

8:在 system.log 中我可以看到条目:

INFO  [RMI TCP Connection(2)-127.0.0.1] 2018-07-22 22:15:25,827 
   CustomClassLoader.java:89 - Loading new jar 
   /Users/uname/cassandra/conf/triggers/trigger-example.jar

9:现在,当我使用 :

创建触发器时
 CREATE TRIGGER test1 ON test.test
        USING 'org.apache.cassandra.triggers.AuditTrigger';

我得到 "ConfigurationException: Trigger class 'org.apache.cassandra.triggers.AuditTrigger' doesn't exist"。

我认为问题在于您的 jar 没有正确打包:如果您的 class 的名称为 org.apache.cassandra.triggers.AuditTrigger,那么它应该位于 jar 文件中的 org/apache/cassandra/triggers/AuditTrigger.class 下。 ..

有关如何找到 classes 的详细说明,请参阅 this documentation...

确实有类似的问题。可能是因为您复制了它但没有重新加载触发器或创建触发器。通过以下检查和执行命令 re-load 并创建触发器来解决它。

检查

确保 class 的名称为 org.apache.cassandra.triggers.AuditTrigger 并且它位于 jar 文件中的 org/apache/cassandra/triggers/AuditTrigger.class 下。

CMD命令

转到 Cassandra 安装文件夹的 bin 文件夹到 运行 nodetool reloadtriggers 命令如下。

C:\Cassandra\apache-cassandra-3.11.6\bin>nodetool reloadtriggers

在 cqlsh 提示符下执行以下语句

CREATE TRIGGER test1 ON test.test USING 'org.apache.cassandra.triggers.AuditTrigger';

您的触发器现在应该可用了! 如果问题仍然存在,您可以尝试重新启动服务器一次以查看是否可用。 在下面的代码中找到我用来在每次插入 Cassandra DB 时向 Kafka 消费者发布消息的示例。您可以修改相同的更新。我使用了 JDK 1.8.0_251apache-cassandra-3.11.7kafka_2.13-2.6.0Zookeeper-3.6.1

/**
 * 
 */
package com.cass.kafka.insert.trigger;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.triggers.ITrigger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * @author Dinesh.Lomte
 *
 */
public class InsertCassTriggerForKafkaPublish implements ITrigger {

    private String topic;
    private Producer<String, String> producer;
    private ThreadPoolExecutor threadPoolExecutor;

    /**
     * 
     */
    public InsertCassTriggerForKafkaPublish() {
        
        Thread.currentThread().setContextClassLoader(null);
        
        topic = "test";
        producer = new KafkaProducer<String, String>(getProps());
        threadPoolExecutor = new ThreadPoolExecutor(4, 20, 30,
                TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
    }

    /**
     * 
     */
    @Override
    public Collection<Mutation> augment(Partition partition) {
        threadPoolExecutor.execute(() -> handleUpdate(partition));
        return Collections.emptyList();
    }

    /**
     * 
     * @param partition
     */
    private void handleUpdate(Partition partition) {
        
        if (!partition.partitionLevelDeletion().isLive()) {
            return;
        }
        UnfilteredRowIterator it = partition.unfilteredIterator();
        while (it.hasNext()) {
            Unfiltered un = it.next();
            Row row = (Row) un;
            if (row.primaryKeyLivenessInfo().timestamp() != Long.MIN_VALUE) {
                Iterator<Cell> cells = row.cells().iterator();
                Iterator<ColumnDefinition> columns = row.columns().iterator();
                
                while (cells.hasNext() && columns.hasNext()) {
                    ColumnDefinition columnDef = columns.next();
                    Cell cell = cells.next();
                    if ("payload_json".equals(columnDef.name.toString())) {
                        producer.send(new ProducerRecord<>(
                                topic, columnDef.type.getString(cell.value())));                        
                        break;
                    }
                }
            }
        }        
    }

    /**
     * 
     * @return
     */
    private Properties getProps() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }
}