ConfigurationException:触发器 class 'org.apache.cassandra.triggers.AuditTrigger' 不存在
ConfigurationException: Trigger class 'org.apache.cassandra.triggers.AuditTrigger' doesn't exist
我正在使用 here 创建触发器,但它根本不起作用,我收到 ConfigurationException:触发器 class 'org.apache.cassandra.triggers.AuditTrigger' 不存在。
我创建触发器的步骤:
1:我使用
编译了我的 java 文件
javac -cp /CassandraTriggerExample/lib/cassandra-all-3.6.jar
AuditTrigger.Java
2:Jar 创建:
jar -cvf 触发器-example.jar AuditTrigger.class
3: 我检查了我的 jar 文件的内容:
"unzip -l trigger-example.jar"
4: 将此 jar 文件复制到:
cassandra_home/conf/triggers
5: 复制 AuditTrigger.properties 到:
cassandra_home/conf
6: 重新启动 cassandra 服务器
7: ./nodetool -h localhost reloadtriggers
8:在 system.log 中我可以看到条目:
INFO [RMI TCP Connection(2)-127.0.0.1] 2018-07-22 22:15:25,827
CustomClassLoader.java:89 - Loading new jar
/Users/uname/cassandra/conf/triggers/trigger-example.jar
9:现在,当我使用 :
创建触发器时
CREATE TRIGGER test1 ON test.test
USING 'org.apache.cassandra.triggers.AuditTrigger';
我得到 "ConfigurationException: Trigger class 'org.apache.cassandra.triggers.AuditTrigger' doesn't exist"。
我认为问题在于您的 jar 没有正确打包:如果您的 class 的名称为 org.apache.cassandra.triggers.AuditTrigger
,那么它应该位于 jar 文件中的 org/apache/cassandra/triggers/AuditTrigger.class
下。 ..
有关如何找到 classes 的详细说明,请参阅 this documentation...
确实有类似的问题。可能是因为您复制了它但没有重新加载触发器或创建触发器。通过以下检查和执行命令 re-load 并创建触发器来解决它。
检查
确保 class 的名称为 org.apache.cassandra.triggers.AuditTrigger
并且它位于 jar 文件中的 org/apache/cassandra/triggers/AuditTrigger.class
下。
CMD命令
转到 Cassandra 安装文件夹的 bin
文件夹到 运行 nodetool reloadtriggers
命令如下。
C:\Cassandra\apache-cassandra-3.11.6\bin>nodetool reloadtriggers
在 cqlsh 提示符下执行以下语句
CREATE TRIGGER test1 ON test.test USING 'org.apache.cassandra.triggers.AuditTrigger';
您的触发器现在应该可用了!
如果问题仍然存在,您可以尝试重新启动服务器一次以查看是否可用。
在下面的代码中找到我用来在每次插入 Cassandra DB 时向 Kafka 消费者发布消息的示例。您可以修改相同的更新。我使用了 JDK 1.8.0_251
、apache-cassandra-3.11.7
、kafka_2.13-2.6.0
和 Zookeeper-3.6.1
。
/**
*
*/
package com.cass.kafka.insert.trigger;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.triggers.ITrigger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* @author Dinesh.Lomte
*
*/
public class InsertCassTriggerForKafkaPublish implements ITrigger {
private String topic;
private Producer<String, String> producer;
private ThreadPoolExecutor threadPoolExecutor;
/**
*
*/
public InsertCassTriggerForKafkaPublish() {
Thread.currentThread().setContextClassLoader(null);
topic = "test";
producer = new KafkaProducer<String, String>(getProps());
threadPoolExecutor = new ThreadPoolExecutor(4, 20, 30,
TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
}
/**
*
*/
@Override
public Collection<Mutation> augment(Partition partition) {
threadPoolExecutor.execute(() -> handleUpdate(partition));
return Collections.emptyList();
}
/**
*
* @param partition
*/
private void handleUpdate(Partition partition) {
if (!partition.partitionLevelDeletion().isLive()) {
return;
}
UnfilteredRowIterator it = partition.unfilteredIterator();
while (it.hasNext()) {
Unfiltered un = it.next();
Row row = (Row) un;
if (row.primaryKeyLivenessInfo().timestamp() != Long.MIN_VALUE) {
Iterator<Cell> cells = row.cells().iterator();
Iterator<ColumnDefinition> columns = row.columns().iterator();
while (cells.hasNext() && columns.hasNext()) {
ColumnDefinition columnDef = columns.next();
Cell cell = cells.next();
if ("payload_json".equals(columnDef.name.toString())) {
producer.send(new ProducerRecord<>(
topic, columnDef.type.getString(cell.value())));
break;
}
}
}
}
}
/**
*
* @return
*/
private Properties getProps() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return properties;
}
}
我正在使用 here 创建触发器,但它根本不起作用,我收到 ConfigurationException:触发器 class 'org.apache.cassandra.triggers.AuditTrigger' 不存在。
我创建触发器的步骤:
1:我使用
编译了我的 java 文件javac -cp /CassandraTriggerExample/lib/cassandra-all-3.6.jar AuditTrigger.Java
2:Jar 创建:
jar -cvf 触发器-example.jar AuditTrigger.class
3: 我检查了我的 jar 文件的内容:
"unzip -l trigger-example.jar"
4: 将此 jar 文件复制到: cassandra_home/conf/triggers
5: 复制 AuditTrigger.properties 到: cassandra_home/conf
6: 重新启动 cassandra 服务器
7: ./nodetool -h localhost reloadtriggers
8:在 system.log 中我可以看到条目:
INFO [RMI TCP Connection(2)-127.0.0.1] 2018-07-22 22:15:25,827
CustomClassLoader.java:89 - Loading new jar
/Users/uname/cassandra/conf/triggers/trigger-example.jar
9:现在,当我使用 :
创建触发器时 CREATE TRIGGER test1 ON test.test
USING 'org.apache.cassandra.triggers.AuditTrigger';
我得到 "ConfigurationException: Trigger class 'org.apache.cassandra.triggers.AuditTrigger' doesn't exist"。
我认为问题在于您的 jar 没有正确打包:如果您的 class 的名称为 org.apache.cassandra.triggers.AuditTrigger
,那么它应该位于 jar 文件中的 org/apache/cassandra/triggers/AuditTrigger.class
下。 ..
有关如何找到 classes 的详细说明,请参阅 this documentation...
确实有类似的问题。可能是因为您复制了它但没有重新加载触发器或创建触发器。通过以下检查和执行命令 re-load 并创建触发器来解决它。
检查
确保 class 的名称为 org.apache.cassandra.triggers.AuditTrigger
并且它位于 jar 文件中的 org/apache/cassandra/triggers/AuditTrigger.class
下。
CMD命令
转到 Cassandra 安装文件夹的 bin
文件夹到 运行 nodetool reloadtriggers
命令如下。
C:\Cassandra\apache-cassandra-3.11.6\bin>nodetool reloadtriggers
在 cqlsh 提示符下执行以下语句
CREATE TRIGGER test1 ON test.test USING 'org.apache.cassandra.triggers.AuditTrigger';
您的触发器现在应该可用了!
如果问题仍然存在,您可以尝试重新启动服务器一次以查看是否可用。
在下面的代码中找到我用来在每次插入 Cassandra DB 时向 Kafka 消费者发布消息的示例。您可以修改相同的更新。我使用了 JDK 1.8.0_251
、apache-cassandra-3.11.7
、kafka_2.13-2.6.0
和 Zookeeper-3.6.1
。
/**
*
*/
package com.cass.kafka.insert.trigger;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.rows.Cell;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.triggers.ITrigger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* @author Dinesh.Lomte
*
*/
public class InsertCassTriggerForKafkaPublish implements ITrigger {
private String topic;
private Producer<String, String> producer;
private ThreadPoolExecutor threadPoolExecutor;
/**
*
*/
public InsertCassTriggerForKafkaPublish() {
Thread.currentThread().setContextClassLoader(null);
topic = "test";
producer = new KafkaProducer<String, String>(getProps());
threadPoolExecutor = new ThreadPoolExecutor(4, 20, 30,
TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
}
/**
*
*/
@Override
public Collection<Mutation> augment(Partition partition) {
threadPoolExecutor.execute(() -> handleUpdate(partition));
return Collections.emptyList();
}
/**
*
* @param partition
*/
private void handleUpdate(Partition partition) {
if (!partition.partitionLevelDeletion().isLive()) {
return;
}
UnfilteredRowIterator it = partition.unfilteredIterator();
while (it.hasNext()) {
Unfiltered un = it.next();
Row row = (Row) un;
if (row.primaryKeyLivenessInfo().timestamp() != Long.MIN_VALUE) {
Iterator<Cell> cells = row.cells().iterator();
Iterator<ColumnDefinition> columns = row.columns().iterator();
while (cells.hasNext() && columns.hasNext()) {
ColumnDefinition columnDef = columns.next();
Cell cell = cells.next();
if ("payload_json".equals(columnDef.name.toString())) {
producer.send(new ProducerRecord<>(
topic, columnDef.type.getString(cell.value())));
break;
}
}
}
}
}
/**
*
* @return
*/
private Properties getProps() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return properties;
}
}