scala + kafka 不发送带有 id 的消息
scala + kafka does not send message with id
我正在尝试通过我的 actor 向 Kafka 发送消息,但它不起作用。
以下代码有效
new KeyedMessage[String, Array[Byte]]("my-topic", msg.message)
这个不……为什么?
new KeyedMessage[String, Array[Byte]]("my-topic", msg.id, msg.message)
甚至
new KeyedMessage[String, Array[Byte]]("my-topic", msg.id, null, msg.message)
将分区设置为空,强制它只填充id,但还是一样。
有什么想法吗?
编辑
好吧,当以字节数组形式发送消息时才意识到它不起作用。
我已经更改了我的代码以允许以字符串形式发送消息并且它有效。
以下情况不起作用:
props.put("serializer.class", "kafka.serializer.DefaultEncoder")
override def receive: Receive = {
case msg: Message =>
val keyedMessage = new KeyedMessage[String, Array[Byte]]("my-topic", msg.id, msg.message)
producer.send(keyedMessage)
case _ => log.error("Got a msg that I don't understand")
}
将 Array[Byte] 修改为 String 并将 DefaultEncoder 修改为 StringEncoder,它有效。
有什么想法吗?
嗯,我刚发现问题。
实际上,您有两种编码器选项。 DefaultEncoder 和 StringEncoder。我试图使用默认编码器将我的 ID 作为字符串发送,将消息作为字节数组发送。
查看 Encoder.scala,您可以看到 DefaultEncoder 实现和答案。
/**
* The default implementation is a no-op, it just returns the same array it takes in
*/
class DefaultEncoder(props: VerifiableProperties = null) extends Encoder[Array[Byte]] {
override def toBytes(value: Array[Byte]): Array[Byte] = value
}
它只是 returns 字节数组,并且由于我传递的是一个字符串,它抛出了转换异常。
我正在尝试通过我的 actor 向 Kafka 发送消息,但它不起作用。
以下代码有效
new KeyedMessage[String, Array[Byte]]("my-topic", msg.message)
这个不……为什么?
new KeyedMessage[String, Array[Byte]]("my-topic", msg.id, msg.message)
甚至
new KeyedMessage[String, Array[Byte]]("my-topic", msg.id, null, msg.message)
将分区设置为空,强制它只填充id,但还是一样。
有什么想法吗?
编辑
好吧,当以字节数组形式发送消息时才意识到它不起作用。
我已经更改了我的代码以允许以字符串形式发送消息并且它有效。
以下情况不起作用:
props.put("serializer.class", "kafka.serializer.DefaultEncoder")
override def receive: Receive = {
case msg: Message =>
val keyedMessage = new KeyedMessage[String, Array[Byte]]("my-topic", msg.id, msg.message)
producer.send(keyedMessage)
case _ => log.error("Got a msg that I don't understand")
}
将 Array[Byte] 修改为 String 并将 DefaultEncoder 修改为 StringEncoder,它有效。
有什么想法吗?
嗯,我刚发现问题。
实际上,您有两种编码器选项。 DefaultEncoder 和 StringEncoder。我试图使用默认编码器将我的 ID 作为字符串发送,将消息作为字节数组发送。
查看 Encoder.scala,您可以看到 DefaultEncoder 实现和答案。
/**
* The default implementation is a no-op, it just returns the same array it takes in
*/
class DefaultEncoder(props: VerifiableProperties = null) extends Encoder[Array[Byte]] {
override def toBytes(value: Array[Byte]): Array[Byte] = value
}
它只是 returns 字节数组,并且由于我传递的是一个字符串,它抛出了转换异常。