从 akka-stream 到 fs2 的旅程——如何使用 http4s 在 fs2 中定义一个 akka-stream http 流

A journey from akka-stream to fs2 - how to define an akka-stream http flow like stage in fs2 using http4s

我正在加深我对 fs2 的了解,并想尝试 fs2-kafka 作为我将替换 akka 流的用例。这个想法很简单,通过对接收器的 http 请求从 kafka 和 post 数据中读取数据,然后在成功时提交回 kafka。到目前为止,我真的无法弄清楚 http 部分。在 akka stream / akka http 中,你有开箱即用的流程 https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#using-a-host-connection-pool

Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]

与 akka 流完美集成。

我想看看我是否可以用 http4s 和 fs2 做类似的事情。

有没有人有任何参考资料、代码示例、博客以及没有说明如何进行这种集成的内容。到目前为止,我唯一能想到的是,将流包装到客户端资源的使用方法中,即

BlazeClientBuilder[IO](IORuntime.global.compute).resource.use { ..... run stream here ..... }

即便如此我也不确定整个事情

typelevel 生态系统的特点是一切都只是一个库,您不需要示例说明它们之间有多少相互作用,您只需要了解它们如何相互作用图书馆作品及构图基本规则

def createClient(/** whatever arguments you need */): Resource[IO, Client[IO]] = {
  // Fill this based on the documentation of the client of your choice:
  // I would recommend the ember client from http4s:
  // https://http4s.org/v0.23/api/org/http4s/ember/client/emberclientbuilder 
}


def sendHttpRequest(client: Client[IO])(data: Data): IO[Result] = {
  // Fill this based on the documentation of your client:
  // https://http4s.org/v0.23/client/
  // https://http4s.org/v0.23/api/org/http4s/client/client
}

def getStreamOfRecords(/** whatever arguments you need */): Stream[IO, CommittableConsumerRecord[IO, Key, Data]] = {
  // Fill this based on the documentation of fs2-kafka:
  // https://fd4s.github.io/fs2-kafka/docs/consumers
}

def program(/** whatever arguments you need */): Stream[IO, Unit] = {
  // Based on the documentation of fs2 and fs2-kafka I would guess something like this:
  Stream.fromResource(createClient(...)).flatMap { client =>
    getStreamOfRecords(...).evalMapFilter { committable =>
      sendHttpRequest(client)(data = committable.record).map { result =>
        if (result.isSuccess) Some(committable.offset)
        else None
      }
    }.through(commitBatchWithin(...))
  }
}

object Main extends IOApp.Simple {
  override final val run: IO[Unit] =
    program(...).compile.drain
}

请注意,我把所有这些都写在了脑海中,只需快速浏览一下文档,您就需要更改很多东西 (尤其是类型,例如 Data & Result)。以及调整错误处理和何时提交回 Kafka.
但是,我希望这可以帮助您了解如何构建代码。