具有 Mongo 集合的 Monix 任务:错误处理
Monix Task with Mongo Collection: Error handling
我正在尝试将 Monix Task
与 mongo-scala-driver
一起使用。我有点难以理解 Error Handling
val mongoClient: Resource[Task, MongoConnection[Task, DomainModel]] =
MongoTypedConnection.create[Task, DomainModel](
"mongodb:...&authMechanism=SCRAM-SHA-1"
)
mongoClient.use { client =>
val changeStream: Task[ChangeStreamObservable[DomainModel]] =
for {
collection <- client.getMongoCollection("myDatabase", "myCollection")
changes <- client.watchCollection(collection)
} yield changes
...
...
...
.as(ExitCode.Success)
}
在没有错误的情况下,这非常有效。我想为此添加错误处理(例如处理不正确的 database
和 collection
名称)。我基于文档的初步尝试是尝试:
val changeObs: io.Serializable =
Await.result(changeStream
.onErrorHandleWith {
case _: TimeoutException =>
// Oh, we know about timeouts, recover it
Task.now("Recovered!")
case other =>
// We have no idea what happened, raise error!
Task.raiseError(other)
}.runToFuture, 5.seconds)
但这给了我一个io.Serializable
。如何在保留 ChangeStreamObservable[DomainModel]
的同时进行某种巧妙的错误处理?感谢指出我可以研究的任何模式。
BR
原来是我看错了。
Task[ChangeStreamObservable[DomainModel]]
已经有一个 MonadError
。对于像我这样的菜鸟来说,这本质上意味着它不会丢失错误。所以这可以在代码库的最后完成:
changeStream //Or any other Task/Observable which (is composed)composes (from)this Task
.onErrorHandle {
case timeout: MongoTimeoutException =>
logger.error(timeout.getMessage)
case illegal: java.lang.IllegalArgumentException =>
logger.error(illegal.getMessage)
case unauthorized: com.mongodb.MongoCommandException =>
logger.error(unauthorized.getMessage)
我试图 运行 任务只是为了处理代码库中间的错误,认为如果我组合多个 Tasks/Observable,我会丢失初始错误。
我正在尝试将 Monix Task
与 mongo-scala-driver
一起使用。我有点难以理解 Error Handling
val mongoClient: Resource[Task, MongoConnection[Task, DomainModel]] =
MongoTypedConnection.create[Task, DomainModel](
"mongodb:...&authMechanism=SCRAM-SHA-1"
)
mongoClient.use { client =>
val changeStream: Task[ChangeStreamObservable[DomainModel]] =
for {
collection <- client.getMongoCollection("myDatabase", "myCollection")
changes <- client.watchCollection(collection)
} yield changes
...
...
...
.as(ExitCode.Success)
}
在没有错误的情况下,这非常有效。我想为此添加错误处理(例如处理不正确的 database
和 collection
名称)。我基于文档的初步尝试是尝试:
val changeObs: io.Serializable =
Await.result(changeStream
.onErrorHandleWith {
case _: TimeoutException =>
// Oh, we know about timeouts, recover it
Task.now("Recovered!")
case other =>
// We have no idea what happened, raise error!
Task.raiseError(other)
}.runToFuture, 5.seconds)
但这给了我一个io.Serializable
。如何在保留 ChangeStreamObservable[DomainModel]
的同时进行某种巧妙的错误处理?感谢指出我可以研究的任何模式。
BR
原来是我看错了。
Task[ChangeStreamObservable[DomainModel]]
已经有一个 MonadError
。对于像我这样的菜鸟来说,这本质上意味着它不会丢失错误。所以这可以在代码库的最后完成:
changeStream //Or any other Task/Observable which (is composed)composes (from)this Task
.onErrorHandle {
case timeout: MongoTimeoutException =>
logger.error(timeout.getMessage)
case illegal: java.lang.IllegalArgumentException =>
logger.error(illegal.getMessage)
case unauthorized: com.mongodb.MongoCommandException =>
logger.error(unauthorized.getMessage)
我试图 运行 任务只是为了处理代码库中间的错误,认为如果我组合多个 Tasks/Observable,我会丢失初始错误。