使用 TableDependency 和 F# 等待数据库行加载
Waiting for database rows to load using TableDependency and F#
我有一个 F# 项目,它将一些文件加载到外部子系统,然后使用 Table Dependency 等待一些行被添加到 table 作为副作用。
Table 下面的类型使用依赖关系来监视数据库的变化。当行是 added/changed/whatever 时,它会触发一个自定义事件:
// just using this type for the RecordChangedEvent to marshal the id we want into something
type AccountLoaded() =
let mutable someId = ""
// this property name matches the name of the table column (SomeId)
member this.SomeId
with get () = someId
and set (value) = someId <- value
// AccountLoadWatcher
type AccountLoadWatcher() =
let mutable _tableDependency = null
let event = new Event<_>()
interface IDisposable with
member this.Dispose() =
_tableDependency.Stop()
_tableDependency.Dispose()
// custom event we can send when an account is loaded
[<CLIEvent>]
member this.AccountLoaded = event.Publish
member private this.NotifyAccountLoaded(sender : RecordChangedEventArgs<AccountLoaded>) =
let accountLoaded = sender.Entity
event.Trigger(accountLoaded.SomeId)
member this.Watch() =
_tableDependency <- DbLib.getTableDependency "dbo" "AccountTable"
null
_tableDependency.OnChanged.Add(this.NotifyAccountLoaded)
_tableDependency.Start()
我想做的是获取上面的对象,然后等待加载我关心的所有具有 id 的行。我目前拥有的是:
let waitForRows(csvFileRows) =
let idsToWaitFor = parseUniqueIdsFromAllRows csvFileRows
let mutable collected = Set.empty
let isInSet id = Set.contains id idsToWaitFor
let notDone = not <| (Set.difference idsToWaitFor collected = Set.empty)
let accountLoadedHandler id =
collected <- collected.Add id
printfn "Id loaded %s, waiting for %A\n" id (Set.difference idsToWaitFor collected)
loadToSubsystem csvFileRows |> ignore
// wait for all the watcher events; filtering each event object for ids we care about
watcher.AccountLoaded
|> Observable.takeWhile (fun _ -> notDone)
|> Observable.filter (fun e -> isInSet e)
|> Observable.subscribe accountLoadedHandler
|> ignore
doMoreWork()
但这只是继续做更多工作,而无需等待上面我需要的所有事件。
我需要使用任务还是异步? F# 代理?
鉴于您在示例中使用 Observable.takeWhile
,我假设您使用 FSharp.Control.Reactive 包装器来访问所有反应式组合器。
您的方法有一些好主意,例如使用 takeWhile
等到您收集所有 ID,但使用突变是非常不幸的 - 由于可能存在种族问题,这样做甚至可能不安全条件。
一个不错的替代方法是使用各种 scan
函数之一来收集事件发生时的状态。您可以使用 Observable.scanInit
从一个空集合开始并添加所有 ID;然后是 Observable.takeWhile
以继续接受事件,直到您拥有所有等待的 ID。要真正等待(并阻止),您可以使用 Observable.wait
。像这样:
let waitForRows(csvFileRows) =
let idsToWaitFor = parseUniqueIdsFromAllRows csvFileRows
let finalCollectedIDs =
watcher.AccountLoaded
|> Observable.scanInit Set.empty (fun collected id -> Set.add id collected)
|> Observable.takeWhile (fun collected -> not (Set.isSubset idsToWaitFor co llected))
|> Observable.wait
printfn "Completed. Final collected IDs are: %A" finalCollectedIDs
我有一个 F# 项目,它将一些文件加载到外部子系统,然后使用 Table Dependency 等待一些行被添加到 table 作为副作用。
Table 下面的类型使用依赖关系来监视数据库的变化。当行是 added/changed/whatever 时,它会触发一个自定义事件:
// just using this type for the RecordChangedEvent to marshal the id we want into something
type AccountLoaded() =
let mutable someId = ""
// this property name matches the name of the table column (SomeId)
member this.SomeId
with get () = someId
and set (value) = someId <- value
// AccountLoadWatcher
type AccountLoadWatcher() =
let mutable _tableDependency = null
let event = new Event<_>()
interface IDisposable with
member this.Dispose() =
_tableDependency.Stop()
_tableDependency.Dispose()
// custom event we can send when an account is loaded
[<CLIEvent>]
member this.AccountLoaded = event.Publish
member private this.NotifyAccountLoaded(sender : RecordChangedEventArgs<AccountLoaded>) =
let accountLoaded = sender.Entity
event.Trigger(accountLoaded.SomeId)
member this.Watch() =
_tableDependency <- DbLib.getTableDependency "dbo" "AccountTable"
null
_tableDependency.OnChanged.Add(this.NotifyAccountLoaded)
_tableDependency.Start()
我想做的是获取上面的对象,然后等待加载我关心的所有具有 id 的行。我目前拥有的是:
let waitForRows(csvFileRows) =
let idsToWaitFor = parseUniqueIdsFromAllRows csvFileRows
let mutable collected = Set.empty
let isInSet id = Set.contains id idsToWaitFor
let notDone = not <| (Set.difference idsToWaitFor collected = Set.empty)
let accountLoadedHandler id =
collected <- collected.Add id
printfn "Id loaded %s, waiting for %A\n" id (Set.difference idsToWaitFor collected)
loadToSubsystem csvFileRows |> ignore
// wait for all the watcher events; filtering each event object for ids we care about
watcher.AccountLoaded
|> Observable.takeWhile (fun _ -> notDone)
|> Observable.filter (fun e -> isInSet e)
|> Observable.subscribe accountLoadedHandler
|> ignore
doMoreWork()
但这只是继续做更多工作,而无需等待上面我需要的所有事件。
我需要使用任务还是异步? F# 代理?
鉴于您在示例中使用 Observable.takeWhile
,我假设您使用 FSharp.Control.Reactive 包装器来访问所有反应式组合器。
您的方法有一些好主意,例如使用 takeWhile
等到您收集所有 ID,但使用突变是非常不幸的 - 由于可能存在种族问题,这样做甚至可能不安全条件。
一个不错的替代方法是使用各种 scan
函数之一来收集事件发生时的状态。您可以使用 Observable.scanInit
从一个空集合开始并添加所有 ID;然后是 Observable.takeWhile
以继续接受事件,直到您拥有所有等待的 ID。要真正等待(并阻止),您可以使用 Observable.wait
。像这样:
let waitForRows(csvFileRows) =
let idsToWaitFor = parseUniqueIdsFromAllRows csvFileRows
let finalCollectedIDs =
watcher.AccountLoaded
|> Observable.scanInit Set.empty (fun collected id -> Set.add id collected)
|> Observable.takeWhile (fun collected -> not (Set.isSubset idsToWaitFor co llected))
|> Observable.wait
printfn "Completed. Final collected IDs are: %A" finalCollectedIDs