我怎样才能使这个带有多个 for 循环的 go 代码更快?
How can i make this go code with multiple for loops faster?
我最近开始从事这个项目。这是一个关于集体运动的模型。
在此代码中 8192 个粒子 移动 1000000 步 并且在每个步骤中每个粒子的位置和角度
更新。在每一步中,每个粒子的邻居都会将它们的角度与该特定粒子同步。
我在 python 中写了一个代码,但那太慢了。然后我在 go 中编写了这段代码,但它仍然不快。每一步平均需要 1.3 秒,这并不好。
你们有什么办法让它更快吗?
package main
import (
"bufio"
"flag"
"fmt"
"log"
"math"
"math/rand"
"os"
"runtime"
"time"
)
const (
n_particles int = 8192
n_steps int = 1000000
dt float64 = 1.0
v0 float64 = 0.5
radius float64 = 1.0
f_intensity float64 = 1.2
scale float64 = 128.0
alpha float64 = 1 / 36
)
var (
x [n_particles + 1]float64
y [n_particles + 1]float64
angles [n_particles + 1]float64
vx [n_particles + 1]float64
vy [n_particles + 1]float64
order [n_steps + 1]float64
fxk float64
fyk float64
fxi float64
fyi float64
)
func main() {
vstart := time.Now()
rsource := rand.NewSource(time.Now().UnixNano())
randomizer := rand.New(rsource)
//defining random data
for i := 0; i <= n_particles; {
x[i] = (randomizer.Float64()) * scale
y[i] = (randomizer.Float64()) * scale
angles[i] = (randomizer.Float64()) * math.Pi * 2
vx[i] = v0 * float64(math.Cos(angles[i]))
vy[i] = v0 * float64(math.Sin(angles[i]))
i = i + 1
}
for i := 0; i <= n_steps; {
start := time.Now()
cosangles := 0.0
sinangles := 0.0
for j := 0; j <= n_particles; {
x[j] = x[j] + (vx[j] * dt)
x[j] = math.Mod(x[j], scale)
y[j] = y[j] + (vy[j] * dt)
y[j] = math.Mod(x[j], scale)
j = j + 1
}
m_angles := angles
for j := 0; j <= n_particles; {
sumanglesx := 0.0
sumanglesy := 0.0
fxi = math.Floor(x[j])
fyi = math.Floor(y[j])
for k := 0; k <= n_particles; {
if k != j {
fxk = math.Floor(x[k])
fyk = math.Floor(y[k])
if (fxi == fxk && fyi == fyk) || (fxi+1 == fxk && fyi == fyk) || (fxi-1 == fxk && fyi == fyk) || (fxi == fxk && fyi+1 == fyk) || (fxi == fxk && fyi-1 == fyk) || (fxi+1 == fxk && fyi+1 == fyk) || (fxi-1 == fxk && fyi-1 == fyk) || (fxi-1 == fxk && fyi+1 == fyk) || (fxi+1 == fxk && fyi-1 == fyk) {
dist := math.Pow((x[k]-x[j]), 2) + math.Pow((y[k]-y[j]), 2)
if dist < radius {
if k > j {
sx := math.Cos(angles[k])
sy := math.Sin(angles[k])
sumanglesx = sumanglesx + sx
sumanglesy = sumanglesy + sy
} else if k < j {
sx := alpha * math.Cos(angles[k])
sy := alpha * math.Sin(angles[k])
sumanglesx = sumanglesx + sx
sumanglesy = sumanglesy + sy
}
}
}
}
k = k + 1
}
m_angles[j] = math.Atan2(sumanglesx, sumanglesy) + (f_intensity*randomizer.Float64() - f_intensity)
j = j + 1
}
angles = m_angles
for f := 0; f <= n_particles; {
vx[f] = v0 * float64(math.Cos(angles[f]))
vy[f] = v0 * float64(math.Sin(angles[f]))
f = f + 1
}
for h := 0; h <= n_particles; {
cosangles = cosangles + (math.Cos(angles[h]))
sinangles = sinangles + (math.Sin(angles[h]))
h = h + 1
}
core := (math.Sqrt(((math.Pow(cosangles, 2)) + (math.Pow(cosangles, 2))))) / float64(n_particles)
order[i] = core
duration := time.Since(start)
fmt.Println(i)
fmt.Println(duration)
i = i + 1
}
vduration := time.Since(vstart)
log.Printf("It took : %s", vduration)
f, _ := os.Create("order.txt")
for i := 0; i <= n_steps; {
s := fmt.Sprintf("%f", order[i])
w := bufio.NewWriter(f)
w.WriteString(s + "\n")
w.Flush()
i = i + 1
}
}
首先是一些小的快速胜利:
math.Pow(x, 2)
=> x * x
math.Sin
, math.Cos
=> math.Sincos
但一个更大的问题是循环遍历 n_particles2,考虑到显然只有相邻的粒子相互作用,这是一种浪费:
for j := 0; j <= n_particles; j++ {
fxi = math.Floor(x[j])
fyi = math.Floor(y[j])
for k := 0; k <= n_particles; k++ {
fxk = math.Floor(x[k])
fyk = math.Floor(y[k])
if (fxi == fxk && fyi == fyk) || (fxi+1 == fxk && fyi == fyk)
|| (fxi-1 == fxk && fyi == fyk) || (fxi == fxk && fyi+1 == fyk)
|| (fxi == fxk && fyi-1 == fyk) || (fxi+1 == fxk && fyi+1 == fyk)
|| (fxi-1 == fxk && fyi-1 == fyk) || (fxi-1 == fxk && fyi+1 == fyk)
|| (fxi+1 == fxk && fyi-1 == fyk) {
// ...
如何在没有 N2 嵌套循环的情况下遍历相邻粒子?也许如果你维护一个辅助邻接索引映射积分 x/y 坐标到粒子,你可以一次遍历它?
for i := 0; i <= n_steps; i++ {
// . . .
type intpos struct {
x, y int64
}
adjacencyIndex := make(map[intpos][]int)
for j := 0; j <= n_particles; j++ {
// . . .
ix, iy := int64(math.Floor(x[j])), int64(math.Floor(y[j]))
adjacencyIndex[intpos{ix, iy}] = append(adjacencyIndex[intpos{ix, iy}], j)
}
// . . .
for j := 0; j <= n_particles; j++ {
// . . .
ix, iy := int64(math.Floor(x[j])), int64(math.Floor(y[j]))
for dx := -1; dx <= 1; dx++ {
for dy := -1; dy <= 1; dy++ {
adjacentParticles := adjacencyIndex[intpos{ix + int64(dx), iy + int64(dy)}]
for _, k := range adjacentParticles {
if j < k {
// process interaction between particles j and k
}
}
}
}
// . . .
请注意,以上将导致每对相邻粒子之间发生单一相互作用。如果您需要双向交互(即 j <=> k
和 k <=> j
),请将 j < k
更改为 j != k
.
作为进一步的改进,您可以在粒子四处移动时更新邻接索引,而不是在每次迭代中重建它。
我最近开始从事这个项目。这是一个关于集体运动的模型。 在此代码中 8192 个粒子 移动 1000000 步 并且在每个步骤中每个粒子的位置和角度 更新。在每一步中,每个粒子的邻居都会将它们的角度与该特定粒子同步。 我在 python 中写了一个代码,但那太慢了。然后我在 go 中编写了这段代码,但它仍然不快。每一步平均需要 1.3 秒,这并不好。 你们有什么办法让它更快吗?
package main
import (
"bufio"
"flag"
"fmt"
"log"
"math"
"math/rand"
"os"
"runtime"
"time"
)
const (
n_particles int = 8192
n_steps int = 1000000
dt float64 = 1.0
v0 float64 = 0.5
radius float64 = 1.0
f_intensity float64 = 1.2
scale float64 = 128.0
alpha float64 = 1 / 36
)
var (
x [n_particles + 1]float64
y [n_particles + 1]float64
angles [n_particles + 1]float64
vx [n_particles + 1]float64
vy [n_particles + 1]float64
order [n_steps + 1]float64
fxk float64
fyk float64
fxi float64
fyi float64
)
func main() {
vstart := time.Now()
rsource := rand.NewSource(time.Now().UnixNano())
randomizer := rand.New(rsource)
//defining random data
for i := 0; i <= n_particles; {
x[i] = (randomizer.Float64()) * scale
y[i] = (randomizer.Float64()) * scale
angles[i] = (randomizer.Float64()) * math.Pi * 2
vx[i] = v0 * float64(math.Cos(angles[i]))
vy[i] = v0 * float64(math.Sin(angles[i]))
i = i + 1
}
for i := 0; i <= n_steps; {
start := time.Now()
cosangles := 0.0
sinangles := 0.0
for j := 0; j <= n_particles; {
x[j] = x[j] + (vx[j] * dt)
x[j] = math.Mod(x[j], scale)
y[j] = y[j] + (vy[j] * dt)
y[j] = math.Mod(x[j], scale)
j = j + 1
}
m_angles := angles
for j := 0; j <= n_particles; {
sumanglesx := 0.0
sumanglesy := 0.0
fxi = math.Floor(x[j])
fyi = math.Floor(y[j])
for k := 0; k <= n_particles; {
if k != j {
fxk = math.Floor(x[k])
fyk = math.Floor(y[k])
if (fxi == fxk && fyi == fyk) || (fxi+1 == fxk && fyi == fyk) || (fxi-1 == fxk && fyi == fyk) || (fxi == fxk && fyi+1 == fyk) || (fxi == fxk && fyi-1 == fyk) || (fxi+1 == fxk && fyi+1 == fyk) || (fxi-1 == fxk && fyi-1 == fyk) || (fxi-1 == fxk && fyi+1 == fyk) || (fxi+1 == fxk && fyi-1 == fyk) {
dist := math.Pow((x[k]-x[j]), 2) + math.Pow((y[k]-y[j]), 2)
if dist < radius {
if k > j {
sx := math.Cos(angles[k])
sy := math.Sin(angles[k])
sumanglesx = sumanglesx + sx
sumanglesy = sumanglesy + sy
} else if k < j {
sx := alpha * math.Cos(angles[k])
sy := alpha * math.Sin(angles[k])
sumanglesx = sumanglesx + sx
sumanglesy = sumanglesy + sy
}
}
}
}
k = k + 1
}
m_angles[j] = math.Atan2(sumanglesx, sumanglesy) + (f_intensity*randomizer.Float64() - f_intensity)
j = j + 1
}
angles = m_angles
for f := 0; f <= n_particles; {
vx[f] = v0 * float64(math.Cos(angles[f]))
vy[f] = v0 * float64(math.Sin(angles[f]))
f = f + 1
}
for h := 0; h <= n_particles; {
cosangles = cosangles + (math.Cos(angles[h]))
sinangles = sinangles + (math.Sin(angles[h]))
h = h + 1
}
core := (math.Sqrt(((math.Pow(cosangles, 2)) + (math.Pow(cosangles, 2))))) / float64(n_particles)
order[i] = core
duration := time.Since(start)
fmt.Println(i)
fmt.Println(duration)
i = i + 1
}
vduration := time.Since(vstart)
log.Printf("It took : %s", vduration)
f, _ := os.Create("order.txt")
for i := 0; i <= n_steps; {
s := fmt.Sprintf("%f", order[i])
w := bufio.NewWriter(f)
w.WriteString(s + "\n")
w.Flush()
i = i + 1
}
}
首先是一些小的快速胜利:
math.Pow(x, 2)
=>x * x
math.Sin
,math.Cos
=>math.Sincos
但一个更大的问题是循环遍历 n_particles2,考虑到显然只有相邻的粒子相互作用,这是一种浪费:
for j := 0; j <= n_particles; j++ {
fxi = math.Floor(x[j])
fyi = math.Floor(y[j])
for k := 0; k <= n_particles; k++ {
fxk = math.Floor(x[k])
fyk = math.Floor(y[k])
if (fxi == fxk && fyi == fyk) || (fxi+1 == fxk && fyi == fyk)
|| (fxi-1 == fxk && fyi == fyk) || (fxi == fxk && fyi+1 == fyk)
|| (fxi == fxk && fyi-1 == fyk) || (fxi+1 == fxk && fyi+1 == fyk)
|| (fxi-1 == fxk && fyi-1 == fyk) || (fxi-1 == fxk && fyi+1 == fyk)
|| (fxi+1 == fxk && fyi-1 == fyk) {
// ...
如何在没有 N2 嵌套循环的情况下遍历相邻粒子?也许如果你维护一个辅助邻接索引映射积分 x/y 坐标到粒子,你可以一次遍历它?
for i := 0; i <= n_steps; i++ {
// . . .
type intpos struct {
x, y int64
}
adjacencyIndex := make(map[intpos][]int)
for j := 0; j <= n_particles; j++ {
// . . .
ix, iy := int64(math.Floor(x[j])), int64(math.Floor(y[j]))
adjacencyIndex[intpos{ix, iy}] = append(adjacencyIndex[intpos{ix, iy}], j)
}
// . . .
for j := 0; j <= n_particles; j++ {
// . . .
ix, iy := int64(math.Floor(x[j])), int64(math.Floor(y[j]))
for dx := -1; dx <= 1; dx++ {
for dy := -1; dy <= 1; dy++ {
adjacentParticles := adjacencyIndex[intpos{ix + int64(dx), iy + int64(dy)}]
for _, k := range adjacentParticles {
if j < k {
// process interaction between particles j and k
}
}
}
}
// . . .
请注意,以上将导致每对相邻粒子之间发生单一相互作用。如果您需要双向交互(即 j <=> k
和 k <=> j
),请将 j < k
更改为 j != k
.
作为进一步的改进,您可以在粒子四处移动时更新邻接索引,而不是在每次迭代中重建它。