NotXContentException when serializing Scala object to Json and indexing into Elasticsearch

I'm trying to index a relatively complex Scala object in Elasticsearch. Here are my case classes:

case class Game(id: Int,
            gameStates: Seq[GameState],
            playerActions: Map[String, PlayerAction],
            gameActions: Map[String, GameAction],
            endGame: EndGame)
case class GameState(players: Seq[Player])
case class Player(id: Int, deck: Seq[Card], playSpace: Seq[Card])
case class Card (rank: Int, suit: String) extends Ordered[Card]

For each of these case classes I have a companion object containing a Format[ObjectType] definition with a writes method, in the following form:

object Game {
  implicit object GameFormat extends Format[Game] {
    def writes(game: Game): JsValue = {
      Json.toJson(game.gameStates)
    }

    def reads(json:JsValue): JsResult[Game] = {
      //need this to satisfy compiler
      JsSuccess(Game(-1, Seq(), Map(), Map(), new EndGame("", (x: GameState) => None)))
    }
  }

  implicit object GameIndexable extends Indexable[Game] {
    override def json(game: Game): String =
      game.gameStates.map(x => Json.toJson(x).toString()).mkString(",")
  }
}

Here is my connection setup and the method that performs the indexing:

val settings = Settings.settingsBuilder()
                    .put("cluster.name", "elasticsearch_dan-dev").build()
val uri = ElasticsearchClientUri("elasticsearch://localhost:9300")
val esServer = ElasticClient.transport(settings, uri)

def sendGameToServer(game: Game) : Unit = {
  esServer.execute {
    index into "war" -> "games" source game
  }
}

When I run my application and try to index a Game object, Elasticsearch throws the following exception:

[2016-02-22 13:34:28,247][DEBUG][action.index             ] [dan_01_dev] failed to execute [index {[war][games][AVMK56uUwAlQMxc6Khfa], source[_na_]}] on [[war][1]]
MapperParsingException[failed to parse]; nested: NotXContentException[Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes];
    at org.elasticsearch.index.mapper.DocumentParser.innerParseDocument(DocumentParser.java:163)
    at org.elasticsearch.index.mapper.DocumentParser.parseDocument(DocumentParser.java:79)
    at org.elasticsearch.index.mapper.DocumentMapper.parse(DocumentMapper.java:304)
    at org.elasticsearch.index.shard.IndexShard.prepareCreate(IndexShard.java:500)
    at org.elasticsearch.index.shard.IndexShard.prepareCreateOnPrimary(IndexShard.java:481)
    at org.elasticsearch.action.index.TransportIndexAction.prepareIndexOperationOnPrimary(TransportIndexAction.java:214)
    at org.elasticsearch.action.index.TransportIndexAction.executeIndexRequestOnPrimary(TransportIndexAction.java:223)
    at org.elasticsearch.action.index.TransportIndexAction.shardOperationOnPrimary(TransportIndexAction.java:157)
    at org.elasticsearch.action.index.TransportIndexAction.shardOperationOnPrimary(TransportIndexAction.java:65)
    at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryPhase.doRun(TransportReplicationAction.java:595)
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
    at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:263)
    at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:260)
    at org.elasticsearch.transport.TransportService.doRun(TransportService.java:350)
    at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.elasticsearch.common.compress.NotXContentException: Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes
    at org.elasticsearch.common.compress.CompressorFactory.compressor(CompressorFactory.java:85)
    at org.elasticsearch.common.xcontent.XContentHelper.createParser(XContentHelper.java:50)
    at org.elasticsearch.index.mapper.DocumentParser.innerParseDocument(DocumentParser.java:99)
    ... 17 more

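I can't find any information about this exception. I understand it means my object isn't in a format that can be indexed, but I'm not sure how to troubleshoot beyond that. I have confirmed that I get valid Json when I print it out instead of indexing it. Any ideas why Elasticsearch doesn't like the Json I'm trying to index?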

Your GameIndexable calls .mkString(",") on the result of your map, so you end up with a "json" string like:

{ ... }, { ... }, { ... }

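This is not valid Json: each element is valid on its own, but the comma-joined string is not a single Json document. If you want to index multiple game states, you need a bulk request in which each index request contains a single game state.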
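To make the difference concrete, here is a minimal sketch in plain Scala with no libraries. The strings in `states` are made-up stand-ins for what `Json.toJson(x).toString()` would return, and the field name `gameStates` in the wrapped form is an assumption, not something from the question.

```scala
object JoinedJsonDemo {
  // Hypothetical stand-ins for the per-state Json strings.
  val states: Seq[String] = Seq("""{"players":[]}""", """{"players":[]}""")

  // What the GameIndexable in the question builds: objects separated by
  // commas with nothing enclosing them, so it is not one Json document.
  val joined: String = states.mkString(",")

  // One possible single-document shape: an object holding the states in
  // an array. The field name "gameStates" is an assumption.
  val wrapped: String = states.mkString("""{"gameStates":[""", ",", "]}")

  def main(args: Array[String]): Unit = {
    println(joined)
    println(wrapped)
  }
}
```

Printing each state separately would show valid Json every time, which may be why the problem did not show up when the output was printed rather than indexed.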

Alternatively, if you want to index the Game object itself, your GameIndexable should look like this:

implicit object GameIndexable extends Indexable[Game] {
  override def json(game: Game): String = GameFormat.writes(game).toString
}
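One caveat worth checking: the writes method shown in the question returns Json.toJson(game.gameStates), which is a Json array, while an Elasticsearch document source is a Json object. The following is a minimal sketch of a writer that produces an object instead, assuming Play JSON is on the classpath. The `GameJson` object, the `gameJson` helper, and the choice to emit only `id` and `gameStates` are illustrative assumptions; the case classes are trimmed copies of the ones in the question.

```scala
import play.api.libs.json._

// Trimmed copies of the question's case classes (Ordered[Card] omitted).
case class Card(rank: Int, suit: String)
case class Player(id: Int, deck: Seq[Card], playSpace: Seq[Card])
case class GameState(players: Seq[Player])

// Hypothetical helper object, not part of the question's code.
object GameJson {
  // Macro-generated writers; each one emits a Json object whose fields
  // follow the case class field order.
  implicit val cardWrites: Writes[Card] = Json.writes[Card]
  implicit val playerWrites: Writes[Player] = Json.writes[Player]
  implicit val gameStateWrites: Writes[GameState] = Json.writes[GameState]

  // Builds one Json object for the whole game, so the resulting string
  // is a single document rather than a bare array or a comma-joined list.
  def gameJson(id: Int, states: Seq[GameState]): String =
    Json.stringify(Json.obj("id" -> id, "gameStates" -> states))
}
```

With something along these lines, the json method of GameIndexable could return the string built by the helper, and the remaining fields of Game could be added to the object once they have writers of their own.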