scala + kafka 不发送带有 id 的消息

scala + kafka does not send message with id

我正在尝试通过我的 actor 向 Kafka 发送消息,但它不起作用。

以下代码有效

new KeyedMessage[String, Array[Byte]]("my-topic", msg.message)

这个不……为什么?

new KeyedMessage[String, Array[Byte]]("my-topic", msg.id, msg.message)

甚至

new KeyedMessage[String, Array[Byte]]("my-topic", msg.id, null, msg.message)

将分区设置为空,强制它只填充id,但还是一样。

有什么想法吗?

编辑

好吧,当以字节数组形式发送消息时才意识到它不起作用。

我已经更改了我的代码以允许以字符串形式发送消息并且它有效。

以下情况不起作用:

  props.put("serializer.class", "kafka.serializer.DefaultEncoder")

  override def receive: Receive = {
    case msg: Message =>
      val keyedMessage = new KeyedMessage[String, Array[Byte]]("my-topic", msg.id, msg.message)
      producer.send(keyedMessage)

    case _ => log.error("Got a msg that I don't understand")
  }

将 Array[Byte] 修改为 String 并将 DefaultEncoder 修改为 StringEncoder,它有效。

有什么想法吗?

嗯,我刚发现问题。

实际上,您有两种编码器选项。 DefaultEncoder 和 StringEncoder。我试图使用默认编码器将我的 ID 作为字符串发送,将消息作为字节数组发送。

查看 Encoder.scala,您可以看到 DefaultEncoder 实现和答案。

/**
 * The default implementation is a no-op, it just returns the same array it takes in
 */
class DefaultEncoder(props: VerifiableProperties = null) extends Encoder[Array[Byte]] {
  override def toBytes(value: Array[Byte]): Array[Byte] = value
}

它只是 returns 字节数组,并且由于我传递的是一个字符串,它抛出了转换异常。