我应该如何使用 Scotty (Haskell) 处理 long-running HTTP 请求?

How should I handle a long-running HTTP request, using Scotty (Haskell)?

我正在制作 a simple web app that looks for color words in a text, and plots statistics about them. You can test it at colors.jonreeve.com if it's not too busy. I'm using the Scotty 网络框架来处理网络内容。它适用于短文本,但较长的文本,如完整的小说,需要很长时间以致于浏览器通常会超时。所以我猜我在这里需要的是通过 Jquery AJAX 或其他方式发送表单,然后让服务器每隔一段时间发送 JSON 及其状态("now loading file," "now counting colors," 等)然后当它接收到 "success" 信号时,然后重定向到其他 URL?

这是我第一次尝试做这样的事情,所以如果这听起来很不了解,请原谅我。我还注意到那里有一些类似的问题,但我感觉 Scotty 处理事情的方式与大多数设置略有不同。我注意到有 a few functions 用于设置原始输出,设置 headers 等等。我是否尝试在分析的每个阶段发出某些信号?考虑到 Haskell 对 side-effects 的处理方式,我该怎么做?我什至在这里都在努力想出最好的方法。

我可能会设置一个接受 POST 请求的端点,而不是单个长 运行 GET 请求。 POST 会立即 return 并在响应正文中包含两个 link:

  • 一个 link 到表示任务结果的新资源,该资源不会立即可用。在那之前,对结果的 GET 请求可以 return 409 (Conflict).

  • 一个 link 到相关的、立即可用的资源,代表执行任务时发出的通知。

一旦客户端成功获取了任务结果资源,就可以删除它。那应该同时删除任务结果资源和关联的通知资源。

对于每个 POST 请求,您需要 spawn a background worker thread. You would also need a background thread for deleting task results that grew old (because the clients could be lazy and not invoke DELETE). These threads would communicate with MVars, TVars, channels or similar methods.

现在的问题是:如何最好地处理服务器发出的通知?有several options:

  • 只需定期从客户端轮询通知资源即可。缺点:可能HTTP请求较多,通知接收不及时
  • long polling。 GET 请求的 序列 保持打开状态 直到服务器想要发出一些通知,或直到超时。
  • server-sent events. wai-extra has support for this, but I don't know how to hook a raw wai Application 回到 Scotty。
  • websockets。虽然不确定如何与 Scotty 集成。

这是长轮询机制的服务器端框架。一些初步进口:

{-# LANGUAGE NumDecimals #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeApplications #-}
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (concurrently_) -- from async
import Control.Concurrent.STM -- from stm
import Control.Concurrent.STM.TMChan -- from stm-chans
import Control.Monad.IO.Class (liftIO)
import Data.Aeson (ToJSON) -- from aeson
import Data.Foldable (for_)
import Data.Text (Text) 
import Web.Scotty

这是主要代码。

main :: IO ()
main =
  do
    chan <- atomically $ newTMChan @Text
    concurrently_
      ( do
          for_
            ["starting", "working on it", "finishing"]
            ( \msg -> do
                threadDelay 10e6
                atomically $ writeTMChan chan msg
            )
          atomically $ closeTMChan chan
      )
      ( scotty 3000
          $ get "/notifications"
          $ do
            mmsg <- liftIO $ atomically $ readTMChan chan
            json $
              case mmsg of
                Nothing -> ["closed!"]
                Just msg -> [msg]
      )

有两个 concurrent threads. One feeds messages into a closeable channel at 10 second intervals,另一个运行 Scotty 服务器,每个 GET 调用都会挂起,直到新消息到达通道。

使用 curl 从 bash 对其进行测试,我们应该会看到一连串的消息:

bash$ for run in {1..4}; do curl -s localhost:3000/notifications ; done
["starting"]["working on it"]["finishing"]["closed!"]

为了进行比较,下面是基于 server-sent events. It uses yesod instead of scotty though, because Yesod offers a way to hook as a handler the wai-extra Application 管理事件的解决方案的框架。

Haskell代码

{-# LANGUAGE NumDecimals #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}

import Control.Concurrent (threadDelay)
import Control.Concurrent.Async (concurrently_) -- from async
import Control.Concurrent.STM -- from stm
import Control.Concurrent.STM.TMChan -- from stm-chans
import Control.Monad.IO.Class (liftIO)
import Data.Binary.Builder -- from binary
import Data.Foldable (for_)
import Network.Wai.EventSource -- from wai-extra
import Network.Wai.Middleware.AddHeaders -- from wai-extra
import Yesod -- from yesod

data HelloWorld = HelloWorld (TMChan ServerEvent)

mkYesod
  "HelloWorld"
  [parseRoutes|
/foo FooR GET
|]

instance Yesod HelloWorld

getFooR :: Handler ()
getFooR = do
  HelloWorld chan <- getYesod
  sendWaiApplication
    . addHeaders [("Access-Control-Allow-Origin", "*")]
    . eventStreamAppRaw
    $ \send flush ->
      let go = do
            mevent <- liftIO $ atomically $ readTMChan chan
            case mevent of
              Nothing -> do
                send CloseEvent
                flush
              Just event -> do
                send event
                flush
                go
       in go

main :: IO ()
main =
  do
    chan <- atomically $ newTMChan
    concurrently_
      ( do
          for_
            [ ServerEvent
                (Just (fromByteString "ev"))
                (Just (fromByteString "id1"))
                [fromByteString "payload1"],
              ServerEvent
                (Just (fromByteString "ev"))
                (Just (fromByteString "id2"))
                [fromByteString "payload2"],
              ServerEvent
                (Just (fromByteString "ev"))
                (Just (fromByteString "eof"))
                [fromByteString "payload3"]
            ]
            ( \msg -> do
                threadDelay 10e6
                atomically $ writeTMChan chan msg
            )
          atomically $ closeTMChan chan
      )
      ( warp 3000 (HelloWorld chan)
      )

还有一个小的空白页面来测试服务器发送的事件。消息出现在浏览器上 console:

<!DOCTYPE html>
<html lang="en">
<body>
</body>
<script>
    window.onload = function() {
        var source = new EventSource('http://localhost:3000/foo'); 
        source.onopen = function () { console.log('opened'); }; 
        source.onerror = function (e) { console.error(e); }; 
        source.addEventListener('ev', (e) => {
            console.log(e);
            if (e.lastEventId === 'eof') {
                source.close();
            }
        });
    }
</script>
</html>