Data.STM.LinkedList 实施

Data.STM.LinkedList implementation

我正在研究 Data.STM.LinkedList 高性能链表的实现。查看文档,O(n) 中的长度函数 运行 - 为什么?在 O(1) 中实现它是否存在任何实际问题?

这是源代码 https://hackage.haskell.org/package/stm-linkedlist-0.1.0.0/docs/src/Data-STM-LinkedList-Internal.html#length

是否可以在 O(1) 中实现它?我是 Haskell 的新手,所以我不确定保存有关列表的一些元数据是否有问题。

谢谢!

初步估计,Haskell 是一种充分表达的语言,任何用另一种通用语言实现的算法也可以在 Haskell 中实现,同时保持渐近性能特征。 (这是一个很低的门槛。大多数通用语言都具有这种表现力。)

特别是,虽然 Haskell 最自然地支持不可变数据结构,但它对可变数据有足够的支持,可变数据结构及其算法通常可以相当直接地转换为 Haskell 代码。可能会有一些开销(通常是大量开销),并且可变数据结构可能比它们的不可变表亲更难使用,但它仍然是可能的。

但是,作为一个实际问题,匹配可变数据结构的 C++ 实现的 实际(而不是渐近)性能可能被证明是极其困难的,如果不是不可能的话.获得 C++ 性能的 2-3 倍以内可能是合理的,而获得 5-10 倍以内则相当容易(见下文)。但是,如果您需要匹配 C++ 的性能,您最好用 C++ 编写高性能变异代码并使用 FFI(外部函数接口)与该代码接口。

无论如何,O(1) length 的 "moderate performance" 双向链表当然是可能的,并且维护可变列表范围的元数据没有根本的困难。 stm-linkedlist 不提供 O(1) length 的原因可能与 C++ 仅保证 O(n) std::list<>::size 性能 before C++11 的原因相同。即,双向链表的许多实际应用永远不需要调用 length/size,并且提供 O(1) 性能会带来额外的簿记成本。

作为概念证明,以下数据类型足以实现具有 O(1) 长度函数的完全可变双向链表。此处,以下划线结尾的类型和标识符仅供内部使用。该列表在其指针方面是严格的(因此没有无限列表!)但在其值方面是惰性的。

data List a = List
  { headNode_ :: !(IORef (Node_ a))
  , length_ :: !(IORef Int) }
data Node_ a = Node_
  { prev_ :: !(IORef (Node_ a))
  , next_ :: !(IORef (Node_ a))
  , value_ :: a }

List 类型包含一个指向不完整 headNode 的指针(即 IORef),该 headNode 指向列表的开头和结尾(对于空列表则指向自身) ) 但有一个未定义的值字段。这使得它成为一个不安全的节点值,因此最终用户永远不能直接访问它。 List 还包含指向列表长度值的指针。

一个额外的类型Node(无下划线)用于装饰一个节点指针及其相应的列表(如注释中的"iterator"),使列表元数据可用于函数需要它:

data Node a = Node
  { node_ :: !(IORef (Node_ a))
  , list_ :: !(List a) }

请注意,ListNode 是用于处理列表的面向用户的数据类型。

您创建一个 empty 列表,如下所示:

empty :: IO (List a)
empty = mdo
  n <- newIORef (Node_ n n undefined)
  List n <$> newIORef 0

给定节点前后的插入工作如下。这是不安全的头节点表示得到回报的地方,因为该算法可以将列表开头和结尾的插入视为头节点和实际列表节点之间插入的特殊情况。

insertBefore :: a -> Node a -> IO (Node a)
insertBefore x Node{node_=rnode2, list_} = do
  Node_{prev_=rnode1} <- readIORef rnode2
  insertBetween_ x list_ rnode1 rnode2

insertAfter :: a -> Node a -> IO (Node a)
insertAfter x Node{node_=rnode1, list_} = do
  Node_{next_=rnode2} <- readIORef rnode1
  insertBetween_ x list_ rnode1 rnode2

