Akka Cassandra persistence with Java example
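I am trying to test Akka persistence with Cassandra from Java. Starting from the documentation at http://doc.akka.io/docs/akka/2.4.0-RC3/java/persistence.html, I tried to get PersistentActorExample working with Cassandra and ran into the problem below.
I am using the application.conf shown below. Does anyone have a Java example I could start from? The same code works fine with LevelDB. We are currently on DataStax 4.8. I hope this is just an application.conf issue.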
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2550
    }
  }
  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2556",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]
    auto-down-unreachable-after = 10s
  }
  persistence {
    journal {
      plugin = "cassandra-journal"
      # Comma-separated list of contact points in the cluster
      cassandra-journal.contact-points = ["dse-9042.service.consul"]
    }
    snapshot-store {
      plugin = "akka.persistence.cassandra.snapshot.CassandraSnapshotStore"
      # Comma-separated list of contact points in the cluster
      cassandra-journal.contact-points = ["dse-9042.service.consul"]
    }
  }
  akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  akka.actor.default-mailbox.stash-capacity=10000
}
In my project I am using the following Maven dependencies:
<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-persistence_2.11</artifactId>
  <version>2.4.0-RC3</version>
</dependency>
<dependency>
  <groupId>com.github.krasserm</groupId>
  <artifactId>akka-persistence-cassandra_2.11</artifactId>
  <version>0.3.9</version>
</dependency>
Here is the error I am getting:
[INFO] [10/04/2015 16:52:40.906] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/04/2015 16:52:41.112] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2550]
[INFO] [10/04/2015 16:52:41.124] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Starting up...
[INFO] [10/04/2015 16:52:41.186] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [10/04/2015 16:52:41.186] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Started up successfully
[INFO] [10/04/2015 16:52:41.193] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [10/04/2015 16:52:41.196] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Metrics collection has started successfully
[INFO] [10/04/2015 16:52:41.380] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2552]
Uncaught error from thread [ClusterSystem-akka.actor.default-dispatcher-17] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[ClusterSystem]
java.lang.AbstractMethodError: akka.persistence.cassandra.journal.CassandraJournal.akka$persistence$journal$WriteJournalBase$_setter_$persistence_$eq(Lakka/persistence/Persistence;)V
[ERROR] [10/04/2015 16:52:41.950] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.actor.ActorSystemImpl(ClusterSystem)] Uncaught error from thread [ClusterSystem-akka.actor.default-dispatcher-17] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.AbstractMethodError: akka.persistence.cassandra.journal.CassandraJournal.akka$persistence$journal$WriteJournalBase$_setter_$persistence_$eq(Lakka/persistence/Persistence;)V
at akka.persistence.journal.WriteJournalBase$class.$init$(WriteJournalBase.scala:15)
at akka.persistence.cassandra.journal.CassandraJournal.<init>(CassandraJournal.scala:17)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
at java.lang.Class.newInstance(Class.java:438)
at akka.util.Reflect$.instantiate(Reflect.scala:44)
at akka.actor.NoArgsReflectConstructor.produce(Props.scala:357)
at akka.actor.Props.newActor(Props.scala:259)
at akka.actor.ActorCell.newActor(ActorCell.scala:561)
at akka.actor.ActorCell.create(ActorCell.scala:587)
at akka.actor.ActorCell.invokeAll(ActorCell.scala:460)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:482)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
at akka.persistence.journal.WriteJournalBase$class.$init$(WriteJournalBase.scala:15)
at akka.persistence.cassandra.journal.CassandraJournal.<init>(CassandraJournal.scala:17)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
at java.lang.Class.newInstance(Class.java:438)
at akka.util.Reflect$.instantiate(Reflect.scala:44)
at akka.actor.NoArgsReflectConstructor.produce(Props.scala:357)
at akka.actor.Props.newActor(Props.scala:259)
at akka.actor.ActorCell.newActor(ActorCell.scala:561)
at akka.actor.ActorCell.create(ActorCell.scala:587)
at akka.actor.ActorCell.invokeAll(ActorCell.scala:460)
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:482)
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
at akka.dispatch.Mailbox.run(Mailbox.scala:223)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
As far as I can see from the com.github.krasserm repo, version 0.3.9 is built against
akka 2.3.11 with com.github.krasserm 0.3.9
but you are using
akka 2.4.0-RC3 with com.github.krasserm 0.3.9
Try changing the Akka version to 2.3.11, or com.github.krasserm to version 0.4.
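Please use the stable release of Akka, 2.4.0 (you are using a Release Candidate), and bump the Cassandra plugin dependency to 0.4, which was released last week and supports Akka 2.4.x.
You are getting this error because you pulled in conflicting versions of Akka (specifically its Journal Plugin API) and of the journal implementation. The Journal Plugin API was experimental in Akka 2.3 and was subject to change while it was being stabilized for 2.4.x. As of Akka 2.4.0 the journal APIs are stable and will not change in breaking ways.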
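To see which Akka artifacts actually end up on the classpath, `mvn dependency:tree` is the usual tool. As a rough runtime alternative, the sketch below scans `java.class.path` for jars whose file names look like Akka artifacts and prints the version part of each name. The class name `AkkaClasspathCheck` and the file-name pattern are assumptions made for illustration (they rely on the standard Maven `artifact_scalaVersion-version.jar` naming), not part of Akka or the Cassandra plugin.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AkkaClasspathCheck {
    // Assumed naming scheme: akka-persistence_2.11-2.4.0-RC3.jar
    // group 1 = artifact, group 2 = Scala binary version, group 3 = artifact version
    private static final Pattern AKKA_JAR =
        Pattern.compile("(akka-[a-z-]+)_(\\d+\\.\\d+)-(.+)\\.jar");

    /** Returns "artifact -> version" for every Akka-looking jar in the classpath string. */
    static List<String> akkaArtifacts(String classpath, String separator) {
        List<String> found = new ArrayList<>();
        for (String entry : classpath.split(Pattern.quote(separator))) {
            // Only the file name matters, so drop the directory part.
            String name = new File(entry).getName();
            Matcher m = AKKA_JAR.matcher(name);
            if (m.matches()) {
                found.add(m.group(1) + " -> " + m.group(3));
            }
        }
        return found;
    }

    public static void main(String[] args) {
        String cp = System.getProperty("java.class.path");
        for (String line : akkaArtifacts(cp, File.pathSeparator)) {
            System.out.println(line);
        }
    }
}
```

Run with the same classpath as the application. If the output lists akka-persistence at a 2.4.x version next to akka-persistence-cassandra at 0.3.9, that is the mismatch described above.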