远程数据的异步迭代
Asynchronous Iterable over remote data
我从远程 API 中提取了一些数据,为此我使用了 Future 风格的界面。数据结构为链表。相关示例数据容器如下所示。
case class Data(information: Int) {
def hasNext: Boolean = ??? // Implemented
def next: Future[Data] = ??? // Implemented
}
现在我有兴趣向数据 class 添加一些功能,例如 map
、foreach
、reduce
等。为此我想实现某种形式的 IterableLike
以便继承这些方法。
下面给出的是特征 Data
可以扩展,这样它就得到了这个 属性.
trait AsyncIterable[+T]
extends IterableLike[Future[T], AsyncIterable[T]]
{
def hasNext : Boolean
def next : Future[T]
// How to implement?
override def iterator: Iterator[Future[T]] = ???
override protected[this] def newBuilder: mutable.Builder[Future[T], AsyncIterable[T]] = ???
override def seq: TraversableOnce[Future[T]] = ???
}
它应该是一个非阻塞的实现,当它被执行时,开始从远程数据源请求下一个数据。
然后就可以做一些很酷的事情,比如
case class Data(information: Int) extends AsyncIterable[Data]
val data = Data(1) // And more, of course
// Asynchronously print all the information.
data.foreach(data => println(data.information))
界面不同也是可以接受的。但结果应该以某种方式表示对集合的异步迭代。最好以开发人员熟悉的方式进行,因为它将成为(开源)库的一部分。
在生产中我会使用以下之一:
对于私人测试,我将实施类似于以下的内容。
(解释如下)
我稍微修改了一下你的Data
:
abstract class AsyncIterator[T] extends Iterator[Future[T]] {
def hasNext: Boolean
def next(): Future[T]
}
为此我们可以实现这个Iterable
:
class AsyncIterable[T](sourceIterator: AsyncIterator[T])
extends IterableLike[Future[T], AsyncIterable[T]]
{
private def stream(): Stream[Future[T]] =
if(sourceIterator.hasNext) {sourceIterator.next #:: stream()} else {Stream.empty}
val asStream = stream()
override def iterator = asStream.iterator
override def seq = asStream.seq
override protected[this] def newBuilder = throw new UnsupportedOperationException()
}
如果使用以下代码看到它的实际效果:
object Example extends App {
val source = "Hello World!";
val iterator1 = new DelayedIterator[Char](100L, source.toCharArray)
new AsyncIterable(iterator1).foreach(_.foreach(print)) //prints 1 char per 100 ms
pause(2000L)
val iterator2 = new DelayedIterator[String](100L, source.toCharArray.map(_.toString))
new AsyncIterable(iterator2).reduceLeft((fl: Future[String], fr) =>
for(l <- fl; r <- fr) yield {println(s"$l+$r"); l + r}) //prints 1 line per 100 ms
pause(2000L)
def pause(duration: Long) = {println("->"); Thread.sleep(duration); println("\n<-")}
}
class DelayedIterator[T](delay: Long, data: Seq[T]) extends AsyncIterator[T] {
private val dataIterator = data.iterator
private var nextTime = System.currentTimeMillis() + delay
override def hasNext = dataIterator.hasNext
override def next = {
val thisTime = math.max(System.currentTimeMillis(), nextTime)
val thisValue = dataIterator.next()
nextTime = thisTime + delay
Future {
val now = System.currentTimeMillis()
if(thisTime > now) Thread.sleep(thisTime - now) //Your implementation will be better
thisValue
}
}
}
说明
AsyncIterable
使用 Stream 因为它是延迟计算的而且很简单。
优点:
- 简单
- 多次调用
iterator
和 seq
方法 return 与所有项目相同的可迭代对象。
缺点:
- 可能会导致内存溢出,因为流会保留所有先前获得的值。
- 创建
AsyncIterable
期间急切获取第一个值
DelayedIterator
是 AsyncIterator 的非常简单的实现,不要怪我这里的代码又快又脏。
看到同步hasNext
和异步next()
我还是很奇怪
我使用 Twitter Spool 实现了一个工作示例。
为了实施 spool
我修改了 documentation.
中的示例
import com.twitter.concurrent.Spool
import com.twitter.util.{Await, Return, Promise}
import scala.concurrent.{ExecutionContext, Future}
trait AsyncIterable[+T <: AsyncIterable[T]] { self : T =>
def hasNext : Boolean
def next : Future[T]
def spool(implicit ec: ExecutionContext) : Spool[T] = {
def fill(currentPage: Future[T], rest: Promise[Spool[T]]) {
currentPage foreach { cPage =>
if(hasNext) {
val nextSpool = new Promise[Spool[T]]
rest() = Return(cPage *:: nextSpool)
fill(next, nextSpool)
} else {
val emptySpool = new Promise[Spool[T]]
emptySpool() = Return(Spool.empty[T])
rest() = Return(cPage *:: emptySpool)
}
}
}
val rest = new Promise[Spool[T]]
if(hasNext) {
fill(next, rest)
} else {
rest() = Return(Spool.empty[T])
}
self *:: rest
}
}
数据和以前一样,现在我们可以使用了。
// Cool stuff
implicit val ec = scala.concurrent.ExecutionContext.global
val data = Data(1) // And others
// Print all the information asynchronously
val fut = data.spool.foreach(data => println(data.information))
Await.ready(fut)
它会在第二个元素上抛出异常,因为没有提供 next
的实现。
我从远程 API 中提取了一些数据,为此我使用了 Future 风格的界面。数据结构为链表。相关示例数据容器如下所示。
case class Data(information: Int) {
def hasNext: Boolean = ??? // Implemented
def next: Future[Data] = ??? // Implemented
}
现在我有兴趣向数据 class 添加一些功能,例如 map
、foreach
、reduce
等。为此我想实现某种形式的 IterableLike
以便继承这些方法。
下面给出的是特征 Data
可以扩展,这样它就得到了这个 属性.
trait AsyncIterable[+T]
extends IterableLike[Future[T], AsyncIterable[T]]
{
def hasNext : Boolean
def next : Future[T]
// How to implement?
override def iterator: Iterator[Future[T]] = ???
override protected[this] def newBuilder: mutable.Builder[Future[T], AsyncIterable[T]] = ???
override def seq: TraversableOnce[Future[T]] = ???
}
它应该是一个非阻塞的实现,当它被执行时,开始从远程数据源请求下一个数据。 然后就可以做一些很酷的事情,比如
case class Data(information: Int) extends AsyncIterable[Data]
val data = Data(1) // And more, of course
// Asynchronously print all the information.
data.foreach(data => println(data.information))
界面不同也是可以接受的。但结果应该以某种方式表示对集合的异步迭代。最好以开发人员熟悉的方式进行,因为它将成为(开源)库的一部分。
在生产中我会使用以下之一:
对于私人测试,我将实施类似于以下的内容。 (解释如下)
我稍微修改了一下你的Data
:
abstract class AsyncIterator[T] extends Iterator[Future[T]] {
def hasNext: Boolean
def next(): Future[T]
}
为此我们可以实现这个Iterable
:
class AsyncIterable[T](sourceIterator: AsyncIterator[T])
extends IterableLike[Future[T], AsyncIterable[T]]
{
private def stream(): Stream[Future[T]] =
if(sourceIterator.hasNext) {sourceIterator.next #:: stream()} else {Stream.empty}
val asStream = stream()
override def iterator = asStream.iterator
override def seq = asStream.seq
override protected[this] def newBuilder = throw new UnsupportedOperationException()
}
如果使用以下代码看到它的实际效果:
object Example extends App {
val source = "Hello World!";
val iterator1 = new DelayedIterator[Char](100L, source.toCharArray)
new AsyncIterable(iterator1).foreach(_.foreach(print)) //prints 1 char per 100 ms
pause(2000L)
val iterator2 = new DelayedIterator[String](100L, source.toCharArray.map(_.toString))
new AsyncIterable(iterator2).reduceLeft((fl: Future[String], fr) =>
for(l <- fl; r <- fr) yield {println(s"$l+$r"); l + r}) //prints 1 line per 100 ms
pause(2000L)
def pause(duration: Long) = {println("->"); Thread.sleep(duration); println("\n<-")}
}
class DelayedIterator[T](delay: Long, data: Seq[T]) extends AsyncIterator[T] {
private val dataIterator = data.iterator
private var nextTime = System.currentTimeMillis() + delay
override def hasNext = dataIterator.hasNext
override def next = {
val thisTime = math.max(System.currentTimeMillis(), nextTime)
val thisValue = dataIterator.next()
nextTime = thisTime + delay
Future {
val now = System.currentTimeMillis()
if(thisTime > now) Thread.sleep(thisTime - now) //Your implementation will be better
thisValue
}
}
}
说明
AsyncIterable
使用 Stream 因为它是延迟计算的而且很简单。
优点:
- 简单
- 多次调用
iterator
和seq
方法 return 与所有项目相同的可迭代对象。
缺点:
- 可能会导致内存溢出,因为流会保留所有先前获得的值。
- 创建
AsyncIterable
期间急切获取第一个值
DelayedIterator
是 AsyncIterator 的非常简单的实现,不要怪我这里的代码又快又脏。
看到同步hasNext
和异步next()
我使用 Twitter Spool 实现了一个工作示例。
为了实施 spool
我修改了 documentation.
import com.twitter.concurrent.Spool
import com.twitter.util.{Await, Return, Promise}
import scala.concurrent.{ExecutionContext, Future}
trait AsyncIterable[+T <: AsyncIterable[T]] { self : T =>
def hasNext : Boolean
def next : Future[T]
def spool(implicit ec: ExecutionContext) : Spool[T] = {
def fill(currentPage: Future[T], rest: Promise[Spool[T]]) {
currentPage foreach { cPage =>
if(hasNext) {
val nextSpool = new Promise[Spool[T]]
rest() = Return(cPage *:: nextSpool)
fill(next, nextSpool)
} else {
val emptySpool = new Promise[Spool[T]]
emptySpool() = Return(Spool.empty[T])
rest() = Return(cPage *:: emptySpool)
}
}
}
val rest = new Promise[Spool[T]]
if(hasNext) {
fill(next, rest)
} else {
rest() = Return(Spool.empty[T])
}
self *:: rest
}
}
数据和以前一样,现在我们可以使用了。
// Cool stuff
implicit val ec = scala.concurrent.ExecutionContext.global
val data = Data(1) // And others
// Print all the information asynchronously
val fut = data.spool.foreach(data => println(data.information))
Await.ready(fut)
它会在第二个元素上抛出异常,因为没有提供 next
的实现。