Custom synchronization using java.util.concurrent with cats.effect
I have a requirement for a very custom, non-trivial piece of synchronization that can be implemented with a fair ReentrantLock and a Phaser. It does not seem possible to implement it on top of fs2 and cats.effect (without non-trivial customization). Since all blocking operations need to be wrapped in a Blocker, the code looks like this:
import java.util.concurrent.locks.{Condition, ReentrantLock}

import cats.effect.{Blocker, Sync} // Blocker is the cats-effect 2 API
import cats.effect.syntax.all._

private val l: ReentrantLock = new ReentrantLock(true)
private val c: Condition = l.newCondition
private val blocker: Blocker = //...

// F is declared on the class level
def lockedMutex(conditionPredicate: Int => Boolean): F[Unit] = blocker.blockOn {
  // Acquire the lock, wait on the condition until the predicate holds, then release the lock.
  Sync[F].delay(l.lock()).bracket(_ => Sync[F].delay {
    while (!conditionPredicate(2)) {
      c.await()
    }
  })(_ => Sync[F].delay(l.unlock()))
}
Question:
Is it guaranteed that the code containing c.await() will be executed on the same Thread that acquires/releases the ReentrantLock?
This is the crucial part, because if not, an IllegalMonitorStateException will be thrown.
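For background, Condition.await() may only be called by a thread that currently holds the associated lock; otherwise it throws IllegalMonitorStateException immediately. A minimal standalone sketch of that failure mode (illustrative only, not part of the original question; the object name is made up):

import java.util.concurrent.locks.ReentrantLock

object AwaitWithoutLock extends App {
  val lock = new ReentrantLock(true)
  val cond = lock.newCondition()

  // The current thread never acquired `lock`, so await() fails right away.
  try cond.await()
  catch { case e: IllegalMonitorStateException => println(s"Caught: $e") }
}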
When using something like cats-effect you really don't need to worry about threads; instead, you can describe your problem at a higher level.
This should give you the behavior you want: it will keep running high-priority jobs until there are no more of them before picking a low-priority one. After finishing a low-priority job, each fiber will first check whether more high-priority work has arrived before trying to pick another low-priority job:
import cats.effect.Async
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._

import scala.concurrent.ExecutionContext

object HighLowPriorityRunner {
  final case class Config[F[_]](
      highPriorityJobs: Queue[F, F[Unit]],
      lowPriorityJobs: Queue[F, F[Unit]],
      customEC: Option[ExecutionContext]
  )

  def apply[F[_]](config: Config[F])
                 (implicit F: Async[F]): F[Unit] = {
    // Run one high-priority job if available, otherwise one low-priority job.
    val processOneJob =
      config.highPriorityJobs.tryTake.flatMap {
        case Some(hpJob) => hpJob
        case None => config.lowPriorityJobs.tryTake.flatMap {
          case Some(lpJob) => lpJob
          case None => F.unit
        }
      }

    // Forever: fork a fiber that processes (at most) one job.
    val loop: F[Unit] = processOneJob.start.foreverM

    // If a custom ExecutionContext was provided, pin the loop to it.
    config.customEC.fold(ifEmpty = loop)(ec => loop.evalOn(ec))
  }
}
You can use customEC to provide your own ExecutionContext and thereby control the number of real threads that run your fibers.
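As a minimal sketch (the object name here is just for illustration), a fixed-size pool turned into an ExecutionContext is enough to cap the number of OS threads; the full example below wires the same idea into the runner:

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext

object CustomECSketch {
  // At most two OS threads will ever run fibers scheduled on this context.
  private val pool = Executors.newFixedThreadPool(2)
  val customEC: ExecutionContext = ExecutionContext.fromExecutor(pool)
  // Some(customEC) pins the runner loop to this pool via evalOn;
  // None leaves it on the runtime's default compute pool.
}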
The code can be used like this:
import cats.effect.{Async, IO, IOApp, Resource}
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._

import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object Main extends IOApp.Simple {
  // Create a fixed pool of two threads as a Resource so it is shut down when the program ends.
  override final val run: IO[Unit] =
    Resource.make(IO(Executors.newFixedThreadPool(2)))(ec => IO.blocking(ec.shutdown())).use { ec =>
      Program[IO](ExecutionContext.fromExecutor(ec))
    }
}

object Program {
  // A job that logs the thread it starts on, blocks for a second, and logs completion.
  private def createJob[F[_]](id: Int)(implicit F: Async[F]): F[Unit] =
    F.delay(println(s"Starting job ${id} on thread ${Thread.currentThread.getName}")) *>
    F.delay(Thread.sleep(1.second.toMillis)) *> // Blocks the Fiber! - Only for testing, use F.sleep on real code.
    F.delay(println(s"Finished job ${id}!"))

  def apply[F[_]](customEC: ExecutionContext)(implicit F: Async[F]): F[Unit] = for {
    highPriorityJobs <- Queue.unbounded[F, F[Unit]]
    lowPriorityJobs <- Queue.unbounded[F, F[Unit]]
    runnerFiber <- HighLowPriorityRunner(HighLowPriorityRunner.Config(
      highPriorityJobs,
      lowPriorityJobs,
      Some(customEC)
    )).start
    _ <- List.range(0, 10).traverse_(id => highPriorityJobs.offer(createJob(id)))
    _ <- List.range(10, 15).traverse_(id => lowPriorityJobs.offer(createJob(id)))
    _ <- F.sleep(5.seconds)
    _ <- List.range(15, 20).traverse_(id => highPriorityJobs.offer(createJob(id)))
    _ <- runnerFiber.join.void
  } yield ()
}
Which should produce an output similar to this:
Starting job 0 on thread pool-1-thread-1
Starting job 1 on thread pool-1-thread-2
Finished job 0!
Finished job 1!
Starting job 2 on thread pool-1-thread-1
Starting job 3 on thread pool-1-thread-2
Finished job 2!
Finished job 3!
Starting job 4 on thread pool-1-thread-1
Starting job 5 on thread pool-1-thread-2
Finished job 4!
Finished job 5!
Starting job 6 on thread pool-1-thread-1
Starting job 7 on thread pool-1-thread-2
Finished job 6!
Finished job 7!
Starting job 8 on thread pool-1-thread-1
Starting job 9 on thread pool-1-thread-2
Finished job 8!
Finished job 9!
Starting job 10 on thread pool-1-thread-1
Starting job 11 on thread pool-1-thread-2
Finished job 10!
Finished job 11!
Starting job 15 on thread pool-1-thread-1
Starting job 16 on thread pool-1-thread-2
Finished job 15!
Finished job 16!
Starting job 17 on thread pool-1-thread-1
Starting job 18 on thread pool-1-thread-2
Finished job 17!
Finished job 18!
Starting job 19 on thread pool-1-thread-1
Starting job 12 on thread pool-1-thread-2
Finished job 19!
Starting job 13 on thread pool-1-thread-1
Finished job 12!
Starting job 14 on thread pool-1-thread-2
Finished job 13!
Finished job 14!
Thanks to Gavin Bisesi (@Daenyth) for refining my original idea into this!
The full code is available here.