OCaml 使用异步编写超时函数
OCaml writing a timeout function using Async
我正在尝试编写一个尝试计算函数的函数,但在特定超时后停止。
我尝试使用 Deferred.any
,它 return 是一个延迟,当其中一个底层延迟被满足时,它就会被满足。
type 'a output = OK of 'a | Exn of exn
let fun_test msg f eq (inp,ans) =
let outp = wait_for (Deferred.any
[ return (try OK (f inp) with e -> Exn e)
; (after (Core.Std.sec 0.0) >>| (fun () -> Exn TIMEOUT))])
in {msg = msg;inp = inp;outp = outp;ans = ans;pass = eq outp ans}
我不确定如何从延迟的 monad 中提取值,所以我写了一个函数 'wait_for',它一直旋转直到确定基础值。
let rec wait_for x =
match Deferred.peek x with
| None -> wait_for x
| Some done -> done;;
这没有用。阅读 Async chapter of Real World OCaml 后,我意识到我需要启动调度程序。但是我不确定在我的代码中在哪里调用 Schedule.go
。我没有看到类型 go : ?raise_unhandled_exn:bool -> unit -> Core.Std.never_returns
适合您实际希望异步代码 return 的代码。 go
的文档说 "Async programs do not exit until shutdown
is called."
我开始怀疑我是否采取了完全错误的方法来解决这个问题,直到我在 this Cornell website
上找到了对同一问题非常相似的解决方案
let timeout (thunk:unit -> 'a Deferred.t) (n:float) : ('a option) Deferred.t
= Deferred.any
[ after (sec n) >>| (fun () -> None) ;
thunk () >>= (fun x -> Some x) ]
无论如何,我不太确定我对 wait_for
的使用是否正确。有没有一种规范的方法可以从延迟的 monad 中提取值?另外,如何启动调度程序?
更新:
我尝试仅使用 Core.Std.Thread
和 Core.Std.Mutex
.
编写超时函数
let rec wait_for lck ptr =
Core.Std.Thread.delay 0.25;
Core.Std.Mutex.lock lck;
(match !ptr with
| None -> Core.Std.Mutex.unlock lck; wait_for lck ptr
| Some x -> Core.Std.Mutex.unlock lck; x);;
let timeout t f =
let lck = Core.Std.Mutex.create () in
let ptr = ref None in
let _ = Core.Std.Thread.create
(fun () -> Core.Std.Thread.delay t;
Core.Std.Mutex.lock lck;
(match !ptr with
| None -> ptr := Some (Exn TIMEOUT)
| Some _ -> ());
Core.Std.Mutex.unlock lck;) () in
let _ = Core.Std.Thread.create
(fun () -> let x = f () in
Core.Std.Mutex.lock lck;
(match !ptr with
| None -> ptr := Some x
| Some _ -> ());
Core.Std.Mutex.unlock lck;) () in
wait_for lck ptr
我认为这非常接近工作。它适用于 let rec loop x = print_string ".\n"; loop x
这样的计算,但不适用于 let rec loop x = loop x
这样的计算。我认为现在的问题是,如果计算 f ()
无限循环,那么它的线程永远不会被抢占,所以其他线程的 none 可以注意到超时已经过期。如果线程执行像打印字符串这样的 IO,那么线程确实会被抢占。我也不知道如何杀死一个线程,我在 documentation for Core.Std.Thread
中找不到这样的函数
我想到的解决办法是
let kill pid sign =
try Unix.kill pid sign with
| Unix.Unix_error (e,f,p) -> debug_print ((Unix.error_message e)^"|"^f^"|"^p)
| e -> raise e;;
let timeout f arg time default =
let pipe_r,pipe_w = Unix.pipe () in
(match Unix.fork () with
| 0 -> let x = Some (f arg) in
let oc = Unix.out_channel_of_descr pipe_w in
Marshal.to_channel oc x [];
close_out oc;
exit 0
| pid0 ->
(match Unix.fork () with
| 0 -> Unix.sleep time;
kill pid0 Sys.sigkill;
let oc = Unix.out_channel_of_descr pipe_w in
Marshal.to_channel oc default [];
close_out oc;
exit 0
| pid1 -> let ic = Unix.in_channel_of_descr pipe_r in
let result = (Marshal.from_channel ic : 'b option) in
result ));;
我想我可能会用这个创建两个僵尸进程。但它是使用 ocamlopt
编译时唯一适用于 let rec loop x = loop x
的解决方案(使用 Unix.alarm
给定 here 的解决方案在使用 ocamlc
编译时有效,但在编译时无效ocamlopt
).
我正在尝试编写一个尝试计算函数的函数,但在特定超时后停止。
我尝试使用 Deferred.any
,它 return 是一个延迟,当其中一个底层延迟被满足时,它就会被满足。
type 'a output = OK of 'a | Exn of exn
let fun_test msg f eq (inp,ans) =
let outp = wait_for (Deferred.any
[ return (try OK (f inp) with e -> Exn e)
; (after (Core.Std.sec 0.0) >>| (fun () -> Exn TIMEOUT))])
in {msg = msg;inp = inp;outp = outp;ans = ans;pass = eq outp ans}
我不确定如何从延迟的 monad 中提取值,所以我写了一个函数 'wait_for',它一直旋转直到确定基础值。
let rec wait_for x =
match Deferred.peek x with
| None -> wait_for x
| Some done -> done;;
这没有用。阅读 Async chapter of Real World OCaml 后,我意识到我需要启动调度程序。但是我不确定在我的代码中在哪里调用 Schedule.go
。我没有看到类型 go : ?raise_unhandled_exn:bool -> unit -> Core.Std.never_returns
适合您实际希望异步代码 return 的代码。 go
的文档说 "Async programs do not exit until shutdown
is called."
我开始怀疑我是否采取了完全错误的方法来解决这个问题,直到我在 this Cornell website
上找到了对同一问题非常相似的解决方案let timeout (thunk:unit -> 'a Deferred.t) (n:float) : ('a option) Deferred.t
= Deferred.any
[ after (sec n) >>| (fun () -> None) ;
thunk () >>= (fun x -> Some x) ]
无论如何,我不太确定我对 wait_for
的使用是否正确。有没有一种规范的方法可以从延迟的 monad 中提取值?另外,如何启动调度程序?
更新:
我尝试仅使用 Core.Std.Thread
和 Core.Std.Mutex
.
let rec wait_for lck ptr =
Core.Std.Thread.delay 0.25;
Core.Std.Mutex.lock lck;
(match !ptr with
| None -> Core.Std.Mutex.unlock lck; wait_for lck ptr
| Some x -> Core.Std.Mutex.unlock lck; x);;
let timeout t f =
let lck = Core.Std.Mutex.create () in
let ptr = ref None in
let _ = Core.Std.Thread.create
(fun () -> Core.Std.Thread.delay t;
Core.Std.Mutex.lock lck;
(match !ptr with
| None -> ptr := Some (Exn TIMEOUT)
| Some _ -> ());
Core.Std.Mutex.unlock lck;) () in
let _ = Core.Std.Thread.create
(fun () -> let x = f () in
Core.Std.Mutex.lock lck;
(match !ptr with
| None -> ptr := Some x
| Some _ -> ());
Core.Std.Mutex.unlock lck;) () in
wait_for lck ptr
我认为这非常接近工作。它适用于 let rec loop x = print_string ".\n"; loop x
这样的计算,但不适用于 let rec loop x = loop x
这样的计算。我认为现在的问题是,如果计算 f ()
无限循环,那么它的线程永远不会被抢占,所以其他线程的 none 可以注意到超时已经过期。如果线程执行像打印字符串这样的 IO,那么线程确实会被抢占。我也不知道如何杀死一个线程,我在 documentation for Core.Std.Thread
我想到的解决办法是
let kill pid sign =
try Unix.kill pid sign with
| Unix.Unix_error (e,f,p) -> debug_print ((Unix.error_message e)^"|"^f^"|"^p)
| e -> raise e;;
let timeout f arg time default =
let pipe_r,pipe_w = Unix.pipe () in
(match Unix.fork () with
| 0 -> let x = Some (f arg) in
let oc = Unix.out_channel_of_descr pipe_w in
Marshal.to_channel oc x [];
close_out oc;
exit 0
| pid0 ->
(match Unix.fork () with
| 0 -> Unix.sleep time;
kill pid0 Sys.sigkill;
let oc = Unix.out_channel_of_descr pipe_w in
Marshal.to_channel oc default [];
close_out oc;
exit 0
| pid1 -> let ic = Unix.in_channel_of_descr pipe_r in
let result = (Marshal.from_channel ic : 'b option) in
result ));;
我想我可能会用这个创建两个僵尸进程。但它是使用 ocamlopt
编译时唯一适用于 let rec loop x = loop x
的解决方案(使用 Unix.alarm
给定 here 的解决方案在使用 ocamlc
编译时有效,但在编译时无效ocamlopt
).