具有状态的功能性侦听器

Functional listener with state

假设在我的纯 Scala 程序中我依赖于 Java 服务。 此 Java 服务接受一个侦听器,以便在某些数据更改时通知我。

假设数据是元组 (x, y) 并且 java 服务会在 X 或 Y 发生变化时调用侦听器,但我只对 X 感兴趣。

为此,我的监听器必须保存 X 的最后一个值,并且仅在 oldX != X 时转发 update/call,因此为了实现这一点,我不纯的 scala 监听器实现必须保存一个 var oldX

val listener = new JavaServiceListener() {

 var oldX;
 def updated(val x, val y): Unit = {
  if (oldX != x) {
     oldX = x
    //do stuff
  }
}

javaService.register(listener)

如果没有 val 或可变集合,我将如何在 Scala 中为这种东西设计一个包装器?我不能在 JavaServiceListener 级别,因为我受方法签名的约束,所以我需要另一个层,java 侦听器以某种方式转发到

首先,如果你设计的是纯函数式程序,你不能returnUnitFuture[Unit]也不行,因为Future不抑制副作用)。

如果性能不是问题,我会使用 Kleisli[Option, xType, IO[Unit]],其中 T = Option。所以你要做的第一件事就是定义(添加适当的类型)

def updated(oldX, x): Kleisli[Option, xType, xType] = Kleisli liftF {
   if(x != oldX) None
   else Some(x)
}

def doStuff(x, y): Kleisli[Option, xType, IO[Unit]] = Kleisli pure {
    IO{
       //doStuff
    }
}

现在您可以将它们组合成这样的理解方式:

val result: Kleisli[Option, xType, IO[Unit]] = for{
   xx <- updated(oldX, x)
   effect <- doStuff(xx, y)
} yield effect

您可以使用 ReaderWriterStateT 执行状态计算,因此您将 oldX 保持为状态。

我的偏好是将其包装在 Monix Observable 中,然后您可以使用 distinctUntilChanged 来消除连续的重复项。类似于:

import monix.reactive._

val observable = Observable.create(OverflowStrategy.Fail(10)){(sync) =>
    val listener = new JavaServiceListener() {
      def updated(val x, val y): Unit = {
        sync.onNext(x)
      }
    }

    javaService.register(listener)
    Cancelable{() => javaService.unregister(listener)}
  }

val distinctObservable = observable.distinctUntilChanged

响应式编程允许您使用纯模型,而库处理所有困难的事情。

我找到了我喜欢的 Cats and Cats-Effect 解决方案:

trait MyListener {
  def onChange(n: Int): Unit
}

class MyDistinctFunctionalListener(private val last: Ref[IO, Int], consumer: Int => Unit) extends MyListener {
  override def onChange(newValue: Int): Unit = {
    val program =
      last
        .getAndSet(newValue)
        .flatMap(oldValue => notify(newValue, oldValue))

    program.unsafeRunSync()
  }

  private def notify(newValue: Int, oldValue: Int): IO[Unit] = {
    if (oldValue != newValue) IO(consumer(newValue)) else IO.delay(println("found duplicate"))
  }
}

object MyDistinctFunctionalListener {
  def create(consumer: Int => Unit): IO[MyDistinctFunctionalListener] =
    Ref[IO].of(0).map(v => new MyDistinctFunctionalListener(v, consumer))
}

val printer: Int => Unit = println(_)

val functionalDistinctPrinterIO =  MyDistinctFunctionalListener.create(printer)

functionalDistinctPrinterIO.map(fl =>
  List(1, 1, 2, 2, 3, 3, 3, 4, 5, 5).foreach(fl.onChange)
).unsafeRunSync()

这里有更多关于处理共享状态的内容https://github.com/systemfw/scala-italy-2018

与私有 var 解决方案相比,这是否值得值得商榷