在没有 var/mutable 集合的情况下在 Scala 中定期获取数据的方法

Way to fetch data periodically in Scala without var/mutable collection

我想定期从某个来源获取数据,每小时一次。我是这样做的,因为取数据很费时间,大约10分钟。所以,我缓存了这个数据。

我现在有这样的代码:

import java.util._

object Loader {
    @volatile private var map: Map[SomeKey, SomeValue] = Map()

    def start() {
        val timer = new Timer()
        val timerTask = new TimerTask {
            override def run() {
                reload()
            }
        }
        val oneHour = 60 * 60 * 1000
        timer.schedule(timerTask, oneHour)
    }

    def reload() {
        map = loadMap()
    }

    // this method invocation costs a lot, so, I cache it in reload()
    def loadMap(): Map[SomeKey, SomeValue] = ...

    def getValue(key: SomeKey): Option[SomeValue] = map.get(key)
}

此外,我的 main() 函数中有 Loader.start() 调用。

这很好用,但我想知道,是否有一些方法可以以更实用的方式编写它:摆脱 var 而不仅仅是使用可变集合?

组合 IO 库 scalaz-stream 成功地在此类用例中使代码远离可变性。一、依赖项:

libraryDependencies ++= Seq(
  "org.scalaz" %% "scalaz-core" % "7.2.8",
  "org.scalaz" %% "scalaz-concurrent" % "7.2.8",
  "org.scalaz.stream" %% "scalaz-stream" % "0.8.6a"
)

我们从 scalaz-concurrentscalaz-stream 的一些导入开始:

import java.util.concurrent.ScheduledExecutorService
import scala.concurrent.duration._
import scalaz.concurrent.Task
import scalaz.stream.time._
import scalaz.stream._
import scalaz.stream.Process.Env
import scalaz.stream.ReceiveY._

假设我们有一个能够提取其快照的命令式数据源。为了演示,它还会在每次提取时更新自身:

trait DataSource[Key, Value] {

  def loadMap: Map[Key, Value]
}

object DataSourceStub extends DataSource[Int, String] {

  private var externalSource: Map[Int, String]  = Map(1 -> "a")

  def loadMap: Map[Int, String] = {
    val snapshot = externalSource
    val key = snapshot.keys.max
    val value = snapshot(key)
    val (newKey, newValue) = (key + 1) -> (value + "a")
    val newSource = snapshot + (newKey -> newValue)
    externalSource = newSource
    snapshot
  }
}

现在,我们通过引入 timer 来启动 Loader 实施,它在启动时立即发出一个单元事件,然后每 refreshEvery 秒发出一次。然后,可以通过将数据接收 Task 映射到每个事件并在流中评估它们来获得报告我们 cacheStates 的事件流。困难的部分是交换:我们需要将 Request 的流(使用缓存中的数据执行某些操作的函数)与我们的定期快照流交织在一起。 scalaz-stream 提供了一个流交换工具,Wye,它允许我们声明我们将以何种顺序处理来自输入流的事件。我们需要一个初始缓存快照才能使用,因此我们从 wye.receiveL 开始,移动到具有初始缓存状态的 handleImpl。现在,我们可以通过 receiveBoth 接收任何事件:

  • 如果是缓存快照更新,我们会在不产生输出的情况下重复它;
  • 如果它是一个请求,我们给它当前的缓存状态并将结果 Task 发送到输出中,也在当前状态上重复出现;
  • 如果其中一个输入流终止,我们将停止处理。

唯一剩下的事情就是将我们的输入与 handle Wye 结合起来,并将处理任务的副作用包含到流中,我们在 processRequests.

中这样做
class Loader[Key, Value](dataSource: DataSource[Key, Value], refreshEvery: Duration) {

  type CacheState = Map[Key, Value]
  type Request = CacheState => Task[Unit]
  type ReaderEnv = Env[CacheState, Request]

  implicit val scheduler: ScheduledExecutorService = DefaultScheduler

  private val timer: Process[Task, Unit] =
    Process.emit(()) ++ awakeEvery(refreshEvery).map(_ => ())

  private val cacheStates: Process[Task, CacheState] =
    timer.evalMap(_ => Task(dataSource.loadMap))

  private val handle: Wye[CacheState, Request, Task[Unit]] = {
    def handleImpl(current: CacheState): Wye[CacheState, Request, Task[Unit]] = {
      import wye._
      import Process._
      receiveBoth {
        case ReceiveL(i) => handleImpl(i)
        case ReceiveR(i) => emit(i(current)) ++ handleImpl(current)
        case HaltOne(rsn) => Halt(rsn)
      }
    }
    wye.receiveL[CacheState, Request, Task[Unit]](handleImpl)
  }

  def processRequests(requests: Process[Task, Request]): Process[Task, Unit] =
    cacheStates.wye(requests)(handle).eval
}

让我们通过每秒执行刷新的同时按最大 ID(每个 100 毫秒)向其发出 100 个数据请求来测试我们的数据加载器:

object TestStreamBatching {

  private val loader = new Loader(DataSourceStub, 1.second)

  private def request(cache: loader.CacheState): Task[Unit] = Task {
    Thread.sleep(100)
    val key = cache.keys.max
    val value = cache(key)
    println(value)
  }

  private val requests: Process[Task, loader.Request] =
    Process.unfold(100)(s => if(s > 0) Some((request, s - 1)) else None)

  def main(args: Array[String]): Unit = {
    loader.processRequests(requests).run.unsafePerformSync
  }
}

通过 运行 它,你可以看到一个 'a' 字母的阶梯,它每秒增加其 riser 大小,最终在 100 个输出后终止。