insertBetween_ :: a -> List a -> IORef (Node_ a) -> IORef (Node_ a) -> IO (Node a)
insertBetween_ x l rnode1 rnode2 = do
  modifyIORef' (length_ l) succ
  newnode <- newIORef (Node_ rnode1 rnode2 x)
  modifyIORef' rnode1 (\n -> n{next_=newnode})
  modifyIORef' rnode2 (\n -> n{prev_=newnode})
  return $ Node newnode l

由于不允许用户 "have" 头节点,我们需要在列表的开头和结尾插入额外的面向用户的函数:

prepend :: a -> List a -> IO (Node a)
prepend x l = insertAfter x (Node (headNode_ l) l)

append :: a -> List a -> IO (Node a)
append x l = insertBefore x (Node (headNode_ l) l)

观察所有插入都经过 insertBetween_,它负责增加长度值。

无论是内部节点还是开头或结尾的节点,删除都是简单而统一的。所有删除都通过这个 delete 函数,该函数负责减少长度值。

delete :: Node a -> IO ()
delete Node{node_,list_} = do
  modifyIORef' (length_ list_) pred
  Node_{next_, prev_} <- readIORef node_
  modifyIORef' prev_ (\n -> n{next_=next_})
  modifyIORef' next_ (\n -> n{prev_=prev_})

删除头节点将是一场灾难,但不允许用户拥有这样的Node,所以我们很安全。

如果用户有 Node,她可以在列表中来回移动:

prev :: Node a -> IO (Maybe (Node a))
prev Node{node_, list_} = do
  Node_{prev_} <- readIORef node_
  return $ maybeNode_ prev_ list_

next :: Node a -> IO (Maybe (Node a))
next Node{node_, list_} = do
  Node_{next_} <- readIORef node_
  return $ maybeNode_ next_ list_

maybeNode_ :: IORef (Node_ a) -> List a -> Maybe (Node a)
maybeNode_ n l =
  if n == headNode_ l
  then Nothing
  else Just (Node n l)

请注意,我们必须注意永远不要给用户头节点,所以 maybeNode_ 在这里检查它,returns Nothing 代替。

首先,用户可以使用以下函数(在禁止的头节点上使用 prevnext)获取 List 的开始或结束:

start :: List a -> IO (Maybe (Node a))
start l = next $ Node (headNode_ l) l

end :: List a -> IO (Maybe (Node a))
end l = prev $ Node (headNode_ l) l

所缺少的只是一些杂项查询功能:

value :: Node a -> IO a
value = fmap value_ . readIORef . node_

null :: List a -> IO Bool
null l = (==0) <$> length l

length :: List a -> IO Int
length = readIORef . length_

一些可转换为普通列表的实用程序:

toList :: List a -> IO [a]
toList = toList_ next_

toListRev :: List a -> IO [a]
toListRev = toList_ prev_

