Akka Cassandra persistence with a Java example

I am trying to test Akka Cassandra persistence using Java. Following the documentation at (http://doc.akka.io/docs/akka/2.4.0-RC3/java/persistence.html), I am trying to get PersistentActorExample working with Cassandra, and I am running into the problem below.

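I am using the application.conf shown below. Does anyone have a Java implementation example I could start from? The same code works fine with LevelDB. We are currently using DataStax 4.8. I suspect this is an application.conf problem.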


akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2550
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2556",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
  }

  persistence {
    journal {
      plugin = "cassandra-journal"
      # Comma-separated list of contact points in the cluster
      cassandra-journal.contact-points = ["dse-9042.service.consul"]
    }

    snapshot-store {
      plugin = "akka.persistence.cassandra.snapshot.CassandraSnapshotStore"
      # Comma-separated list of contact points in the cluster
      cassandra-journal.contact-points = ["dse-9042.service.consul"]
    }
  }

  akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]
  akka.actor.default-mailbox.stash-capacity=10000

}

In my project I am using the following Maven dependencies:

        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-persistence_2.11</artifactId>
            <version>2.4.0-RC3</version>
        </dependency>
        <dependency>
            <groupId>com.github.krasserm</groupId>
            <artifactId>akka-persistence-cassandra_2.11</artifactId>
            <version>0.3.9</version>
        </dependency>

This is the error I am getting:

[INFO] [10/04/2015 16:52:40.906] [main] [akka.remote.Remoting] Starting remoting
[INFO] [10/04/2015 16:52:41.112] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2550]
[INFO] [10/04/2015 16:52:41.124] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Starting up...
[INFO] [10/04/2015 16:52:41.186] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [10/04/2015 16:52:41.186] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Started up successfully
[INFO] [10/04/2015 16:52:41.193] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Metrics will be retreived from MBeans, and may be incorrect on some platforms. To increase metric accuracy add the 'sigar.jar' to the classpath and the appropriate platform-specific native libary to 'java.library.path'. Reason: java.lang.ClassNotFoundException: org.hyperic.sigar.Sigar
[INFO] [10/04/2015 16:52:41.196] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Metrics collection has started successfully
[INFO] [10/04/2015 16:52:41.380] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2550] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2552]
Uncaught error from thread [ClusterSystem-akka.actor.default-dispatcher-17] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[ClusterSystem]
java.lang.AbstractMethodError: akka.persistence.cassandra.journal.CassandraJournal.akka$persistence$journal$WriteJournalBase$_setter_$persistence_$eq(Lakka/persistence/Persistence;)V
[ERROR] [10/04/2015 16:52:41.950] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.actor.ActorSystemImpl(ClusterSystem)] Uncaught error from thread [ClusterSystem-akka.actor.default-dispatcher-17] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.AbstractMethodError: akka.persistence.cassandra.journal.CassandraJournal.akka$persistence$journal$WriteJournalBase$_setter_$persistence_$eq(Lakka/persistence/Persistence;)V
    at akka.persistence.journal.WriteJournalBase$class.$init$(WriteJournalBase.scala:15)
    at akka.persistence.cassandra.journal.CassandraJournal.<init>(CassandraJournal.scala:17)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:408)
    at java.lang.Class.newInstance(Class.java:438)
    at akka.util.Reflect$.instantiate(Reflect.scala:44)
    at akka.actor.NoArgsReflectConstructor.produce(Props.scala:357)
    at akka.actor.Props.newActor(Props.scala:259)
    at akka.actor.ActorCell.newActor(ActorCell.scala:561)
    at akka.actor.ActorCell.create(ActorCell.scala:587)
    at akka.actor.ActorCell.invokeAll(ActorCell.scala:460)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:482)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
    at akka.dispatch.Mailbox.run(Mailbox.scala:223)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


As far as I can see from the com.github.krasserm repo, it uses

akka 2.3.11
com.github.krasserm 0.3.9

but you are using

akka 2.4.0
com.github.krasserm 0.3.9

Try changing the Akka version to 2.3.11, or com.github.krasserm to version 0.4.

Please use the stable Akka 2.4.0 release (you are using a release candidate), and bump the Cassandra plugin dependency to 0.4, which was released last week and supports Akka 2.4.x.

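You are getting the error because you have pulled in conflicting versions of Akka (which provides the Journal Plugin API) and of the journal implementation. The Journal Plugin API was experimental in Akka 2.3, with the caveat that it might change when it was stabilized in 2.4.x. As of Akka 2.4.0 the journal APIs are stable and will not undergo breaking changes.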
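
As a minimal sketch of the version bump described above, the dependency section of the pom.xml could look like the following. It assumes the plugin is still published under the same groupId and artifactId as the 0.3.9 coordinates in the question, and that the Scala 2.11 builds are the ones wanted. Only the two version numbers differ from the question.

```xml
<!-- Sketch only: coordinates copied from the question, versions taken from the answers above. -->
<dependency>
    <groupId>com.typesafe.akka</groupId>
    <artifactId>akka-persistence_2.11</artifactId>
    <!-- stable release instead of 2.4.0-RC3 -->
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>com.github.krasserm</groupId>
    <artifactId>akka-persistence-cassandra_2.11</artifactId>
    <!-- assumption: 0.4 is published under the same coordinates as 0.3.9 -->
    <version>0.4</version>
</dependency>
```

Any other Akka modules on the classpath (for example the cluster and remote modules that the configuration in the question relies on) would need to be on the same Akka version. Running `mvn dependency:tree` afterwards is one way to check that nothing still pulls in an Akka 2.3.x artifact.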