How to execute asynchronously inside an Arrow + Reactor monad comprehension
In the code below, each helloX() method runs asynchronously (each is a deferred Mono that runs on a separate thread; see the full code below):
override fun helloEverybody(): Kind<ForMonoK, String> {
    return MonoK.monad().fx.monad {
        val j = !helloJoey()
        val j2 = !helloJohn()
        val j3 = !helloMary()
        "$j and $j2 and $j3"
    }.fix()
}
However, in the logs I see that they run sequentially:
14:10:46.983 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework
14:10:47.084 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJoey()
14:10:49.087 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJoey() - ready
14:10:49.090 [elastic-3] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJohn()
14:10:54.091 [elastic-3] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloJohn() - ready
14:10:54.092 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloMary()
14:10:59.095 [elastic-2] INFO com.codependent.kotlinarrow.service.HelloServiceImpl - helloMary() - ready
hello Joey and hello John and hello Mary
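How can I make them run in parallel and, once they have all completed, combine their results inside the monad comprehension?
Full code, including the main method: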
class HelloServiceImpl : HelloService<ForMonoK> {
    private val logger = LoggerFactory.getLogger(javaClass)

    override fun helloEverybody(): Kind<ForMonoK, String> {
        return MonoK.monad().fx.monad {
            val j = !helloJoey()
            val j2 = !helloJohn()
            val j3 = !helloMary()
            "$j and $j2 and $j3"
        }.fix()
    }

    override fun helloJoey(): Kind<ForMonoK, String> {
        return Mono.defer {
            logger.info("helloJoey()")
            sleep(2000)
            logger.info("helloJoey() - ready")
            Mono.just("hello Joey")
        }.subscribeOn(Schedulers.elastic()).k()
    }

    override fun helloJohn(): Kind<ForMonoK, String> {
        return Mono.defer {
            logger.info("helloJohn()")
            sleep(5000)
            logger.info("helloJohn() - ready")
            Mono.just("hello John")
        }.subscribeOn(Schedulers.elastic()).k()
    }

    override fun helloMary(): Kind<ForMonoK, String> {
        return Mono.defer {
            logger.info("helloMary()")
            sleep(5000)
            logger.info("helloMary() - ready")
            Mono.just("hello Mary")
        }.subscribeOn(Schedulers.elastic()).k()
    }
}

fun main() {
    val countDownLatch = CountDownLatch(1)
    HelloServiceImpl().helloEverybody().fix().mono.subscribe {
        println(it)
        countDownLatch.countDown()
    }
    countDownLatch.await()
}
UPDATE
I have adapted the approach to combine a sequential operation with parallel ones:
override fun helloEverybody(): Kind<ForMonoK, String> {
    return MonoK.async().fx.async {
        val j = helloJoey().bind()
        val j2 = Dispatchers.IO
            .parMapN(helloJohn(), helloMary()) { it1, it2 -> "$it1 and $it2" }
        "$j and $j2"
    }
}
Unfortunately, parMapN cannot be used with ForMonoK:
Type inference failed: fun <A, B, C, D> CoroutineContext.parMapN(fa: Kind<ForIO, A>, fb: Kind<ForIO, B>, fc: Kind<ForIO, C>, f: (A, B, C) -> D): IO<D>
cannot be applied to
receiver: CoroutineDispatcher arguments: (Kind<ForMonoK, String>,Kind<ForMonoK, String>,Kind<ForMonoK, String>,(String, String, String) -> String)
Any ideas?
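flatMap, like map, has no threading semantics or parallelism. What you are after is parMap and parTraverse, which run multiple MonoK in parallel.
At that point the fx block is no longer essential, since it is designed for sequential operations. You can mix and match both: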
MonoK.async().fx.async {
    val result =
        Dispatchers.IO
            .parMap(helloJoey(), helloMary()) { joe, mary -> ... }
            .bind()
    otherThing(result).bind()
}
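For comparison, the same sequential-versus-parallel difference can be sketched with plain Reactor, without Arrow. This is only an illustration under stated assumptions: the hello helper and the shortened delays are hypothetical stand-ins for the helloX() methods above, and Mono.zip is swapped in for parMap because it is Reactor's own combinator for subscribing to several sources at once.

```kotlin
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

// Hypothetical stand-in for the helloX() methods: blocks for delayMs on a worker thread.
// Schedulers.elastic() matches the question; newer Reactor releases replace it
// with Schedulers.boundedElastic().
fun hello(name: String, delayMs: Long): Mono<String> =
    Mono.fromCallable {
        Thread.sleep(delayMs)
        "hello $name"
    }.subscribeOn(Schedulers.elastic())

// Roughly what a sequential comprehension amounts to: a chain of flatMap calls,
// where each Mono is subscribed only after the previous one has emitted.
fun helloSequential(): Mono<String> =
    hello("Joey", 200).flatMap { j ->
        hello("John", 500).flatMap { j2 ->
            hello("Mary", 500).map { j3 -> "$j and $j2 and $j3" }
        }
    }

// Mono.zip subscribes to all three sources up front, so the blocking work
// overlaps on separate worker threads and the results are combined at the end.
fun helloParallel(): Mono<String> =
    Mono.zip(hello("Joey", 200), hello("John", 500), hello("Mary", 500))
        .map { t -> "${t.t1} and ${t.t2} and ${t.t3}" }

fun main() {
    val start = System.currentTimeMillis()
    println(helloSequential().block())
    val mid = System.currentTimeMillis()
    println(helloParallel().block())
    val end = System.currentTimeMillis()
    println("sequential: ${mid - start} ms, parallel: ${end - mid} ms")
}
```

Both functions produce the same string; only the elapsed time differs, because the zipped version waits for the slowest source rather than for the sum of all three. A zipped Mono like this could be wrapped with .k() and bound inside a comprehension in the same way as the individual helloX() results.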