将 ZIO ZStreams 列表合并为一个

Combine list of ZIO ZStreams into one

我的旧代码支持使用 SqsStream 使用一个 SQS 队列。我必须更新它以支持给定队列 URL 列表的多个队列。

方法内容:

for {
  sqs <- Sqs.>.async // async client
  urls <- Sqs.>.queueUrls // List[String] of multiple queues
  _ <- {
    urls
      .map(url => {
        SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
          .mapMParUnordered(10)(handleMessage) // run "handleMessage" up to 10 times concurrently, ZStream[Env, Throwable, Unit]
          .runDrain // ZIO[Env, Throwable, Unit]
          .forever // ZIO[Env, Throwable, Nothing]
      })
} yield ()

但是编译器会抱怨,因为它需要一个 (ZIO, ZIO, ZIO) 而我给了它一个 (ZIO, ZIO, List)。我假设我必须将该列表中的所有效果减少为单个效果,该效果将在所有队列中并行执行 handleMessage,但我不确定语法,因为我没有使用 ZIO 的经验。

基本上到此为止,

urls
      .map(url => {
        SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))

我的 url 变成了 ZStream。我想我需要使用这个元素和下一个元素调用 ZStream.flatMapPar,依此类推,直到所有元素都被压平在一起。我该怎么做?

runDrain 将 return 一个 ZIO,你可以用 foreachPar_.

触发并忘记它
for {
  sqs <- Sqs.>.async
  urls <- Sqs.>.queueUrls
  // Returns ZIO[R, E, Unit] and executes each effect in parallel while discarding the results
  _ <- ZIO.foreachPar_(urls) { url =>
        SqsStream(sqs, url, SqsStreamSettings(autoDelete = false))
          // Handles up to 10 messages at a time in parallel.
          .mapMParUnordered(10)(handleMessage)
          // The stream is already unbounded so no need to have `.forever`
          .runDrain
      }
} yield ()

我还要澄清 SqsStream 应该已经是无界的,所以你不应该需要使用 forever,并且 mapMParUnordered 参数指的是最大并发不是处理的事件总数。