F# 中的尾递归:堆栈溢出

Tail Recursion in F# : Stack Overflow

我正在尝试在大图上实现 Kosaraju 算法 作为作业的一部分 [MOOC Algo I Stanford on Coursera]

https://en.wikipedia.org/wiki/Kosaraju%27s_algorithm

当前代码在一个小图上运行,但我在运行时执行期间遇到了 Stack Overflow。

尽管阅读了 Expert in F# 中的相关章节,或网站和 SO 上的其他可用示例,但我仍然不明白如何使用延续来解决此问题

下面是通用的完整代码,但是在执行DFSLoop1和里面的递归函数DFSsub时已经失败了。我想我没有使函数尾部递归[因为说明

t<-t+1
G.[n].finishingtime <- t

?]

但我不明白如何才能正确实施延续。

当只考虑失败的部分时,DFSLoop1 将我们将应用深度优先搜索的图作为参数。我们需要记录完成时间作为算法的一部分,以便在第二个 DFS 循环 (DFSLoop2) 中继续算法的第二部分 [当然我们在此之前就失败了]。

open System
open System.Collections.Generic
open System.IO

let x = File.ReadAllLines "C:\Users\Fagui\Documents\GitHub\Learning Fsharp\Algo Stanford I\PA 4 - SCC.txt";;
// let x = File.ReadAllLines "C:\Users\Fagui\Documents\GitHub\Learning Fsharp\Algo Stanford I\PA 4 - test1.txt";;
// val x : string [] =

let splitAtTab (text:string)=
    text.Split [|'\t';' '|]

let splitIntoKeyValue (A: int[]) = 
    (A.[0], A.[1])

let parseLine (line:string)=
    line
    |> splitAtTab
    |> Array.filter (fun s -> not(s=""))
    |> Array.map (fun s-> (int s))
    |> splitIntoKeyValue

let y =
    x |> Array.map parseLine
 //val it : (int * int) [] 

type Children = int[]
type Node1 =  
     {children : Children ;
      mutable finishingtime : int ;
      mutable explored1 : bool ; 
      }

type Node2 = 
     {children : Children ;
      mutable leader : int ;
      mutable explored2 : bool ; 
      }

type DFSgraphcore    = Dictionary<int,Children>
let directgraphcore  = new DFSgraphcore()
let reversegraphcore = new DFSgraphcore()

type DFSgraph1    = Dictionary<int,Node1>
let reversegraph1 = new DFSgraph1()

type DFSgraph2    = Dictionary<int,Node2>
let directgraph2  = new DFSgraph2()

