FS2 的对象池模式
Object pool pattern with FS2
我正在尝试了解实施 Object Pool pattern in FS2 的最佳方法是什么。
假设我们有以下 MyPrinter
定义:
class MyPrinter {
import scala.util.Random.nextInt
Thread.sleep(5000 + nextInt(1000))
def doStuff(s: String): Unit = {
println(s)
Thread.sleep(1000 + nextInt(1000))
}
def releaseResources(): Unit =
println("Releasing resources")
}
制作 Stream[Task, MyPrinter]
由 n
台打印机支持的最佳方法是什么?当流结束时,应通过调用 releaseResources
.
正确释放所有底层资源
奖金问题:如果打印机由于某种原因终止,是否可以在池中创建一个新打印机?
不确定我是否回答了这个问题,但是这个怎么样
implicit val S = Strategy.fromFixedDaemonPool(10, "pooling")
val queue = new LinkedBlockingDeque[MyPrinter]()
queue.add(new MyPrinter)
queue.add(new MyPrinter)
Stream.repeatEval(Task.delay(queue.take()))
.map(p => try p.doStuff("test") finally {
p.releaseResources()
queue.put(p)
})
.take(10)
.runLog
.unsafeRun()
队列可以替换为https://commons.apache.org/proper/commons-pool/
更新:
如果您想同时处理每个 "resource":
concurrent.join(10)(
Stream
.repeatEval(Task.delay(queue.take()))
.map(p => Stream.eval(Task.delay(p.doStuff("test"))
.map(_ => p /* done with this resource */)))
).map(p => { p.releaseResources(); queue.put(p) /* release resource */})
.take(10).runLog.unsafeRun()
我正在尝试了解实施 Object Pool pattern in FS2 的最佳方法是什么。
假设我们有以下 MyPrinter
定义:
class MyPrinter {
import scala.util.Random.nextInt
Thread.sleep(5000 + nextInt(1000))
def doStuff(s: String): Unit = {
println(s)
Thread.sleep(1000 + nextInt(1000))
}
def releaseResources(): Unit =
println("Releasing resources")
}
制作 Stream[Task, MyPrinter]
由 n
台打印机支持的最佳方法是什么?当流结束时,应通过调用 releaseResources
.
奖金问题:如果打印机由于某种原因终止,是否可以在池中创建一个新打印机?
不确定我是否回答了这个问题,但是这个怎么样
implicit val S = Strategy.fromFixedDaemonPool(10, "pooling")
val queue = new LinkedBlockingDeque[MyPrinter]()
queue.add(new MyPrinter)
queue.add(new MyPrinter)
Stream.repeatEval(Task.delay(queue.take()))
.map(p => try p.doStuff("test") finally {
p.releaseResources()
queue.put(p)
})
.take(10)
.runLog
.unsafeRun()
队列可以替换为https://commons.apache.org/proper/commons-pool/
更新:
如果您想同时处理每个 "resource":
concurrent.join(10)(
Stream
.repeatEval(Task.delay(queue.take()))
.map(p => Stream.eval(Task.delay(p.doStuff("test"))
.map(_ => p /* done with this resource */)))
).map(p => { p.releaseResources(); queue.put(p) /* release resource */})
.take(10).runLog.unsafeRun()