ruby rails postgresql 活动记录并行更新
ruby on rails postgresql active record parallle update
我有一个名为 AisSignal 的模型,有大约 3000 条记录,我 运行 将每个模型与另一个名为 Footprint 的模型进行比较,有大约 10 条记录,所以我们有一个 3000 x 10 的循环。
我试过了:
Parallel.each(AisSignal.all, in_processes: 8) do |signal|
Footprint.all.each do |footprint|
if footprint.cover([signal.lon, signal.lat])
signal.update(imo: 'in')
break
end
end
end
但它 运行10 秒后就像正常块一样。
我尝试像下面那样从进程更改为线程,但这会导致应用程序冻结。
Parallel.each(AisSignal.all, in_threads: 8) do |signal|
Footprint.all.each do |footprint|
if footprint.cover([signal.lon, signal.lat])
signal.update(imo: 'in')
break
end
end
end
我在 database.yml
中有 50 个池大小
有多个线程 运行 并行更新记录的任何想法或方法。
我实际上需要更新更多记录,这可能需要几分钟。
线程和分支通常不能很好地处理数据库连接。如果处理不当,threads/processes 可能会同时尝试使用相同的连接。
Parallel mentions this in their documentation. You need to make use of connection pooling.
A connection pool synchronizes thread access to a limited number of database connections. The basic idea is that each thread checks out a database connection from the pool, uses that connection, and checks the connection back in. ConnectionPool is completely thread-safe, and will ensure that a connection cannot be used by two threads at the same time, as long as ConnectionPool's contract is correctly followed. It will also handle cases in which there are more threads than connections: if all connections have been checked out, and a thread tries to checkout a connection anyway, then ConnectionPool will wait until some other thread has checked in a connection.
Parallel.each(AisSignal.all, in_threads: 8) do |signal|
ActiveRecord::Base.connection_pool.with_connection do
Footprint.all.each do |footprint|
if footprint.cover([signal.lon, signal.lat])
signal.update(imo: 'in')
break
end
end
end
end
请注意,此代码效率很低。
- 它加载整个
AisSignal
table。
- 对于每个信号,它加载并扫描整个
Footprint
table。
它将使用大量内存,并且会在 s*f 时间内 运行,其中 s
是信号数,f
是足迹数。
您可以通过将 Footprint.all.each
替换为 Footprint.find_each
来减少内存占用。这将批量加载行。
线程并不是让数据库查询更快的方法。根本问题是您在 Ruby 中多次扫描 Footprint,而不是让数据库执行此操作。 if footprint.cover([signal.lon, signal.lat])
应该改为 where 子句。
AisSignal.find_each do |signal|
# With ... being the equivalent of `cover([signal.lon, signal.lat])`
# as a where clause.
signal.update!(imo: 'in') if Footprint.exists?(...)
end
这可以作为连接更快地完成。
# ... is the equivalent of `cover([signal.lon, signal.lat])`
AisSignal.joins("inner join footprints on ...").update_all(imo: 'in')
我有一个名为 AisSignal 的模型,有大约 3000 条记录,我 运行 将每个模型与另一个名为 Footprint 的模型进行比较,有大约 10 条记录,所以我们有一个 3000 x 10 的循环。
我试过了:
Parallel.each(AisSignal.all, in_processes: 8) do |signal|
Footprint.all.each do |footprint|
if footprint.cover([signal.lon, signal.lat])
signal.update(imo: 'in')
break
end
end
end
但它 运行10 秒后就像正常块一样。
我尝试像下面那样从进程更改为线程,但这会导致应用程序冻结。
Parallel.each(AisSignal.all, in_threads: 8) do |signal|
Footprint.all.each do |footprint|
if footprint.cover([signal.lon, signal.lat])
signal.update(imo: 'in')
break
end
end
end
我在 database.yml
中有 50 个池大小有多个线程 运行 并行更新记录的任何想法或方法。 我实际上需要更新更多记录,这可能需要几分钟。
线程和分支通常不能很好地处理数据库连接。如果处理不当,threads/processes 可能会同时尝试使用相同的连接。
Parallel mentions this in their documentation. You need to make use of connection pooling.
A connection pool synchronizes thread access to a limited number of database connections. The basic idea is that each thread checks out a database connection from the pool, uses that connection, and checks the connection back in. ConnectionPool is completely thread-safe, and will ensure that a connection cannot be used by two threads at the same time, as long as ConnectionPool's contract is correctly followed. It will also handle cases in which there are more threads than connections: if all connections have been checked out, and a thread tries to checkout a connection anyway, then ConnectionPool will wait until some other thread has checked in a connection.
Parallel.each(AisSignal.all, in_threads: 8) do |signal|
ActiveRecord::Base.connection_pool.with_connection do
Footprint.all.each do |footprint|
if footprint.cover([signal.lon, signal.lat])
signal.update(imo: 'in')
break
end
end
end
end
请注意,此代码效率很低。
- 它加载整个
AisSignal
table。 - 对于每个信号,它加载并扫描整个
Footprint
table。
它将使用大量内存,并且会在 s*f 时间内 运行,其中 s
是信号数,f
是足迹数。
您可以通过将 Footprint.all.each
替换为 Footprint.find_each
来减少内存占用。这将批量加载行。
线程并不是让数据库查询更快的方法。根本问题是您在 Ruby 中多次扫描 Footprint,而不是让数据库执行此操作。 if footprint.cover([signal.lon, signal.lat])
应该改为 where 子句。
AisSignal.find_each do |signal|
# With ... being the equivalent of `cover([signal.lon, signal.lat])`
# as a where clause.
signal.update!(imo: 'in') if Footprint.exists?(...)
end
这可以作为连接更快地完成。
# ... is the equivalent of `cover([signal.lon, signal.lat])`
AisSignal.joins("inner join footprints on ...").update_all(imo: 'in')