从 akka-stream 到 fs2 的旅程——如何使用 http4s 在 fs2 中定义一个 akka-stream http 流
A journey from akka-stream to fs2 - how to define an akka-stream http flow like stage in fs2 using http4s
我正在加深我对 fs2 的了解,并想尝试 fs2-kafka 作为我将替换 akka 流的用例。这个想法很简单,通过对接收器的 http 请求从 kafka 和 post 数据中读取数据,然后在成功时提交回 kafka。到目前为止,我真的无法弄清楚 http 部分。在 akka stream / akka http 中,你有开箱即用的流程 https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#using-a-host-connection-pool
Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]
与 akka 流完美集成。
我想看看我是否可以用 http4s 和 fs2 做类似的事情。
有没有人有任何参考资料、代码示例、博客以及没有说明如何进行这种集成的内容。到目前为止,我唯一能想到的是,将流包装到客户端资源的使用方法中,即
BlazeClientBuilder[IO](IORuntime.global.compute).resource.use { ..... run stream here ..... }
即便如此我也不确定整个事情
typelevel 生态系统的特点是一切都只是一个库,您不需要示例说明它们之间有多少相互作用,您只需要了解它们如何相互作用图书馆作品及构图基本规则
def createClient(/** whatever arguments you need */): Resource[IO, Client[IO]] = {
// Fill this based on the documentation of the client of your choice:
// I would recommend the ember client from http4s:
// https://http4s.org/v0.23/api/org/http4s/ember/client/emberclientbuilder
}
def sendHttpRequest(client: Client[IO])(data: Data): IO[Result] = {
// Fill this based on the documentation of your client:
// https://http4s.org/v0.23/client/
// https://http4s.org/v0.23/api/org/http4s/client/client
}
def getStreamOfRecords(/** whatever arguments you need */): Stream[IO, CommittableConsumerRecord[IO, Key, Data]] = {
// Fill this based on the documentation of fs2-kafka:
// https://fd4s.github.io/fs2-kafka/docs/consumers
}
def program(/** whatever arguments you need */): Stream[IO, Unit] = {
// Based on the documentation of fs2 and fs2-kafka I would guess something like this:
Stream.fromResource(createClient(...)).flatMap { client =>
getStreamOfRecords(...).evalMapFilter { committable =>
sendHttpRequest(client)(data = committable.record).map { result =>
if (result.isSuccess) Some(committable.offset)
else None
}
}.through(commitBatchWithin(...))
}
}
object Main extends IOApp.Simple {
override final val run: IO[Unit] =
program(...).compile.drain
}
请注意,我把所有这些都写在了脑海中,只需快速浏览一下文档,您就需要更改很多东西 (尤其是类型,例如 Data
& Result
)。以及调整错误处理和何时提交回 Kafka.
但是,我希望这可以帮助您了解如何构建代码。
我正在加深我对 fs2 的了解,并想尝试 fs2-kafka 作为我将替换 akka 流的用例。这个想法很简单,通过对接收器的 http 请求从 kafka 和 post 数据中读取数据,然后在成功时提交回 kafka。到目前为止,我真的无法弄清楚 http 部分。在 akka stream / akka http 中,你有开箱即用的流程 https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#using-a-host-connection-pool
Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]
与 akka 流完美集成。
我想看看我是否可以用 http4s 和 fs2 做类似的事情。
有没有人有任何参考资料、代码示例、博客以及没有说明如何进行这种集成的内容。到目前为止,我唯一能想到的是,将流包装到客户端资源的使用方法中,即
BlazeClientBuilder[IO](IORuntime.global.compute).resource.use { ..... run stream here ..... }
即便如此我也不确定整个事情
typelevel 生态系统的特点是一切都只是一个库,您不需要示例说明它们之间有多少相互作用,您只需要了解它们如何相互作用图书馆作品及构图基本规则
def createClient(/** whatever arguments you need */): Resource[IO, Client[IO]] = {
// Fill this based on the documentation of the client of your choice:
// I would recommend the ember client from http4s:
// https://http4s.org/v0.23/api/org/http4s/ember/client/emberclientbuilder
}
def sendHttpRequest(client: Client[IO])(data: Data): IO[Result] = {
// Fill this based on the documentation of your client:
// https://http4s.org/v0.23/client/
// https://http4s.org/v0.23/api/org/http4s/client/client
}
def getStreamOfRecords(/** whatever arguments you need */): Stream[IO, CommittableConsumerRecord[IO, Key, Data]] = {
// Fill this based on the documentation of fs2-kafka:
// https://fd4s.github.io/fs2-kafka/docs/consumers
}
def program(/** whatever arguments you need */): Stream[IO, Unit] = {
// Based on the documentation of fs2 and fs2-kafka I would guess something like this:
Stream.fromResource(createClient(...)).flatMap { client =>
getStreamOfRecords(...).evalMapFilter { committable =>
sendHttpRequest(client)(data = committable.record).map { result =>
if (result.isSuccess) Some(committable.offset)
else None
}
}.through(commitBatchWithin(...))
}
}
object Main extends IOApp.Simple {
override final val run: IO[Unit] =
program(...).compile.drain
}
请注意,我把所有这些都写在了脑海中,只需快速浏览一下文档,您就需要更改很多东西 (尤其是类型,例如 Data
& Result
)。以及调整错误处理和何时提交回 Kafka.
但是,我希望这可以帮助您了解如何构建代码。