异步和 TBqueue
Async and TBqueue
我有几千个无法同时处理的输出文件,因此我想要一个一次处理一大块 n 个文件的函数。所以我决定使用 TBQueue。
实现的想法是最初用 n 虚拟值填充队列,然后循环尝试读取队列中的下一个虚拟值。如果队列中有值,则会执行 IO 操作,并且当 IO 操作完成时,会将新值添加到队列中。否则 readTBQueue 将阻塞,直到其中一个进程完成(至少我希望如此)。
那么我的问题是:
1.当没有更多的文件需要处理时,主线程是否要等到children全部处理完?
2. 如果一个async crash 会怎样?虚拟值是否要写入队列?
processFiles :: Int -> [FilePath] -> (FilePath -> IO ()) -> IO ()
processFiles n fs fun = do
tbQ <- atomically $ newTBQueue n
atomically $ replicateM_ n $ writeTBQueue tbQ ()
loop fs tbQ
where loop :: [FilePath] -> TBQueue () -> IO ()
loop files queue | null files = return ()
| otherwise = do
join . atomically $ do
readTBQueue queue
let file = head files
return $ withAsync (fun file) $ \a -> do
wait a
atomically $ writeTBQueue queue ()
loop (tail files) queue
根据 MathematicalOrchid 的建议(谢谢!),我编写了一个新的实现
processFiles :: Int -> [FilePath] -> (FilePath -> IO ()) -> IO ()
processFiles n fs fun = do
tbQ <- atomically $ newTBQueue n
loop fs tbQ
where loop :: [FilePath] -> TBQueue FilePath -> IO ()
loop files queue | null files = return ()
| otherwise = do
join . atomically $ do
writeTBQueue queue (head files)
let actionSTM = atomically $ readTBQueue queue
return $ withAsync actionSTM $ \a -> do
file <- wait a
async (fun file) >>= doSomethingOnException
loop (tail files) queue
doSomethingOnException :: Async () -> IO ()
doSomethingOnException a = do
r <- waitCatch a
case r of
Left exception -> undefined
Right _ -> return ()
但我仍然不确定循环函数 returns 是否必须等待挂起的作业。
您在这里似乎有两个不同的问题:同步和可靠性。
STM 就是让多个线程访问可变数据而不破坏它。 TBQueue
应该处理得很好。如果您希望 "crashed" 操作重新启动...您需要为此构建额外的基础设施。
为什么用 "dummy values" 填充队列而不是要处理的实际文件名是否有特定原因?如果是我,主要威胁的工作就是用文件名填充队列(当队列太满时,主线程将在工作线程执行工作时被阻塞)。如果您想从 "crashed" 个线程中恢复,则每个线程的每个工作人员的顶级代码都会捕获异常并重试操作或其他操作。或者,我就是这样做的...
我有几千个无法同时处理的输出文件,因此我想要一个一次处理一大块 n 个文件的函数。所以我决定使用 TBQueue。
实现的想法是最初用 n 虚拟值填充队列,然后循环尝试读取队列中的下一个虚拟值。如果队列中有值,则会执行 IO 操作,并且当 IO 操作完成时,会将新值添加到队列中。否则 readTBQueue 将阻塞,直到其中一个进程完成(至少我希望如此)。
那么我的问题是: 1.当没有更多的文件需要处理时,主线程是否要等到children全部处理完? 2. 如果一个async crash 会怎样?虚拟值是否要写入队列?
processFiles :: Int -> [FilePath] -> (FilePath -> IO ()) -> IO ()
processFiles n fs fun = do
tbQ <- atomically $ newTBQueue n
atomically $ replicateM_ n $ writeTBQueue tbQ ()
loop fs tbQ
where loop :: [FilePath] -> TBQueue () -> IO ()
loop files queue | null files = return ()
| otherwise = do
join . atomically $ do
readTBQueue queue
let file = head files
return $ withAsync (fun file) $ \a -> do
wait a
atomically $ writeTBQueue queue ()
loop (tail files) queue
根据 MathematicalOrchid 的建议(谢谢!),我编写了一个新的实现
processFiles :: Int -> [FilePath] -> (FilePath -> IO ()) -> IO ()
processFiles n fs fun = do
tbQ <- atomically $ newTBQueue n
loop fs tbQ
where loop :: [FilePath] -> TBQueue FilePath -> IO ()
loop files queue | null files = return ()
| otherwise = do
join . atomically $ do
writeTBQueue queue (head files)
let actionSTM = atomically $ readTBQueue queue
return $ withAsync actionSTM $ \a -> do
file <- wait a
async (fun file) >>= doSomethingOnException
loop (tail files) queue
doSomethingOnException :: Async () -> IO ()
doSomethingOnException a = do
r <- waitCatch a
case r of
Left exception -> undefined
Right _ -> return ()
但我仍然不确定循环函数 returns 是否必须等待挂起的作业。
您在这里似乎有两个不同的问题:同步和可靠性。
STM 就是让多个线程访问可变数据而不破坏它。 TBQueue
应该处理得很好。如果您希望 "crashed" 操作重新启动...您需要为此构建额外的基础设施。
为什么用 "dummy values" 填充队列而不是要处理的实际文件名是否有特定原因?如果是我,主要威胁的工作就是用文件名填充队列(当队列太满时,主线程将在工作线程执行工作时被阻塞)。如果您想从 "crashed" 个线程中恢复,则每个线程的每个工作人员的顶级代码都会捕获异常并重试操作或其他操作。或者,我就是这样做的...