let AddtoGraph (G:DFSgraphcore) (n,c) = 
    if not(G.ContainsKey n) then 
                              let node = [|c|]
                              G.Add(n,node)
                            else
                               let c'= G.[n]
                               G.Remove(n) |> ignore
                               G.Add (n, Array.append c' [|c|])

let inline swaptuple (a,b) = (b,a)
y|> Array.iter (AddtoGraph directgraphcore)
y|> Array.map swaptuple |> Array.iter (AddtoGraph reversegraphcore)

for i in directgraphcore.Keys do
    if reversegraphcore.ContainsKey(i) then do

               let node = {children = reversegraphcore.[i] ;
                           finishingtime = -1 ;
                           explored1 = false ;
                           }
               reversegraph1.Add (i,node)

        else                                   
               let node = {children = [||] ;
                           finishingtime = -1 ;
                           explored1 = false ;
                           }
               reversegraph1.Add (i,node)

directgraphcore.Clear  |> ignore
reversegraphcore.Clear |> ignore

// for i in reversegraph1.Keys do printfn "%d %A" i reversegraph1.[i].children
printfn "pause"
Console.ReadKey() |> ignore

let num_nodes =
    directgraphcore |> Seq.length


let DFSLoop1 (G:DFSgraph1)  = 
     let mutable t = 0
     let mutable s = -1
     let mutable k = num_nodes

     let rec DFSsub (G:DFSgraph1)(n:int) (cont:int->int) =
     //how to make it tail recursive ???

          G.[n].explored1 <- true
          // G.[n].leader <- s
          for j in G.[n].children do
                       if not(G.[j].explored1) then DFSsub G j cont
          t<-t+1
          G.[n].finishingtime <- t  

     // end of DFSsub

     for i in num_nodes .. -1 .. 1 do
        printfn "%d" i
        if not(G.[i].explored1) then do 
                                    s <- i
                                    ( DFSsub G i (fun s -> s) ) |> ignore
     //   printfn "%d %d" i G.[i].finishingtime

DFSLoop1 reversegraph1

printfn "pause"
Console.ReadKey() |> ignore

for i in directgraphcore.Keys do
    let node = {children = 
                       directgraphcore.[i]
                       |> Array.map (fun k -> reversegraph1.[k].finishingtime)  ;
                leader = -1 ;
                explored2= false ;
                }
    directgraph2.Add (reversegraph1.[i].finishingtime,node)

let z = 0

let DFSLoop2 (G:DFSgraph2)  = 
     let mutable t = 0
     let mutable s = -1
     let mutable k = num_nodes

     let rec DFSsub (G:DFSgraph2)(n:int) (cont:int->int) =

          G.[n].explored2 <- true
          G.[n].leader <- s
          for j in G.[n].children do
                       if not(G.[j].explored2) then DFSsub G j cont
          t<-t+1
          // G.[n].finishingtime <- t  

     // end of DFSsub

     for i in num_nodes .. -1 .. 1 do
        if not(G.[i].explored2) then do 
                                    s <- i
                                    ( DFSsub G i (fun s -> s) ) |> ignore
       // printfn "%d %d" i G.[i].leader

DFSLoop2 directgraph2

printfn "pause"
Console.ReadKey() |> ignore


let table = [for i in directgraph2.Keys do yield directgraph2.[i].leader]
let results = table |> Seq.countBy id |> Seq.map snd |> Seq.toList |> List.sort |> List.rev
printfn "%A" results

printfn "pause"
Console.ReadKey() |> ignore

这是一个带有简单图形示例的文本文件

1 4
2 8
3 6
4 7
5 2
6 9
7 1
8 5
8 6
9 7
9 3

(导致溢出的是 70Mo,大约有 900,000 个节点)

编辑

首先澄清一些事情 这是 "pseudo code"

输入:有向图 G = (V,E),以邻接表表示。假设顶点 V 被标记为 1, 2, 3, 。 . . , 名词 1.令Grev表示所有弧的方向都反转后的图G。 2. 运行Grev上的DFS-Loop子程序,按照给定的顺序处理顶点,得到一个 每个顶点 v ∈ V 的完成时间 f(v)。 3. 运行 G上的DFS-Loop子程序,按照f(v)的降序处理顶点,分配一个leader 到每个顶点 v ∈ V 。 4. G 的强连通分量对应于共享一个共同领导者的 G 的顶点。 图 2:我们的 SCC 算法的顶层。 f 值和领导者在第一个和第二个中计算 分别调用 DFS-Loop(见下文)。

输入:有向图 G = (V,E),以邻接表表示。 1. 初始化一个全局变量t为0。 [这会跟踪已完全探索的顶点数。] 2. 初始化一个全局变量s为NULL。 [这跟踪调用最后一个 DFS 调用的顶点。] 3. 对于 i = n 降为 1: [在第一次调用中,顶点被标记为 1, 2, . . . , n 任意。在第二次调用中,顶点被标记为 他们第一次通话的 f(v) 值。] (a) 如果我还没有探索: 一世。设置 s := i 二. DFS(G, i) 图 3:DFS 循环子例程。

输入:有向图 G = (V,E),以邻接表表示,源顶点 i ∈ V。 1. 将 i 标记为已探索。 [它在 DFS-Loop 调用的整个过程中一直处于探索状态。] 2. 设置 leader(i) := s 3. 对于每个弧 (i, j) ∈ G: (a) 如果 j 尚未探索: 一世。 DFS(G, j) 4.t++ 5. 设置 f(i) := t 图 4:DFS 子程序。 f 值只需要在第一次调用 DFS-Loop 时计算,并且 leader 值只需要在第二次调用 DFS-Loop 时计算。

编辑 我已经修改了代码,在一位经验丰富的程序员(一个 lisper 但没有 F# 经验)的帮助下简化了第一部分,以便更快地获得一个示例,而不用担心与本次讨论无关的代码。

代码只关注算法的一半,运行 DFS 一次得到反向树的完成时间。

这是代码的第一部分,只是为了创建一个小示例 y 是原始树。元组的第一个元素是父元素,第二个元素是子元素。但是我们将使用反向树

open System
open System.Collections.Generic
open System.IO

let x = File.ReadAllLines "C:\Users\Fagui\Documents\GitHub\Learning Fsharp\Algo Stanford I\PA 4 - SCC.txt";;
// let x = File.ReadAllLines "C:\Users\Fagui\Documents\GitHub\Learning Fsharp\Algo Stanford I\PA 4 - test1.txt";;
// val x : string [] =

let splitAtTab (text:string)=
    text.Split [|'\t';' '|]

let splitIntoKeyValue (A: int[]) = 
    (A.[0], A.[1])

let parseLine (line:string)=
    line
    |> splitAtTab
    |> Array.filter (fun s -> not(s=""))
    |> Array.map (fun s-> (int s))
    |> splitIntoKeyValue

// let y =
//    x |> Array.map parseLine

//let y =
//   [|(1, 4); (2, 8); (3, 6); (4, 7); (5, 2); (6, 9); (7, 1); (8, 5); (8, 6);
//    (9, 7); (9, 3)|]

// let y = Array.append [|(1,1);(1,2);(2,3);(3,1)|] [|for i in 4 .. 10000 do yield (i,4)|] 
let y = Array.append [|(1,1);(1,2);(2,3);(3,1)|] [|for i in 4 .. 99999 do yield (i,i+1)|] 



 //val it : (int * int) [] 

type Children = int list
type Node1 =  
     {children : Children ;
      mutable finishingtime : int ;
      mutable explored1 : bool ; 
      }

type Node2 = 
     {children : Children ;
      mutable leader : int ;
      mutable explored2 : bool ; 
      }

type DFSgraphcore    = Dictionary<int,Children>
let directgraphcore  = new DFSgraphcore()
let reversegraphcore = new DFSgraphcore()

type DFSgraph1    = Dictionary<int,Node1>
let reversegraph1 = new DFSgraph1()

let AddtoGraph (G:DFSgraphcore) (n,c) = 
    if not(G.ContainsKey n) then 
                              let node = [c]
                              G.Add(n,node)
                            else
                               let c'= G.[n]
                               G.Remove(n) |> ignore
                               G.Add (n, List.append c' [c])

let inline swaptuple (a,b) = (b,a)
y|> Array.iter (AddtoGraph directgraphcore)
y|> Array.map swaptuple |> Array.iter (AddtoGraph reversegraphcore)

// définir reversegraph1 = ... with....
for i in reversegraphcore.Keys do
    let node = {children = reversegraphcore.[i] ;
                           finishingtime = -1 ;
                           explored1 = false ;
                           }
    reversegraph1.Add (i,node)

for i in directgraphcore.Keys do
    if not(reversegraphcore.ContainsKey(i)) then do                                 
               let node = {children = [] ;
                           finishingtime = -1 ;
                           explored1 = false ;
                           }
               reversegraph1.Add (i,node)

directgraphcore.Clear  |> ignore
reversegraphcore.Clear |> ignore

// for i in reversegraph1.Keys do printfn "%d %A" i reversegraph1.[i].children
printfn "pause"
Console.ReadKey() |> ignore

let num_nodes =
    directgraphcore |> Seq.length

所以基本上图形是 (1->2->3->1)::(4->5->6->7->8->....->99999->10000 ) 并且反向图是(1->3->2->1)::(10000->9999->....->4)

这里是直接写的主要代码

//////////////////// main code is below ///////////////////

let DFSLoop1 (G:DFSgraph1)  = 
     let mutable t =  0 
     let mutable s =  -1

     let rec iter (n:int) (f:'a->unit) (list:'a list) : unit = 
         match list with 
            | [] -> (t <- t+1) ; (G.[n].finishingtime <- t)
            | x::xs -> f x ; iter n f xs      
     let rec DFSsub (G:DFSgraph1) (n:int) : unit =  
          let my_f (j:int) : unit = if not(G.[j].explored1) then (DFSsub G j) 
          G.[n].explored1 <- true         
          iter n my_f G.[n].children 

     for i in num_nodes .. -1 .. 1 do
        // printfn "%d" i
        if not(G.[i].explored1) then do 
                                    s <- i
                                    DFSsub G i                                                         

        printfn "%d %d" i G.[i].finishingtime

// End of DFSLoop1


DFSLoop1 reversegraph1

printfn "pause"
Console.ReadKey() |> ignore

它不是尾递归,所以我们使用延续,这里是适应 CPS 风格的相同代码:

//////////////////// main code is below ///////////////////
let DFSLoop1 (G:DFSgraph1)  = 
     let mutable t =  0 
     let mutable s =  -1

     let rec iter_c (n:int) (f_c:'a->(unit->'r)->'r) (list:'a list) (cont: unit->'r) : 'r = 
         match list with 
            | [] -> (t <- t+1) ; (G.[n].finishingtime <- t) ; cont()
            | x::xs -> f_c x (fun ()-> iter_c n f_c xs cont)
     let rec DFSsub (G:DFSgraph1) (n:int) (cont: unit->'r) : 'r=  
          let my_f_c (j:int)(cont:unit->'r):'r = if not(G.[j].explored1) then (DFSsub G j cont) else cont()
          G.[n].explored1 <- true         
          iter_c n my_f_c G.[n].children cont


     for i in maxnum_nodes .. -1 .. 1 do
       // printfn "%d" i
        if not(G.[i].explored1) then do 
                                    s <- i
                                    DFSsub G i id                                                         

        printfn "%d %d" i G.[i].finishingtime


DFSLoop1 reversegraph1
printfn "faré"
printfn "pause"
Console.ReadKey() |> ignore

对于小示例(评论中的那个)或我们正在使用的同一棵树,两种代码都可以编译并给出相同的结果,但大小更小(1000 而不是 100000)

所以我不认为这是算法中的错误,我们有相同的树结构,只是更大的树导致了问题。在我们看来,续篇写得很好。我们已经明确地输入了代码。在所有情况下,所有呼叫都以延续结束...

我们正在寻求专家的建议!!!谢谢!!!

我没有试图理解整个代码片段,因为它相当长,但您肯定需要用使用连续传递样式实现的迭代替换 for 循环。类似于:

let rec iterc f cont list =
  match list with 
  | [] -> cont ()
  | x::xs -> f x (fun () -> iterc f cont xs)

我不明白 cont 在你的 DFSub 函数中的用途(它从未被调用,是吗?),但基于延续的版本大致如下所示:

let rec DFSsub (G:DFSgraph2)(n:int) cont =
  G.[n].explored2 <- true
  G.[n].leader <- s
  G.[n].children 
  |> iterc 
      (fun j cont -> if not(G.[j].explored2) then DFSsub G j cont else cont ()) 
      (fun () -> t <- t + 1)

递归遍历数十万个条目时堆栈溢出一点也不坏,真的。许多编程语言的实现会因为比这短得多的递归而窒息。您遇到了严重的程序员问题 — 没什么好羞愧的!

现在,如果您想进行比您的实现能够处理的更深层次的递归,您需要转换您的算法,使其迭代 and/or tail-recursive(两者是同构的——除了 tail-recursion 允许去中心化和模块化,而迭代是集中的 non-modular).

要将算法从递归转换为 tail-recursive,这是一项重要的技能,您需要了解隐式存储在堆栈帧中的状态,即函数体中的那些自由变量在递归中进行更改,并将它们显式存储在 FIFO 队列中(一种复制堆栈的数据结构,可以简单地实现为链表)。然后,您可以将该具体化框架变量的链表作为参数传递给尾递归函数。

在更高级的情况下,您有许多尾递归函数,每个函数都有不同类型的帧,而不是简单的 self-recursion,您可能需要为具体化堆栈帧定义一些相互递归的数据类型,而不是使用列表。但我相信Kosaraju的算法只涉及self-recursive个函数。

好的,所以上面给出的代码是正确的代码! 问题在于 F#

的编译器

这里是微软的一些说法 http://blogs.msdn.com/b/fsharpteam/archive/2011/07/08/tail-calls-in-fsharp.aspx

基本上,注意设置,在默认模式下,编译器可能不会自动进行尾调用。为此,在 VS2015 中,转到解决方案资源管理器,用鼠标右键单击并单击 "Properties"(滚动列表的最后一个元素) 然后在新的 window 中,点击 "Build" 并勾选方框 "Generate tail calls"

这也是为了检查编译器是否完成了使用反汇编的工作 ILDASM.exe

您可以在我的 github 存储库中找到整个算法的源代码

https://github.com/FaguiCurtain/Learning-Fsharp/blob/master/Algo%20Stanford/Algo%20Stanford/Kosaraju_cont.fs

从性能的角度来看,我不是很满意。该代码在我的笔记本电脑上运行了 36 秒。从与其他 MOOC 同胞的论坛来看,C/C++/C# 通常在亚秒级到 5 秒内执行,Java 大约 10-15 秒,Python 大约 20-30 秒。 所以我的实现显然没有优化。我现在很高兴听到有关让它更快的技巧!!!谢谢!!!!