以非阻塞和懒惰的方式用 Futures 构建一个 LazyList
Building a LazyList with Futures in a non-blocking and lazy way
我正在为 Clojure 通道构建一个 Scala 门面来传递数据块,我想将其表示为 LazyList[Future[Either[String, Int]]]
,其中左侧可以保存错误消息,右侧可以保存数据。从 Channel 中检索每个块是一个阻塞操作,因此我想将每个块封装在一个 Future 中。
每个块结果类型决定了我们应该如何继续构建惰性列表:
- null: 频道上没有更多结果,return list
- 字符串:添加
Left(error)
和return列表
- Int:添加
Right(data)
并递归下一个块
我的问题是我们是否可以以惰性和非阻塞的方式构建这样的列表?
这是我到目前为止的想法,但是头部被评估了(不是懒惰的)并且 Await.result 块:
// Clojure "Channel" dummy
case class Channel(vs: Any*) {
private val it = vs.toIterable.iterator
// equivalent to the `<!!` Clojure function
def chunk: Future[Any] = Future {
// This imitates an expensive blocking operation
if (it.hasNext) {
val value = it.next
println("Retrieving value: " + value)
value
} else {
null
}
}
}
def lazyList(channel: Channel): LazyList[Future[Either[String, Int]]] = {
val ll = channel.chunk.map {
case null => LazyList.empty[Future[Either[String, Int]]] // No more values
case error: String => Future(Left(error)) #:: LazyList.empty[Future[Either[String, Int]]]
case data: Int => Future(Right(data)) #:: lazyList(channel)
}
Await.result(ll, Duration.Inf)
}
val ll = lazyList(Channel(0, 1, "error"))
// Retrieving value: 0
ll(0)
// (no output since value 0 has already been calculated and memoized)
ll(1)
// Retrieving value: 1
ll(2)
// Retrieving value: error
我想看到的是:
val ll2 = lazyList2(Channel(0, 1, "error"))
// (no computation)
ll2(0)
// Retrieving value: 0
ll2(1)
// Retrieving value: 1
ll2(2)
// Retrieving value: error
如果您使用的是 fs2,则可以从该频道构建一个流。给定一个函数
def nextChunk: Future[A] = ???
您可以使用
构建流
val myStream: Stream[IO, A] = Stream.eval(IO.fromFuture(IO(nextChunk))).repeat
在您的具体示例中,您的 A
是 Any
,您知道在运行时是 Int
、String
或 null
。您可以先将其提升为 Option[Either[String, Int]]
和
def typedChunk(channel: Channel): IO[Option[Either[String, Int]]] =
IO.fromFuture(IO(channel.nextChunk)).map {
case null => None
case s: String => Some(Left(s))
case i: Int => Some(Right(i))
}
然后您可以构建流,终止于 None
和
def myTerminatedStream(channel: Channel): Stream[IO, Either[String, Int]] =
Stream.eval(typedChunk(channel)).repeat.unNoneTerminate
这完成了保持引用透明并确保它具有适合您的正确求值语义的所有艰苦工作。
你用 LazyList 请求的语义会很棘手:你只会在 Future 完成评估后才知道你的块是空的,所以你需要评估 Future 才能知道你的列表是否为空。 LazyList 能够做到这一点,但只能通过阻塞操作,而不是通过 Future。
我正在为 Clojure 通道构建一个 Scala 门面来传递数据块,我想将其表示为 LazyList[Future[Either[String, Int]]]
,其中左侧可以保存错误消息,右侧可以保存数据。从 Channel 中检索每个块是一个阻塞操作,因此我想将每个块封装在一个 Future 中。
每个块结果类型决定了我们应该如何继续构建惰性列表:
- null: 频道上没有更多结果,return list
- 字符串:添加
Left(error)
和return列表 - Int:添加
Right(data)
并递归下一个块
我的问题是我们是否可以以惰性和非阻塞的方式构建这样的列表?
这是我到目前为止的想法,但是头部被评估了(不是懒惰的)并且 Await.result 块:
// Clojure "Channel" dummy
case class Channel(vs: Any*) {
private val it = vs.toIterable.iterator
// equivalent to the `<!!` Clojure function
def chunk: Future[Any] = Future {
// This imitates an expensive blocking operation
if (it.hasNext) {
val value = it.next
println("Retrieving value: " + value)
value
} else {
null
}
}
}
def lazyList(channel: Channel): LazyList[Future[Either[String, Int]]] = {
val ll = channel.chunk.map {
case null => LazyList.empty[Future[Either[String, Int]]] // No more values
case error: String => Future(Left(error)) #:: LazyList.empty[Future[Either[String, Int]]]
case data: Int => Future(Right(data)) #:: lazyList(channel)
}
Await.result(ll, Duration.Inf)
}
val ll = lazyList(Channel(0, 1, "error"))
// Retrieving value: 0
ll(0)
// (no output since value 0 has already been calculated and memoized)
ll(1)
// Retrieving value: 1
ll(2)
// Retrieving value: error
我想看到的是:
val ll2 = lazyList2(Channel(0, 1, "error"))
// (no computation)
ll2(0)
// Retrieving value: 0
ll2(1)
// Retrieving value: 1
ll2(2)
// Retrieving value: error
如果您使用的是 fs2,则可以从该频道构建一个流。给定一个函数
def nextChunk: Future[A] = ???
您可以使用
构建流val myStream: Stream[IO, A] = Stream.eval(IO.fromFuture(IO(nextChunk))).repeat
在您的具体示例中,您的 A
是 Any
,您知道在运行时是 Int
、String
或 null
。您可以先将其提升为 Option[Either[String, Int]]
和
def typedChunk(channel: Channel): IO[Option[Either[String, Int]]] =
IO.fromFuture(IO(channel.nextChunk)).map {
case null => None
case s: String => Some(Left(s))
case i: Int => Some(Right(i))
}
然后您可以构建流,终止于 None
和
def myTerminatedStream(channel: Channel): Stream[IO, Either[String, Int]] =
Stream.eval(typedChunk(channel)).repeat.unNoneTerminate
这完成了保持引用透明并确保它具有适合您的正确求值语义的所有艰苦工作。
你用 LazyList 请求的语义会很棘手:你只会在 Future 完成评估后才知道你的块是空的,所以你需要评估 Future 才能知道你的列表是否为空。 LazyList 能够做到这一点,但只能通过阻塞操作,而不是通过 Future。