F# Array.Parallel.map 不提供并行处理
F# Array.Parallel.map does not provide parallel processing
我必须在 F# 中模拟一个离散环境,由 Python 调用,以解决强化学习问题。我有一个原始类型(主要是浮点数)的函数,可以使数据交换更顺畅。现在我可以用不同的数据多次 运行 这个函数,所以并行 运行 似乎是个好主意。
我有以下代码:
type AscentStrategy = |Strategy of seq<float>
let simulateAscent env ascentLimiter initState (sequenceOfDepths:seq<float>) =
//let infinitSeqOfConstantValues = (fun _ -> constantDepth) |> Seq.initInfinite
sequenceOfDepths
|> Seq.scan ( fun ( nextState, rew, isTerminal, _ ) depth -> getNextEnvResponseAndBoundForNextAction(env, nextState , depth , ascentLimiter) ) ( initState, 0.0 , false, 0.0)
|> SeqExtension.takeWhileWithLast (fun (_ , _, isTerminalState, _) -> not isTerminalState)
|> Seq.toArray
and then
let simulateStrategy ({MaxPDCS = maxPDCS ; MaxSimTime = maximumSimulationTime ; PenaltyForExceedingRisk = penaltyForExceedingRisk ;
RewardForDelivering = rewardForDelivering ; PenaltyForExceedingTime = penaltyForExceedingTime ; IntegrationTime = integrationTime
ControlToIntegrationTimeRatio = controlToIntegrationTimeRatio; DescentRate = descentRate; MaximumDepth = maximumDepth ;
BottomTime = bottomTime ; LegDiscreteTime = legDiscreteTime } : SimulationParameters) (Strategy ascentStrategy : AscentStrategy) =
let env, initState , ascentLimiter , _ = getEnvInitStateAndAscentLimiter ( maxPDCS , maximumSimulationTime ,
penaltyForExceedingRisk , rewardForDelivering , penaltyForExceedingTime ,
integrationTime ,
controlToIntegrationTimeRatio,
descentRate ,
maximumDepth ,
bottomTime ,
legDiscreteTime )
ascentStrategy
|> simulateAscent env ascentLimiter initState
最后我调用函数进行测试:
let commonSimulationParameters = {MaxPDCS = 0.32 ; MaxSimTime = 2000.0 ; PenaltyForExceedingRisk = 1.0 ; RewardForDelivering = 10.0; PenaltyForExceedingTime = 0.5 ;
IntegrationTime = 0.1; ControlToIntegrationTimeRatio = 10; DescentRate = 60.0; MaximumDepth = 20.0 ; BottomTime = 10.0; LegDiscreteTime = 0.1}
printfn"insert number of elements"
let maxInputsString = Console.ReadLine()
let maxInputs = maxInputsString |> Double.Parse
let inputsStrategies = [|0.0 .. maxInputs|] |> Array.map (fun x -> Seq.initInfinite (fun _ -> x ) )
let testParallel = inputsStrategies
|> Array.Parallel.map (fun x -> (simulateStrategy commonSimulationParameters ( Strategy x )) )
我将它与 Array.map 进行了比较,虽然它速度更快并且使用了笔记本电脑上 CPU 的 70%,但似乎仍然没有使用全部处理能力。我在一台有更多内核(~50)的机器上安装了 运行 它,它几乎没有增加 CPU 的使用(它占总使用量的 3/4%,有 50 个左右的独立输入)。我想一定是某处产生了死锁,但我该如何检测并摆脱它呢?
还有,为什么会这样?在我看来,函数式编程的优点之一也是能够轻松并行化。
PS:SeqExtension.takeWhileWithLast 是我在 SO 上找到的一个函数,由 Tomas Petricek 在他的精彩回答之一中友情提供,如果需要我可以 post 它。
PPS:env为环境,类型定义为:
type Environment<'S, 'A ,'I> = |Environment of (State<'S> -> Action<'A> -> EnvironmentOutput<'S ,'I>)
我用 Async.Parallel 和 ParallelSeq 也试过,报告同样的问题。
基于消息的解决方案能否解决问题>?我正在研究它,虽然我一点也不熟悉,但是使用 MailboxProcessor 是否是使代码并行的好方法?
根据我的问题,
我也尝试过这个基于数据流的并行代码库。 https://nessos.github.io/Streams/.
我添加了以下代码:
let nessosResult = inputsStrategies
|> ParStream.ofArray
|> ParStream.map simulateStrategy
|> ParStream.toArray
我已经为 inputStrategy 定义了一个特殊类型(基本是我拥有的旧元组),这样 simulateStrategy 只接受一个输入。不幸的是,这个问题似乎隐藏在某个地方。我附上一张使用 CPU 的图表。不同情况在我的机器上花费的时间是:~8.8 秒(连续); ~6.2 秒(Array.Parallel.map); ~ 6.1 秒 (Nessos.Streams)
我发现 server garbage collection 是在 .NET 上获得最佳并行性能所必需的。在你的 app.config:
中有这样的东西
<configuration>
<runtime>
<gcServer enabled="true" />
</runtime>
</configuration>
我必须在 F# 中模拟一个离散环境,由 Python 调用,以解决强化学习问题。我有一个原始类型(主要是浮点数)的函数,可以使数据交换更顺畅。现在我可以用不同的数据多次 运行 这个函数,所以并行 运行 似乎是个好主意。
我有以下代码:
type AscentStrategy = |Strategy of seq<float>
let simulateAscent env ascentLimiter initState (sequenceOfDepths:seq<float>) =
//let infinitSeqOfConstantValues = (fun _ -> constantDepth) |> Seq.initInfinite
sequenceOfDepths
|> Seq.scan ( fun ( nextState, rew, isTerminal, _ ) depth -> getNextEnvResponseAndBoundForNextAction(env, nextState , depth , ascentLimiter) ) ( initState, 0.0 , false, 0.0)
|> SeqExtension.takeWhileWithLast (fun (_ , _, isTerminalState, _) -> not isTerminalState)
|> Seq.toArray
and then
let simulateStrategy ({MaxPDCS = maxPDCS ; MaxSimTime = maximumSimulationTime ; PenaltyForExceedingRisk = penaltyForExceedingRisk ;
RewardForDelivering = rewardForDelivering ; PenaltyForExceedingTime = penaltyForExceedingTime ; IntegrationTime = integrationTime
ControlToIntegrationTimeRatio = controlToIntegrationTimeRatio; DescentRate = descentRate; MaximumDepth = maximumDepth ;
BottomTime = bottomTime ; LegDiscreteTime = legDiscreteTime } : SimulationParameters) (Strategy ascentStrategy : AscentStrategy) =
let env, initState , ascentLimiter , _ = getEnvInitStateAndAscentLimiter ( maxPDCS , maximumSimulationTime ,
penaltyForExceedingRisk , rewardForDelivering , penaltyForExceedingTime ,
integrationTime ,
controlToIntegrationTimeRatio,
descentRate ,
maximumDepth ,
bottomTime ,
legDiscreteTime )
ascentStrategy
|> simulateAscent env ascentLimiter initState
最后我调用函数进行测试:
let commonSimulationParameters = {MaxPDCS = 0.32 ; MaxSimTime = 2000.0 ; PenaltyForExceedingRisk = 1.0 ; RewardForDelivering = 10.0; PenaltyForExceedingTime = 0.5 ;
IntegrationTime = 0.1; ControlToIntegrationTimeRatio = 10; DescentRate = 60.0; MaximumDepth = 20.0 ; BottomTime = 10.0; LegDiscreteTime = 0.1}
printfn"insert number of elements"
let maxInputsString = Console.ReadLine()
let maxInputs = maxInputsString |> Double.Parse
let inputsStrategies = [|0.0 .. maxInputs|] |> Array.map (fun x -> Seq.initInfinite (fun _ -> x ) )
let testParallel = inputsStrategies
|> Array.Parallel.map (fun x -> (simulateStrategy commonSimulationParameters ( Strategy x )) )
我将它与 Array.map 进行了比较,虽然它速度更快并且使用了笔记本电脑上 CPU 的 70%,但似乎仍然没有使用全部处理能力。我在一台有更多内核(~50)的机器上安装了 运行 它,它几乎没有增加 CPU 的使用(它占总使用量的 3/4%,有 50 个左右的独立输入)。我想一定是某处产生了死锁,但我该如何检测并摆脱它呢?
还有,为什么会这样?在我看来,函数式编程的优点之一也是能够轻松并行化。
PS:SeqExtension.takeWhileWithLast 是我在 SO 上找到的一个函数,由 Tomas Petricek 在他的精彩回答之一中友情提供,如果需要我可以 post 它。
PPS:env为环境,类型定义为:
type Environment<'S, 'A ,'I> = |Environment of (State<'S> -> Action<'A> -> EnvironmentOutput<'S ,'I>)
我用 Async.Parallel 和 ParallelSeq 也试过,报告同样的问题。
基于消息的解决方案能否解决问题>?我正在研究它,虽然我一点也不熟悉,但是使用 MailboxProcessor 是否是使代码并行的好方法?
根据我的问题, 我也尝试过这个基于数据流的并行代码库。 https://nessos.github.io/Streams/.
我添加了以下代码:
let nessosResult = inputsStrategies
|> ParStream.ofArray
|> ParStream.map simulateStrategy
|> ParStream.toArray
我已经为 inputStrategy 定义了一个特殊类型(基本是我拥有的旧元组),这样 simulateStrategy 只接受一个输入。不幸的是,这个问题似乎隐藏在某个地方。我附上一张使用 CPU 的图表。不同情况在我的机器上花费的时间是:~8.8 秒(连续); ~6.2 秒(Array.Parallel.map); ~ 6.1 秒 (Nessos.Streams)
我发现 server garbage collection 是在 .NET 上获得最佳并行性能所必需的。在你的 app.config:
中有这样的东西<configuration>
<runtime>
<gcServer enabled="true" />
</runtime>
</configuration>