使用Kafka的一对一即时消息
One to One instant messaging using Kafka
我正在使用 Scala 和 Kafka 创建基于主题的发布-订阅架构。
我的问题是如何使用 Kafka 主题处理应用程序的一对一消息传递部分?
这是我的制作人 class:
class Producer(topic: String, key: String, brokers: String, message: String) {
val producer = new KafkaProducer[String, String](configuration)
private def configuration: Properties = {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.ACKS_CONFIG, "all")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
props
}
def sendMessages(): Unit = {
val record = new ProducerRecord[String, String](topic, key, message)
producer.send(record)
producer.close()
}
}
这是我的消费者class:
class Consumer(brokers: String, topic: String, groupId: String) {
val consumer = new KafkaConsumer[String, String](configuration)
consumer.subscribe(util.Arrays.asList(topic))
private def configuration: Properties = {
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getCanonicalName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getCanonicalName)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
props
}
def receiveMessages(): Unit = {
while (true) {
consumer.poll(Duration.ofSeconds(0)).forEach(record => println(s"Received message: $record"))
}
}
}
我还有一个身份验证服务,负责处理与通过 JWT 令牌进行身份验证相关的所有事情。
我对如何向特定用户创建消息感到困惑,我考虑过创建一个“消息”class,但是当涉及到如何发送这些“特定”用户消息以及如何在 kafka 上划分这些消息时,我迷路了供以后使用:
class Message {
def sendMessage(sender_id: String, receiver_id: String, content: String): Unit ={
val newMessage = new Producer(brokers = KAFKA_BROKER,key =sender_id + " to " + receiver_id, topic = "topic_1", message = content)
newMessage.sendMessages()
}
def loadMessage(): Unit ={
//
}
}
我的想法是为属于同一个对话的所有消息指定一个自定义键,但我找不到正确的方法来检索这些消息,因为我的消费者 returns 该主题中包含的所有内容否不管关键是什么。意思是,所有用户最终都会收到所有消息。我知道我的建模看起来很乱,但我找不到正确的方法,当涉及到消费者中 group_id 的使用时,我也有点困惑。
有人可以告诉我什么是实现我在这里想要做的事情的正确方法吗?
couldn't find the right way to retrieve these messages later on ... consumer returns everything contained in that topic no matter what the key is
您需要将 Consumer 实例 .assign
到特定分区,而不是使用 .subscribe
,它读取 所有分区 。或者你会为每个对话使用特定的主题。
但是你需要为每个将要存在的对话提供唯一性 partitions/topics。在用户 create/remove 随机房间的常规聊天应用程序中,Kafka 无法扩展。
最终,我建议将数据写入 else 而不是 Kafka,这样您实际上可以在“convertsationId”and/or 用户 ID 上查询和索引,而不是尝试将这些事件直接从 Kafka 转发到您的“聊天”应用程序中。
我正在使用 Scala 和 Kafka 创建基于主题的发布-订阅架构。 我的问题是如何使用 Kafka 主题处理应用程序的一对一消息传递部分? 这是我的制作人 class:
class Producer(topic: String, key: String, brokers: String, message: String) {
val producer = new KafkaProducer[String, String](configuration)
private def configuration: Properties = {
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.ACKS_CONFIG, "all")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getCanonicalName)
props
}
def sendMessages(): Unit = {
val record = new ProducerRecord[String, String](topic, key, message)
producer.send(record)
producer.close()
}
}
这是我的消费者class:
class Consumer(brokers: String, topic: String, groupId: String) {
val consumer = new KafkaConsumer[String, String](configuration)
consumer.subscribe(util.Arrays.asList(topic))
private def configuration: Properties = {
val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getCanonicalName)
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getCanonicalName)
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
//props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true)
props
}
def receiveMessages(): Unit = {
while (true) {
consumer.poll(Duration.ofSeconds(0)).forEach(record => println(s"Received message: $record"))
}
}
}
我还有一个身份验证服务,负责处理与通过 JWT 令牌进行身份验证相关的所有事情。 我对如何向特定用户创建消息感到困惑,我考虑过创建一个“消息”class,但是当涉及到如何发送这些“特定”用户消息以及如何在 kafka 上划分这些消息时,我迷路了供以后使用:
class Message {
def sendMessage(sender_id: String, receiver_id: String, content: String): Unit ={
val newMessage = new Producer(brokers = KAFKA_BROKER,key =sender_id + " to " + receiver_id, topic = "topic_1", message = content)
newMessage.sendMessages()
}
def loadMessage(): Unit ={
//
}
}
我的想法是为属于同一个对话的所有消息指定一个自定义键,但我找不到正确的方法来检索这些消息,因为我的消费者 returns 该主题中包含的所有内容否不管关键是什么。意思是,所有用户最终都会收到所有消息。我知道我的建模看起来很乱,但我找不到正确的方法,当涉及到消费者中 group_id 的使用时,我也有点困惑。 有人可以告诉我什么是实现我在这里想要做的事情的正确方法吗?
couldn't find the right way to retrieve these messages later on ... consumer returns everything contained in that topic no matter what the key is
您需要将 Consumer 实例 .assign
到特定分区,而不是使用 .subscribe
,它读取 所有分区 。或者你会为每个对话使用特定的主题。
但是你需要为每个将要存在的对话提供唯一性 partitions/topics。在用户 create/remove 随机房间的常规聊天应用程序中,Kafka 无法扩展。
最终,我建议将数据写入 else 而不是 Kafka,这样您实际上可以在“convertsationId”and/or 用户 ID 上查询和索引,而不是尝试将这些事件直接从 Kafka 转发到您的“聊天”应用程序中。