Akka Stream Kafka,到达日志末尾时完成流
Akka Stream Kafka, complete stream when reached end of log
我正在使用 Akka Streams Kafka,我正在寻找一种方法来执行以下操作:
- 来自偏移量
x
的启动流
- 按顺序消费项目
x
、x+1
、x+2
.. 直到最后一个项目
- 消耗完最后一个项目后,完成流。
代码看起来像
Consumer
.plainSource(consumerSettings, subscription)
.runForeach(println("got record!"))
.onComplete {
case Success(_) => // all items read
case Failure(error) => // error
}
它会在读取最后一个元素后完成。也许这不是这个库的使用方式。我怎样才能做到这一点?
Akka Consumer 以 "pulling" 方式工作,它将永远存在,除非发生与代理的连接错误。但是,你什么时候认为流结束了? Kafka 可以被视为分布式日志,您可以从中读取给定偏移量的消息。只要您的客户端连接到 Broker,您的客户端就会启动并且 运行... 如果您考虑在某个时间间隔内没有来自 Kafka 的事件(例如)时终止您的流,您可以使用 空闲超时:
Consumer
.plainSource(consumerSettings, subscription)
.idleTimeout(10 seconds)
.runForeach(e => println("E"))
.onComplete {
case Success(_) => // all items read
case Failure(error) =>
// TimeoutException if no element in ten seconds the stream stops throwing this exception
}
另一种可能性是使用 Fan-In 阶段,特别是 MergePreferred。我们可以创建另一个在时间间隔内发出事件的 Tick Source。 Kafka 源将有优先权,因此只要元素来自 Kafka,舞台总是会从该源中提取元素。如果某个区间内没有元素,则 "Timeout" 字符串将被推向下游。类似于:
implicit val actorSystem = ActorSystem("test-actor-system")
implicit val streamMaterializer = ActorMaterializer()
implicit val ec = actorSystem.dispatcher
val consumer =
Consumer
.plainSource(consumerSettings, subscription)
.map(_.value())
val tick = Source.tick(50 millis, 30 seconds, "Timeout")
val source = GraphDSL.create(consumer, tick)(Keep.both) { implicit b ⇒
(r1, r2) ⇒
val merge = b.add(MergePreferred[String](1, false))
r2 ~> merge.in(0)
r1 ~> merge.preferred
SourceShape(merge.out)
}
Source
.fromGraph(source)
.takeWhile(el => el != "Timeout")
.runForeach(msg => println(msg))
.onComplete{
case Success(_) => println("Stream ended")
case Failure(error) => println("There was an error")
}
使用 takeWhile 流将处于活动状态,同时有来自 Kafka 的元素。
这只是一种方法。 Akka Stream 有许多不同的阶段,Graph Api 可能以更优雅的方式面对这些情况。
我正在使用 Akka Streams Kafka,我正在寻找一种方法来执行以下操作:
- 来自偏移量
x
的启动流
- 按顺序消费项目
x
、x+1
、x+2
.. 直到最后一个项目 - 消耗完最后一个项目后,完成流。
代码看起来像
Consumer
.plainSource(consumerSettings, subscription)
.runForeach(println("got record!"))
.onComplete {
case Success(_) => // all items read
case Failure(error) => // error
}
它会在读取最后一个元素后完成。也许这不是这个库的使用方式。我怎样才能做到这一点?
Akka Consumer 以 "pulling" 方式工作,它将永远存在,除非发生与代理的连接错误。但是,你什么时候认为流结束了? Kafka 可以被视为分布式日志,您可以从中读取给定偏移量的消息。只要您的客户端连接到 Broker,您的客户端就会启动并且 运行... 如果您考虑在某个时间间隔内没有来自 Kafka 的事件(例如)时终止您的流,您可以使用 空闲超时:
Consumer
.plainSource(consumerSettings, subscription)
.idleTimeout(10 seconds)
.runForeach(e => println("E"))
.onComplete {
case Success(_) => // all items read
case Failure(error) =>
// TimeoutException if no element in ten seconds the stream stops throwing this exception
}
另一种可能性是使用 Fan-In 阶段,特别是 MergePreferred。我们可以创建另一个在时间间隔内发出事件的 Tick Source。 Kafka 源将有优先权,因此只要元素来自 Kafka,舞台总是会从该源中提取元素。如果某个区间内没有元素,则 "Timeout" 字符串将被推向下游。类似于:
implicit val actorSystem = ActorSystem("test-actor-system")
implicit val streamMaterializer = ActorMaterializer()
implicit val ec = actorSystem.dispatcher
val consumer =
Consumer
.plainSource(consumerSettings, subscription)
.map(_.value())
val tick = Source.tick(50 millis, 30 seconds, "Timeout")
val source = GraphDSL.create(consumer, tick)(Keep.both) { implicit b ⇒
(r1, r2) ⇒
val merge = b.add(MergePreferred[String](1, false))
r2 ~> merge.in(0)
r1 ~> merge.preferred
SourceShape(merge.out)
}
Source
.fromGraph(source)
.takeWhile(el => el != "Timeout")
.runForeach(msg => println(msg))
.onComplete{
case Success(_) => println("Stream ended")
case Failure(error) => println("There was an error")
}
使用 takeWhile 流将处于活动状态,同时有来自 Kafka 的元素。
这只是一种方法。 Akka Stream 有许多不同的阶段,Graph Api 可能以更优雅的方式面对这些情况。