如何在 scalaz 任务中关闭 AsyncHttpClient
How to close AsyncHttpClient in scalaz Task
我正在尝试将 AsyncHttpClient 和 Scalaz Task 组合在一起。通常,如果我使用 AsyncHttpClient,我可以调用 client.close 来停止客户端。
val asyncHttpClient = new AsyncHttpClient()
println(asyncHttpClient.prepareGet("http://www.google.com"))
asyncHttpClient.close()
所以电流将停止。但是,如果我将 api 调用包装到任务中。我不知道如何阻止它。
def get(s: String) = Task.async[Int](k => {
asyncHttpClient.prepareGet(s).execute(toHandler)
Thread.sleep(5000)
asyncHttpClient.closeAsynchronously()
} )
def toHandler[A] = new AsyncCompletionHandler[Response] {
def onCompleted(r: Response) = {
println("get response ", r.getResponseBody)
r
}
def onError(e: Throwable) = {
println("some error")
e
}
}
println(get("http://www.google.com").run)
当前进程仍为运行。我在想原因是 Task 和 AsynClient 都是异步的。我不知道我应该怎么做才能关闭它
非常感谢
问题是Task.async
接受了一个可以注册回调的函数。这有点令人困惑,类型也没有太大帮助,因为里面有那么多该死的 Unit
,但这里的意思是你想要更像这样的东西:
import com.ning.http.client._
import scalaz.syntax.either._
import scalaz.concurrent.Task
val asyncHttpClient = new AsyncHttpClient()
def get(s: String): Task[Response] = Task.async[Response](callback =>
asyncHttpClient.prepareGet(s).execute(
new AsyncCompletionHandler[Unit] {
def onCompleted(r: Response): Unit = callback(r.right)
def onError(e: Throwable): Unit = callback(e.left)
}
)
)
这并没有处理关闭客户端的问题——它只是为了展示一般的想法。您可以在处理程序中关闭客户端,但我建议更像这样:
import com.ning.http.client._
import scalaz.syntax.either._
import scalaz.concurrent.Task
def get(client: AsyncHttpClient)(s: String): Task[Response] =
Task.async[Response](callback =>
client.prepareGet(s).execute(
new AsyncCompletionHandler[Unit] {
def onCompleted(r: Response): Unit = callback(r.right)
def onError(e: Throwable): Unit = callback(e.left)
}
)
)
def initClient: Task[AsyncHttpClient] = Task(new AsyncHttpClient())
def closeClient(client: AsyncHttpClient): Task[Unit] = Task(client.close())
然后:
val res = for {
c <- initClient
r <- get(c)("http://www.google.com")
_ <- closeClient(c)
} yield r
res.unsafePerformAsync(
_.fold(
_ => println("some error"),
r => println("get response " + r.getResponseBody)
)
)
这避免了 closeAsynchronously
(无论如何似乎是 going away)。
我正在尝试将 AsyncHttpClient 和 Scalaz Task 组合在一起。通常,如果我使用 AsyncHttpClient,我可以调用 client.close 来停止客户端。
val asyncHttpClient = new AsyncHttpClient()
println(asyncHttpClient.prepareGet("http://www.google.com"))
asyncHttpClient.close()
所以电流将停止。但是,如果我将 api 调用包装到任务中。我不知道如何阻止它。
def get(s: String) = Task.async[Int](k => {
asyncHttpClient.prepareGet(s).execute(toHandler)
Thread.sleep(5000)
asyncHttpClient.closeAsynchronously()
} )
def toHandler[A] = new AsyncCompletionHandler[Response] {
def onCompleted(r: Response) = {
println("get response ", r.getResponseBody)
r
}
def onError(e: Throwable) = {
println("some error")
e
}
}
println(get("http://www.google.com").run)
当前进程仍为运行。我在想原因是 Task 和 AsynClient 都是异步的。我不知道我应该怎么做才能关闭它
非常感谢
问题是Task.async
接受了一个可以注册回调的函数。这有点令人困惑,类型也没有太大帮助,因为里面有那么多该死的 Unit
,但这里的意思是你想要更像这样的东西:
import com.ning.http.client._
import scalaz.syntax.either._
import scalaz.concurrent.Task
val asyncHttpClient = new AsyncHttpClient()
def get(s: String): Task[Response] = Task.async[Response](callback =>
asyncHttpClient.prepareGet(s).execute(
new AsyncCompletionHandler[Unit] {
def onCompleted(r: Response): Unit = callback(r.right)
def onError(e: Throwable): Unit = callback(e.left)
}
)
)
这并没有处理关闭客户端的问题——它只是为了展示一般的想法。您可以在处理程序中关闭客户端,但我建议更像这样:
import com.ning.http.client._
import scalaz.syntax.either._
import scalaz.concurrent.Task
def get(client: AsyncHttpClient)(s: String): Task[Response] =
Task.async[Response](callback =>
client.prepareGet(s).execute(
new AsyncCompletionHandler[Unit] {
def onCompleted(r: Response): Unit = callback(r.right)
def onError(e: Throwable): Unit = callback(e.left)
}
)
)
def initClient: Task[AsyncHttpClient] = Task(new AsyncHttpClient())
def closeClient(client: AsyncHttpClient): Task[Unit] = Task(client.close())
然后:
val res = for {
c <- initClient
r <- get(c)("http://www.google.com")
_ <- closeClient(c)
} yield r
res.unsafePerformAsync(
_.fold(
_ => println("some error"),
r => println("get response " + r.getResponseBody)
)
)
这避免了 closeAsynchronously
(无论如何似乎是 going away)。