在 Haskell 中,如何在 Web 客户端断开连接时中止计算
In Haskell, how can I abort a calculation when a web client disconnects
我有一个基于 Haskell 的 Web 服务,它执行的计算对于某些输入可能需要很长时间才能完成。 ("really long"这里是指超过一分钟)
因为执行该计算需要服务器上所有可用的 CPU,所以我将传入的请求放入队列中(好吧,实际上是一个堆栈,原因与典型的客户端有关,但除此之外点)当他们到达并在当前 运行 计算完成时为他们提供服务。
我的问题是客户端并不总是等待足够长的时间,有时会在他们端超时、断开连接并尝试使用不同的服务器(好吧,他们再次尝试并遇到了 elb,通常会得到不同的服务器实例)。此外,有时网络客户端要求的计算会因为外部因素而过时,网络客户端将被终止。
在那些情况下,我真的希望能够在我从堆栈中提取下一个请求并开始(昂贵的)计算之前检测到 Web 客户端已经消失。不幸的是,我在 snap 方面的经验让我相信在该框架中没有办法询问 "is the client's TCP connection still connected?" 并且我还没有找到涵盖 "client disconnected" 案例的其他 Web 框架的任何文档。
那么有没有Haskell web 框架可以很容易地检测到web 客户端是否已断开连接?或者做不到这一点,有没有至少可以做到这一点的方法?
(我理解可能无法在所有情况下都绝对确定TCP客户端是否还没有向另一端发送数据;但是,当客户端实际向服务器发送RST数据包时,服务器的框架不允许应用程序代码确定连接已断开,这是个问题)
顺便说一句,虽然有人可能怀疑 warp's onClose
处理程序会让你这样做,但只有当响应准备好并写入客户端时才会触发,因此作为一种中止方式是无用的正在进行的计算。似乎也没有办法访问接受的套接字以设置 SO_KEEPALIVE
或类似的。 (有方法可以访问初始侦听套接字,但不能访问接受的套接字)
假设 'web service' 表示基于 HTTP(S) 的客户端,一种选择是使用 RESTful 方法。服务可以接受请求并 return 202 Accepted
,而不是假设客户端将保持连接。正如 HTTP status code specification 概述:
The request has been accepted for processing, but the processing has not been completed [...]
The 202 response is intentionally non-committal. Its purpose is to allow a server to accept a request for some other process (perhaps a batch-oriented process that is only run once per day) without requiring that the user agent's connection to the server persist until the process is completed. The entity returned with this response SHOULD include an indication of the request's current status and either a pointer to a status monitor or some estimate of when the user can expect the request to be fulfilled.
服务器立即响应 202 Accepted
响应,还包括一个 URL 客户端可以用来轮询状态。一种选择是将此 URL 放在响应的 Location
header 中,但您也可以将 URL 放在响应的 body 的 link 中].
客户端可以轮询状态 URL 来获取状态。计算完成后,状态资源可以为完成的结果提供 link。
如果您担心客户端轮询过于频繁,您可以将缓存 header 添加到状态资源和最终结果。
REST in Practice outlines the general concepts, while the RESTful Web Services Cookbook 有很多很好的细节。
我不是说你不能用 HTTP 或 TCP/IP 做某事(我不知道),但如果你不能,那么上面是 tried-and-true 类似问题的解决方法。
显然,这完全独立于编程语言,但根据我的经验,REST and algebraic data types go well together。
所以我找到了一个对我有用的答案,它可能对其他人也有用。
事实证明,您实际上可以充分利用 Warp 的内部结构来执行此操作,但是您剩下的是 Warp 的基本版本,如果您需要诸如日志记录之类的东西,需要在上面添加其他包。
此外,请注意所谓的 "half-closed" 连接(当客户端关闭其发送端,但仍在等待数据时)将被检测为已关闭,从而中断您的计算。我不知道有任何处理半关闭连接的 HTTP 客户端,但需要注意一些事情。
无论如何,我所做的是首先复制 Network.Wai.Handler.Warp
和 Network.Wai.Handler.Warp.Internal
暴露的函数 runSettings
和 runSettingsSocket
并制作调用我提供的函数的版本而不是WarpI.socketConnection
,这样我就有签名了:
runSettings' :: Warp.Settings -> (Socket -> IO (IO WarpI.Connection))
-> Wai.Application -> IO ()
这需要复制一些辅助方法,例如 setSocketCloseOnExec
和 windowsThreadBlockHack
。 double-IO
签名可能看起来很奇怪,但这就是你想要的 - 外部 IO
在主线程(调用 accept
)中是 运行,内部 IO
在 accept
returns 之后分叉的每个连接线程中是 运行。原来的Warp
函数runSettings
等价于:
\set -> runSettings' set (WarpI.socketConnection >=> return . return)
然后我做了:
data ClientDisappeared = ClientDisappeared deriving (Show, Eq, Enum, Ord)
instance Exception ClientDisappeared
runSettingsSignalDisconnect :: Warp.Settings -> Wai.Application -> IO ()
runSettingsSignalDisconnect set =
runSettings' set (WarpI.socketConnection >=> return . wrapConn)
where
-- Fork a 'monitor' thread that does nothing but attempt to
-- perform a read from conn in a loop 1/sec, and wrap the receive
-- methods on conn so that they first consume from the stuff read
-- by the monitoring thread. If the monitoring thread sees
-- end-of-file (signaled by an empty string read), raise
-- ClientDisappered on the per-connection thread.
wrapConn conn = do
tid <- myThreadId
nxtBstr <- newEmptyMVar :: IO (MVar ByteString)
semaphore <- newMVar ()
readerCount <- newIORef (0 :: Int)
monitorThread <- forkIO (monitor tid nxtBstr semaphore readerCount)
return $ conn {
WarpI.connClose = throwTo monitorThread ClientDisappeared
>> WarpI.connClose conn
, WarpI.connRecv = newRecv nxtBstr semaphore readerCount
, WarpI.connRecvBuf = newRecvBuf nxtBstr semaphore readerCount
}
where
newRecv :: MVar ByteString -> MVar () -> IORef Int
-> IO ByteString
newRecv nxtBstr sem readerCount =
bracket_
(atomicModifyIORef' readerCount $ \x -> (succ x, ()))
(atomicModifyIORef' readerCount $ \x -> (pred x, ()))
(withMVar sem $ \_ -> do w <- tryTakeMVar nxtBstr
case w of
Just w' -> return w'
Nothing -> WarpI.connRecv conn
)
newRecvBuf :: MVar ByteString -> MVar () -> IORef Int
-> WarpI.Buffer -> WarpI.BufSize -> IO Bool
newRecvBuf nxtBstr sem readerCount buf bufSize =
bracket_
(atomicModifyIORef' readerCount $ \x -> (succ x, ()))
(atomicModifyIORef' readerCount $ \x -> (pred x, ()))
(withMVar sem $ \_ -> do
(fulfilled, buf', bufSize') <-
if bufSize == 0 then return (False, buf, bufSize)
else
do w <- tryTakeMVar nxtBstr
case w of
Nothing -> return (False, buf, bufSize)
Just w' -> do
let wlen = B.length w'
if wlen > bufSize
then do BU.unsafeUseAsCString w' $ \cw' ->
copyBytes buf (castPtr cw') bufSize
putMVar nxtBstr (B.drop bufSize w')
return (True, buf, 0)
else do BU.unsafeUseAsCString w' $ \cw' ->
copyBytes buf (castPtr cw') wlen
return (wlen == bufSize, plusPtr buf wlen,
bufSize - wlen)
if fulfilled then return True
else WarpI.connRecvBuf conn buf' bufSize'
)
dropClientDisappeared :: ClientDisappeared -> IO ()
dropClientDisappeared _ = return ()
monitor tid nxtBstr sem st =
catch (monitor' tid nxtBstr sem st) dropClientDisappeared
monitor' tid nxtBstr sem st = do
(hitEOF, readerCount) <- withMVar sem $ \_ -> do
w <- tryTakeMVar nxtBstr
case w of
-- No one picked up our bytestring from last time
Just w' -> putMVar nxtBstr w' >> return (False, 0)
Nothing -> do
w <- WarpI.connRecv conn
putMVar nxtBstr w
readerCount <- readIORef st
return (B.null w, readerCount)
if hitEOF && (readerCount == 0)
-- Don't signal if main thread is also trying to read -
-- in that case, main thread will see EOF directly
then throwTo tid ClientDisappeared
else do threadDelay oneSecondInMicros
monitor' tid nxtBstr sem st
oneSecondInMicros = 1000000
我有一个基于 Haskell 的 Web 服务,它执行的计算对于某些输入可能需要很长时间才能完成。 ("really long"这里是指超过一分钟)
因为执行该计算需要服务器上所有可用的 CPU,所以我将传入的请求放入队列中(好吧,实际上是一个堆栈,原因与典型的客户端有关,但除此之外点)当他们到达并在当前 运行 计算完成时为他们提供服务。
我的问题是客户端并不总是等待足够长的时间,有时会在他们端超时、断开连接并尝试使用不同的服务器(好吧,他们再次尝试并遇到了 elb,通常会得到不同的服务器实例)。此外,有时网络客户端要求的计算会因为外部因素而过时,网络客户端将被终止。
在那些情况下,我真的希望能够在我从堆栈中提取下一个请求并开始(昂贵的)计算之前检测到 Web 客户端已经消失。不幸的是,我在 snap 方面的经验让我相信在该框架中没有办法询问 "is the client's TCP connection still connected?" 并且我还没有找到涵盖 "client disconnected" 案例的其他 Web 框架的任何文档。
那么有没有Haskell web 框架可以很容易地检测到web 客户端是否已断开连接?或者做不到这一点,有没有至少可以做到这一点的方法?
(我理解可能无法在所有情况下都绝对确定TCP客户端是否还没有向另一端发送数据;但是,当客户端实际向服务器发送RST数据包时,服务器的框架不允许应用程序代码确定连接已断开,这是个问题)
顺便说一句,虽然有人可能怀疑 warp's onClose
处理程序会让你这样做,但只有当响应准备好并写入客户端时才会触发,因此作为一种中止方式是无用的正在进行的计算。似乎也没有办法访问接受的套接字以设置 SO_KEEPALIVE
或类似的。 (有方法可以访问初始侦听套接字,但不能访问接受的套接字)
假设 'web service' 表示基于 HTTP(S) 的客户端,一种选择是使用 RESTful 方法。服务可以接受请求并 return 202 Accepted
,而不是假设客户端将保持连接。正如 HTTP status code specification 概述:
The request has been accepted for processing, but the processing has not been completed [...]
The 202 response is intentionally non-committal. Its purpose is to allow a server to accept a request for some other process (perhaps a batch-oriented process that is only run once per day) without requiring that the user agent's connection to the server persist until the process is completed. The entity returned with this response SHOULD include an indication of the request's current status and either a pointer to a status monitor or some estimate of when the user can expect the request to be fulfilled.
服务器立即响应 202 Accepted
响应,还包括一个 URL 客户端可以用来轮询状态。一种选择是将此 URL 放在响应的 Location
header 中,但您也可以将 URL 放在响应的 body 的 link 中].
客户端可以轮询状态 URL 来获取状态。计算完成后,状态资源可以为完成的结果提供 link。
如果您担心客户端轮询过于频繁,您可以将缓存 header 添加到状态资源和最终结果。
REST in Practice outlines the general concepts, while the RESTful Web Services Cookbook 有很多很好的细节。
我不是说你不能用 HTTP 或 TCP/IP 做某事(我不知道),但如果你不能,那么上面是 tried-and-true 类似问题的解决方法。
显然,这完全独立于编程语言,但根据我的经验,REST and algebraic data types go well together。
所以我找到了一个对我有用的答案,它可能对其他人也有用。
事实证明,您实际上可以充分利用 Warp 的内部结构来执行此操作,但是您剩下的是 Warp 的基本版本,如果您需要诸如日志记录之类的东西,需要在上面添加其他包。
此外,请注意所谓的 "half-closed" 连接(当客户端关闭其发送端,但仍在等待数据时)将被检测为已关闭,从而中断您的计算。我不知道有任何处理半关闭连接的 HTTP 客户端,但需要注意一些事情。
无论如何,我所做的是首先复制 Network.Wai.Handler.Warp
和 Network.Wai.Handler.Warp.Internal
暴露的函数 runSettings
和 runSettingsSocket
并制作调用我提供的函数的版本而不是WarpI.socketConnection
,这样我就有签名了:
runSettings' :: Warp.Settings -> (Socket -> IO (IO WarpI.Connection))
-> Wai.Application -> IO ()
这需要复制一些辅助方法,例如 setSocketCloseOnExec
和 windowsThreadBlockHack
。 double-IO
签名可能看起来很奇怪,但这就是你想要的 - 外部 IO
在主线程(调用 accept
)中是 运行,内部 IO
在 accept
returns 之后分叉的每个连接线程中是 运行。原来的Warp
函数runSettings
等价于:
\set -> runSettings' set (WarpI.socketConnection >=> return . return)
然后我做了:
data ClientDisappeared = ClientDisappeared deriving (Show, Eq, Enum, Ord)
instance Exception ClientDisappeared
runSettingsSignalDisconnect :: Warp.Settings -> Wai.Application -> IO ()
runSettingsSignalDisconnect set =
runSettings' set (WarpI.socketConnection >=> return . wrapConn)
where
-- Fork a 'monitor' thread that does nothing but attempt to
-- perform a read from conn in a loop 1/sec, and wrap the receive
-- methods on conn so that they first consume from the stuff read
-- by the monitoring thread. If the monitoring thread sees
-- end-of-file (signaled by an empty string read), raise
-- ClientDisappered on the per-connection thread.
wrapConn conn = do
tid <- myThreadId
nxtBstr <- newEmptyMVar :: IO (MVar ByteString)
semaphore <- newMVar ()
readerCount <- newIORef (0 :: Int)
monitorThread <- forkIO (monitor tid nxtBstr semaphore readerCount)
return $ conn {
WarpI.connClose = throwTo monitorThread ClientDisappeared
>> WarpI.connClose conn
, WarpI.connRecv = newRecv nxtBstr semaphore readerCount
, WarpI.connRecvBuf = newRecvBuf nxtBstr semaphore readerCount
}
where
newRecv :: MVar ByteString -> MVar () -> IORef Int
-> IO ByteString
newRecv nxtBstr sem readerCount =
bracket_
(atomicModifyIORef' readerCount $ \x -> (succ x, ()))
(atomicModifyIORef' readerCount $ \x -> (pred x, ()))
(withMVar sem $ \_ -> do w <- tryTakeMVar nxtBstr
case w of
Just w' -> return w'
Nothing -> WarpI.connRecv conn
)
newRecvBuf :: MVar ByteString -> MVar () -> IORef Int
-> WarpI.Buffer -> WarpI.BufSize -> IO Bool
newRecvBuf nxtBstr sem readerCount buf bufSize =
bracket_
(atomicModifyIORef' readerCount $ \x -> (succ x, ()))
(atomicModifyIORef' readerCount $ \x -> (pred x, ()))
(withMVar sem $ \_ -> do
(fulfilled, buf', bufSize') <-
if bufSize == 0 then return (False, buf, bufSize)
else
do w <- tryTakeMVar nxtBstr
case w of
Nothing -> return (False, buf, bufSize)
Just w' -> do
let wlen = B.length w'
if wlen > bufSize
then do BU.unsafeUseAsCString w' $ \cw' ->
copyBytes buf (castPtr cw') bufSize
putMVar nxtBstr (B.drop bufSize w')
return (True, buf, 0)
else do BU.unsafeUseAsCString w' $ \cw' ->
copyBytes buf (castPtr cw') wlen
return (wlen == bufSize, plusPtr buf wlen,
bufSize - wlen)
if fulfilled then return True
else WarpI.connRecvBuf conn buf' bufSize'
)
dropClientDisappeared :: ClientDisappeared -> IO ()
dropClientDisappeared _ = return ()
monitor tid nxtBstr sem st =
catch (monitor' tid nxtBstr sem st) dropClientDisappeared
monitor' tid nxtBstr sem st = do
(hitEOF, readerCount) <- withMVar sem $ \_ -> do
w <- tryTakeMVar nxtBstr
case w of
-- No one picked up our bytestring from last time
Just w' -> putMVar nxtBstr w' >> return (False, 0)
Nothing -> do
w <- WarpI.connRecv conn
putMVar nxtBstr w
readerCount <- readIORef st
return (B.null w, readerCount)
if hitEOF && (readerCount == 0)
-- Don't signal if main thread is also trying to read -
-- in that case, main thread will see EOF directly
then throwTo tid ClientDisappeared
else do threadDelay oneSecondInMicros
monitor' tid nxtBstr sem st
oneSecondInMicros = 1000000