toList_ :: (Node_ a -> IORef (Node_ a)) -> List a -> IO [a]
toList_ dir l = go =<< readIORef h
  where h = headNode_ l
        go n = do
          if dir n == h then return []
            else do
            n' <- readIORef (dir n)
            (value_ n':) <$> go n'

和一个 Show 调试实例:

instance (Show a) => Show (List a) where
  showsPrec d lst = showParen (d > 10) $ showString "fromList " . showsPrec 11 (unsafePerformIO $ toList lst)

警告:如果列表在生成的字符串被完全评估之前发生变异,这个Show实例是不安全的,所以它应该只用于调试(并且可能从生产版本中删除)。

此外,虽然这不是绝对必要的,因为我们可以删除和重新插入,但如果不对元素进行就地修改,任何自尊的可变结构都是不完整的:

modify :: (a -> a) -> Node a -> IO ()
modify f Node{node_} = modifyIORef' node_ (\n -> n { value_ = f (value_ n) })

这是完整的代码。 (有关示例用法,请参阅定义 ex1。)欢迎您将其用作您自己实现的起点。它未经测试和基准测试,除了一些快速测试表明它可能比 C++ 实现慢 5-10 倍。

{-# LANGUAGE NamedFieldPuns, RecursiveDo #-}

module LinkedList
  ( List, Node
  , value, null, length
  , empty, prepend, append, insertBefore, insertAfter, delete, modify
  , prev, next, start, end
  , toList, toListRev
  ) where

import System.IO.Unsafe
import Control.Monad
import Prelude hiding (null, length)

import Data.IORef

data List a = List
  { headNode_ :: !(IORef (Node_ a))
  , length_ :: !(IORef Int) }
data Node a = Node
  { node_ :: !(IORef (Node_ a))
  , list_ :: !(List a) }
data Node_ a = Node_
  { prev_ :: !(IORef (Node_ a))
  , next_ :: !(IORef (Node_ a))
  , value_ :: a }

-- unsafe show instance: remove from production version
instance (Show a) => Show (List a) where
  showsPrec d lst = showParen (d > 10) $ showString "fromList " . showsPrec 11 (unsafePerformIO $ toList lst)

value :: Node a -> IO a
value = fmap value_ . readIORef . node_

null :: List a -> IO Bool
null l = (==0) <$> length l

length :: List a -> IO Int
length = readIORef . length_

empty :: IO (List a)
empty = mdo
  n <- newIORef (Node_ n n undefined)
  List n <$> newIORef 0

prepend :: a -> List a -> IO (Node a)
prepend x l = insertAfter x (Node (headNode_ l) l)

append :: a -> List a -> IO (Node a)
append x l = insertBefore x (Node (headNode_ l) l)

insertBefore :: a -> Node a -> IO (Node a)
insertBefore x Node{node_=rnode2, list_} = do
  Node_{prev_=rnode1} <- readIORef rnode2
  insertBetween_ x list_ rnode1 rnode2

insertAfter :: a -> Node a -> IO (Node a)
insertAfter x Node{node_=rnode1, list_} = do
  Node_{next_=rnode2} <- readIORef rnode1
  insertBetween_ x list_ rnode1 rnode2

insertBetween_ :: a -> List a -> IORef (Node_ a) -> IORef (Node_ a) -> IO (Node a)
insertBetween_ x l rnode1 rnode2 = do
  modifyIORef' (length_ l) succ
  newnode <- newIORef (Node_ rnode1 rnode2 x)
  modifyIORef' rnode1 (\n -> n{next_=newnode})
  modifyIORef' rnode2 (\n -> n{prev_=newnode})
  return $ Node newnode l

delete :: Node a -> IO ()
delete Node{node_,list_} = do
  modifyIORef' (length_ list_) pred
  Node_{next_, prev_} <- readIORef node_
  modifyIORef' prev_ (\n -> n{next_=next_})
  modifyIORef' next_ (\n -> n{prev_=prev_})

modify :: (a -> a) -> Node a -> IO ()
modify f Node{node_} = modifyIORef' node_ (\n -> n { value_ = f (value_ n) })

prev :: Node a -> IO (Maybe (Node a))
prev Node{node_, list_} = do
  Node_{prev_} <- readIORef node_
  return $ maybeNode_ prev_ list_

next :: Node a -> IO (Maybe (Node a))
next Node{node_, list_} = do
  Node_{next_} <- readIORef node_
  return $ maybeNode_ next_ list_

maybeNode_ :: IORef (Node_ a) -> List a -> Maybe (Node a)
maybeNode_ n l =
  if n == headNode_ l
  then Nothing
  else Just (Node n l)

start :: List a -> IO (Maybe (Node a))
start l = next $ Node (headNode_ l) l

end :: List a -> IO (Maybe (Node a))
end l = prev $ Node (headNode_ l) l

toList :: List a -> IO [a]
toList = toList_ next_

toListRev :: List a -> IO [a]
toListRev = toList_ prev_

toList_ :: (Node_ a -> IORef (Node_ a)) -> List a -> IO [a]
toList_ dir l = go =<< readIORef h
  where h = headNode_ l
        go n = do
          if dir n == h then return []
            else do
            n' <- readIORef (dir n)
            (value_ n':) <$> go n'

ex1 :: IO (List Int)
ex1 = do
  t <- empty
  mapM_ (flip prepend t) [10,9..1]
  mapM_ (flip append t) [11..20]
  return t