与 websocket 连接时使用 akka-stream-kafka 从 kafka Topic 获取最后一条消息
Getting last message from kafka Topic using akka-stream-kafka when connecting with websocket
是否可以使用 Akka Streams Kafka 获取关于 Kafka 主题的最后一条消息?我正在创建一个侦听 Kafka 主题的 websocket,但目前它会在我连接时检索所有先前未显示的消息。这可以加起来相当多的消息,所以我只对最后一条消息 + 任何未来的消息感兴趣。 (或仅未来的消息)
来源:
def source(): Flow[Any, String, NotUsed] = {
val source = Consumer.plainSource(consumerSettings, Subscriptions.topics(MyTopic))
Flow.fromSinkAndSource[Any, String](Sink.ignore, source.map(_.value)
}
消费者设置:
@Provides
def providesConsumerSettings(@Named("kafkaUrl") kafkaUrl: String): ConsumerSettings[String, String] = {
val deserializer = new StringDeserializer()
val config = configuration.getOptional[Configuration]("akka.kafka.consumer")
.getOrElse(Configuration.empty)
ConsumerSettings(config.underlying, deserializer, deserializer)
.withBootstrapServers(kafkaUrl)
.withGroupId(GroupId)
}
我尝试添加设置 ConsumerSettings.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
应该"automatically reset the offset to the latest offset",但是好像没有什么作用
我能够使用 David van Geest 非常巧妙地描述的方法避免在客户端连接时获取任何上游数据 here
它归结为在消费者上有一个 BroadcastHub:
val liveSource = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic1, topic2))
.map(kafkaObject => utils.WebSockets.kafkaWrapper(kafkaObject.topic(), kafkaObject.value()))
.toMat(BroadcastHub.sink)(Keep.right)
.run()
并连接一个静态消费者来吃掉所有的上游数据
liveSource.to(Sink.ignore).run()
之后这让我有一个 WebSocket 客户端订阅消费者收到的所有数据:
def source(): Flow[Any, String, NotUsed] = {Flow.fromSinkAndSource(Sink.ignore, liveSource)}
或基于 KafkaTopic(或任何其他你想要的)进行过滤
def KafkaSpecificSource(kafkaTopic: String): Flow[Any, String, NotUsed] = {
Flow.fromSinkAndSource(Sink.ignore, liveSource.filter({
x =>
(Json.parse(x) \ "topic").asOpt[String] match {
case Some(str) => str.equals(kafkaTopic)
case None => false
}
}))
}
这并没有解决第一次连接时给用户 x 量数据的问题,但我预见到我们会为任何历史数据添加一个简单的数据库查询,并让 WebSocket 连接只关注直播数据。
是否可以使用 Akka Streams Kafka 获取关于 Kafka 主题的最后一条消息?我正在创建一个侦听 Kafka 主题的 websocket,但目前它会在我连接时检索所有先前未显示的消息。这可以加起来相当多的消息,所以我只对最后一条消息 + 任何未来的消息感兴趣。 (或仅未来的消息)
来源:
def source(): Flow[Any, String, NotUsed] = {
val source = Consumer.plainSource(consumerSettings, Subscriptions.topics(MyTopic))
Flow.fromSinkAndSource[Any, String](Sink.ignore, source.map(_.value)
}
消费者设置:
@Provides
def providesConsumerSettings(@Named("kafkaUrl") kafkaUrl: String): ConsumerSettings[String, String] = {
val deserializer = new StringDeserializer()
val config = configuration.getOptional[Configuration]("akka.kafka.consumer")
.getOrElse(Configuration.empty)
ConsumerSettings(config.underlying, deserializer, deserializer)
.withBootstrapServers(kafkaUrl)
.withGroupId(GroupId)
}
我尝试添加设置 ConsumerSettings.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
应该"automatically reset the offset to the latest offset",但是好像没有什么作用
我能够使用 David van Geest 非常巧妙地描述的方法避免在客户端连接时获取任何上游数据 here
它归结为在消费者上有一个 BroadcastHub:
val liveSource = Consumer.plainSource(consumerSettings, Subscriptions.topics(topic1, topic2))
.map(kafkaObject => utils.WebSockets.kafkaWrapper(kafkaObject.topic(), kafkaObject.value()))
.toMat(BroadcastHub.sink)(Keep.right)
.run()
并连接一个静态消费者来吃掉所有的上游数据
liveSource.to(Sink.ignore).run()
之后这让我有一个 WebSocket 客户端订阅消费者收到的所有数据:
def source(): Flow[Any, String, NotUsed] = {Flow.fromSinkAndSource(Sink.ignore, liveSource)}
或基于 KafkaTopic(或任何其他你想要的)进行过滤
def KafkaSpecificSource(kafkaTopic: String): Flow[Any, String, NotUsed] = {
Flow.fromSinkAndSource(Sink.ignore, liveSource.filter({
x =>
(Json.parse(x) \ "topic").asOpt[String] match {
case Some(str) => str.equals(kafkaTopic)
case None => false
}
}))
}
这并没有解决第一次连接时给用户 x 量数据的问题,但我预见到我们会为任何历史数据添加一个简单的数据库查询,并让 WebSocket 连接只关注直播数据。