Custom synchronization using java.util.concurrent with cats.effect

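I have a fairly custom, non-trivial synchronization requirement that can be implemented with a fair ReentrantLock and a Phaser. Implementing it on top of fs2 and cats.effect does not seem to be possible without significant customization.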

Since every blocking operation has to be wrapped in a Blocker, the code looks like this:

private val l: ReentrantLock = new ReentrantLock(true)
private val c: Condition = l.newCondition
private val blocker: Blocker = //...

//F is declared on the class level
def lockedMutex(conditionPredicate: Int => Boolean): F[Unit] = blocker.blockOn {
  Sync[F].delay(l.lock()).bracket(_ => Sync[F].delay{
    while(!conditionPredicate(2)){
      c.await()
    }
  })(_ => Sync[F].delay(l.unlock()))
}

Question: is the code containing c.await() guaranteed to run on the same Thread that acquires and releases the ReentrantLock?

This is the critical part, because if it is not, an IllegalMonitorStateException will be thrown.
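To make the concern concrete, here is a minimal sketch that uses only java.util.concurrent (no cats-effect). It shows the JDK behavior the question is worried about: waiting on a Condition from a thread that does not own the lock fails. The object and method names (LockThreadAffinityDemo, lockThenAwait) are hypothetical and exist only for this illustration; the sketch says nothing about how a Blocker schedules work.

```scala
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
import java.util.concurrent.locks.ReentrantLock

// Hypothetical demo object, not part of the code in the question.
object LockThreadAffinityDemo {
  /** Locks on thread A, then waits on the condition from A or from B.
    * Returns "ok" if the wait completed, otherwise the simple name of the exception it threw.
    */
  def lockThenAwait(sameThread: Boolean): String = {
    val lock = new ReentrantLock(true) // fair lock, as in the question
    val cond = lock.newCondition()
    val threadA = Executors.newSingleThreadExecutor()
    val threadB = Executors.newSingleThreadExecutor()

    def task(body: => Unit): Callable[Unit] =
      new Callable[Unit] { def call(): Unit = body }

    try {
      // Thread A becomes the owner of the lock.
      threadA.submit(task(lock.lock())).get()
      val waiter = if (sameThread) threadA else threadB
      try {
        // Timed wait so the demo terminates even though nobody signals.
        waiter.submit(task { cond.await(10L, TimeUnit.MILLISECONDS); () }).get()
        "ok"
      } catch {
        // The executor wraps the task's failure in an ExecutionException.
        case e: ExecutionException => e.getCause.getClass.getSimpleName
      }
    } finally {
      threadA.shutdownNow()
      threadB.shutdownNow()
    }
  }
}
```

Calling lockThenAwait(sameThread = true) completes normally, while lockThenAwait(sameThread = false) reports IllegalMonitorStateException, which is why the thread on which each step runs matters for this kind of code.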

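When you use something like cats-effect, you really don't need to worry about threads; instead, you can describe your problem at a higher level.

This should give you the behavior you want: it runs high-priority jobs until there are none left, and only then picks low-priority jobs. After finishing a low-priority job, each fiber first checks whether there are more high-priority jobs before trying to pick another low-priority one: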

import cats.effect.Async
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._

import scala.concurrent.ExecutionContext

object HighLowPriorityRunner {
  final case class Config[F[_]](
      highPriorityJobs: Queue[F, F[Unit]],
      lowPriorityJobs: Queue[F, F[Unit]],
      customEC: Option[ExecutionContext]
  )

  def apply[F[_]](config: Config[F])
                 (implicit F: Async[F]): F[Unit] = {
    val processOneJob =
      config.highPriorityJobs.tryTake.flatMap {
        case Some(hpJob) => hpJob
        case None => config.lowPriorityJobs.tryTake.flatMap {
          case Some(lpJob) => lpJob
          case None => F.unit
        }
      }

    val loop: F[Unit] = processOneJob.start.foreverM

    config.customEC.fold(ifEmpty = loop)(ec => loop.evalOn(ec))
  }
}

You can use customEC to supply your own ExecutionContext and so control the number of real threads that run your fibers.

The code can be used like this:
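As a rough illustration of how a fixed-size pool bounds the number of real threads, the sketch below uses plain scala.concurrent.Future rather than cats-effect fibers, so it is an analogy under that assumption and not the runner itself. FixedPoolDemo and threadsUsed are hypothetical names introduced only for this example.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical demo object: plain Futures stand in for fibers here.
object FixedPoolDemo {
  /** Runs `tasks` short tasks on a pool of `poolSize` threads and
    * returns the distinct names of the threads that executed them.
    */
  def threadsUsed(poolSize: Int, tasks: Int): Set[String] = {
    val pool = Executors.newFixedThreadPool(poolSize)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    try {
      val names = Future.traverse((1 to tasks).toList) { _ =>
        Future {
          Thread.sleep(20) // simulate a little work
          Thread.currentThread.getName
        }
      }
      Await.result(names, 10.seconds).toSet
    } finally pool.shutdown()
  }
}
```

However many tasks are submitted, threadsUsed(2, 8) never contains more than two thread names, because the pool behind the ExecutionContext only ever has two threads.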

import cats.effect.{Async, IO, IOApp, Resource}
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object Main extends IOApp.Simple {
  override final val run: IO[Unit] =
    Resource.make(IO(Executors.newFixedThreadPool(2)))(ec => IO.blocking(ec.shutdown())).use { ec =>
      Program[IO](ExecutionContext.fromExecutor(ec))
    }
}

object Program {
  private def createJob[F[_]](id: Int)(implicit F: Async[F]): F[Unit] =
    F.delay(println(s"Starting job ${id} on thread ${Thread.currentThread.getName}")) *>
    F.delay(Thread.sleep(1.second.toMillis)) *> // Blocks the Fiber! - Only for testing, use F.sleep on real code.
    F.delay(println(s"Finished job ${id}!"))

  def apply[F[_]](customEC: ExecutionContext)(implicit F: Async[F]): F[Unit] = for {
    highPriorityJobs <- Queue.unbounded[F, F[Unit]]
    lowPriorityJobs <- Queue.unbounded[F, F[Unit]]
    runnerFiber <- HighLowPriorityRunner(HighLowPriorityRunner.Config(
      highPriorityJobs,
      lowPriorityJobs,
      Some(customEC)
    )).start
    _ <- List.range(0, 10).traverse_(id => highPriorityJobs.offer(createJob(id)))
    _ <- List.range(10, 15).traverse_(id => lowPriorityJobs.offer(createJob(id)))
    _ <- F.sleep(5.seconds)
    _ <- List.range(15, 20).traverse_(id => highPriorityJobs.offer(createJob(id)))
    _ <- runnerFiber.join.void
  } yield ()
}

It should produce output like the following:

Starting job 0 on thread pool-1-thread-1
Starting job 1 on thread pool-1-thread-2
Finished job 0!
Finished job 1!
Starting job 2 on thread pool-1-thread-1
Starting job 3 on thread pool-1-thread-2
Finished job 2!
Finished job 3!
Starting job 4 on thread pool-1-thread-1
Starting job 5 on thread pool-1-thread-2
Finished job 4!
Finished job 5!
Starting job 6 on thread pool-1-thread-1
Starting job 7 on thread pool-1-thread-2
Finished job 6!
Finished job 7!
Starting job 8 on thread pool-1-thread-1
Starting job 9 on thread pool-1-thread-2
Finished job 8!
Finished job 9!
Starting job 10 on thread pool-1-thread-1
Starting job 11 on thread pool-1-thread-2
Finished job 10!
Finished job 11!
Starting job 15 on thread pool-1-thread-1
Starting job 16 on thread pool-1-thread-2
Finished job 15!
Finished job 16!
Starting job 17 on thread pool-1-thread-1
Starting job 18 on thread pool-1-thread-2
Finished job 17!
Finished job 18!
Starting job 19 on thread pool-1-thread-1
Starting job 12 on thread pool-1-thread-2
Finished job 19!
Starting job 13 on thread pool-1-thread-1
Finished job 12!
Starting job 14 on thread pool-1-thread-2
Finished job 13!
Finished job 14!
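In the output above, the late high-priority jobs 15 to 19 are picked before the remaining low-priority jobs 12 to 14. That follows from the selection rule alone, which can be modeled without any effects or threads. The sketch below is a simplified, single-worker model on immutable queues of job ids; PriorityPick, pick and drain are hypothetical names for this illustration and are not part of HighLowPriorityRunner.

```scala
import scala.collection.immutable.Queue

// Hypothetical pure model of the "high priority first" selection rule.
object PriorityPick {
  /** Picks the next job id: high priority first, low priority only when
    * the high-priority queue is empty. Returns the job and the remaining queues.
    */
  def pick(high: Queue[Int], low: Queue[Int]): Option[(Int, Queue[Int], Queue[Int])] =
    high.dequeueOption match {
      case Some((job, rest)) => Some((job, rest, low))
      case None              => low.dequeueOption.map { case (job, rest) => (job, high, rest) }
    }

  /** Drains both queues with a single worker and returns the pick order. */
  def drain(high: Queue[Int], low: Queue[Int]): List[Int] =
    pick(high, low) match {
      case Some((job, h, l)) => job :: drain(h, l)
      case None              => Nil
    }
}
```

For example, PriorityPick.drain(Queue(0, 1, 2), Queue(10, 11)) returns List(0, 1, 2, 10, 11), and PriorityPick.pick(Queue(15), Queue(13)) selects job 15 even though job 13 was queued earlier.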

Thanks to Gavin Bisesi (@Daenyth) for refining my original idea into this!


The full code is available here.