以非阻塞和懒惰的方式用 Futures 构建一个 LazyList

Building a LazyList with Futures in a non-blocking and lazy way

我正在为 Clojure 通道构建一个 Scala 门面来传递数据块,我想将其表示为 LazyList[Future[Either[String, Int]]],其中左侧可以保存错误消息,右侧可以保存数据。从 Channel 中检索每个块是一个阻塞操作,因此我想将每个块封装在一个 Future 中。

每个块结果类型决定了我们应该如何继续构建惰性列表:

  1. null: 频道上没有更多结果,return list
  2. 字符串:添加Left(error)和return列表
  3. Int:添加Right(data)并递归下一个块

我的问题是我们是否可以以惰性和非阻塞的方式构建这样的列表?

这是我到目前为止的想法,但是头部被评估了(不是懒惰的)并且 Await.result 块:

// Clojure "Channel" dummy
case class Channel(vs: Any*) {
  private val it = vs.toIterable.iterator

  // equivalent to the `<!!` Clojure function
  def chunk: Future[Any] = Future {
    // This imitates an expensive blocking operation
    if (it.hasNext) {
      val value = it.next
      println("Retrieving value: " + value)
      value
    } else {
      null
    }
  }
}

def lazyList(channel: Channel): LazyList[Future[Either[String, Int]]] = {
  val ll = channel.chunk.map {
    case null          => LazyList.empty[Future[Either[String, Int]]] // No more values
    case error: String => Future(Left(error)) #:: LazyList.empty[Future[Either[String, Int]]]
    case data: Int     => Future(Right(data)) #:: lazyList(channel)
  }
  Await.result(ll, Duration.Inf)
}

val ll = lazyList(Channel(0, 1, "error"))
// Retrieving value: 0
ll(0)
// (no output since value 0 has already been calculated and memoized)
ll(1)
// Retrieving value: 1
ll(2)
// Retrieving value: error

我想看到的是:

val ll2 = lazyList2(Channel(0, 1, "error"))
// (no computation)
ll2(0)
// Retrieving value: 0
ll2(1)
// Retrieving value: 1
ll2(2)
// Retrieving value: error

如果您使用的是 fs2,则可以从该频道构建一个流。给定一个函数

def nextChunk: Future[A] = ???

您可以使用

构建流
val myStream: Stream[IO, A] = Stream.eval(IO.fromFuture(IO(nextChunk))).repeat

在您的具体示例中,您的 AAny,您知道在运行时是 IntStringnull。您可以先将其提升为 Option[Either[String, Int]]

def typedChunk(channel: Channel): IO[Option[Either[String, Int]]] = 
  IO.fromFuture(IO(channel.nextChunk)).map {
    case null      => None
    case s: String => Some(Left(s))
    case i: Int    => Some(Right(i))
  }

然后您可以构建流,终止于 None

def myTerminatedStream(channel: Channel): Stream[IO, Either[String, Int]] = 
  Stream.eval(typedChunk(channel)).repeat.unNoneTerminate

这完成了保持引用透明并确保它具有适合您的正确求值语义的所有艰苦工作。

你用 LazyList 请求的语义会很棘手:你只会在 Future 完成评估后才知道你的块是空的,所以你需要评估 Future 才能知道你的列表是否为空。 LazyList 能够做到这一点,但只能通过阻塞操作,而不是通过 Future。