以小组形式执行 Python 个线程
Execute Python threads in small groups
我正在尝试使用 python 将一些(100)个数据集插入到 SQL 服务器中。我正在使用多线程在一个循环中创建 100 个线程。所有这些都同时启动,这使数据库陷入困境。我想将我的线程分为 5 组,一旦该组完成,我想启动下一组线程,依此类推。由于我是 python 和多线程的新手,任何帮助都非常重要 appreciated.Please 在下面找到我的代码。
for row in datasets:
argument1=row[0]
argument2=row[1]
jobs=[]
t = Thread(target=insertDataIntoSQLServer, args=(argument1,argument2,))
jobs.append(t)
t.start()
for t in jobs:
t.join()
您可以创建一个 ThreadPoolExecutor
并指定 max_workers=5
。
参见 here。
并且您可以使用 functools.partial
将您的函数转换为所需的 0 参数函数。
编辑: 当您 submit
给执行程序时,您可以将参数与函数名称一起传递。谢谢 Roland Smith,提醒我 partial
是个坏主意。有更好的方法。
在 Python 2 和 3 上,您可以使用 multiprocessing.ThreadPool
。这类似于 multiprocessing.Pool
,但使用线程而不是进程。
import multiprocessing
datasets = [(1,2,3), (4,5,6)] # Iterable of datasets.
def insertfn(data):
pass # shove data to SQL server
pool = multiprocessing.ThreadPool()
p.map(insertfn, datasets)
默认情况下,Pool
将创建与您的 CPU 拥有的内核一样多的工作线程。使用更多线程可能无济于事,因为它们将争夺 CPU 时间。
请注意,我已将数据分组到元组中。这是绕过池工作人员的一个参数限制的一种方法。
在 Python 3 你也可以使用 ThreadPoolExecutor
.
但是请注意,在具有全局解释器锁的 Python 实现(如 "standard" CPython)上,只有 一个 线程一次可以执行 Python 字节码。所以使用大量线程不会自动提高性能。线程可能有助于 I/O 绑定的操作。如果一个线程正在等待 I/O,另一个线程可以 运行.
首先请注意,您的代码没有按预期工作:它将 jobs
设置为一个空列表 每 循环一次,因此在循环之后只有你 join()
最后创建的线程。
因此,通过将 jobs=[]
移出循环来修复它。在那之后,你可以通过在 t.start()
:
之后添加这个来得到你所要求的
if len(jobs) == 5:
for t in jobs:
t.join()
jobs = []
我个人会使用某种池(如其他答案所建议的那样),但很容易直接得到您的想法。
我正在尝试使用 python 将一些(100)个数据集插入到 SQL 服务器中。我正在使用多线程在一个循环中创建 100 个线程。所有这些都同时启动,这使数据库陷入困境。我想将我的线程分为 5 组,一旦该组完成,我想启动下一组线程,依此类推。由于我是 python 和多线程的新手,任何帮助都非常重要 appreciated.Please 在下面找到我的代码。
for row in datasets:
argument1=row[0]
argument2=row[1]
jobs=[]
t = Thread(target=insertDataIntoSQLServer, args=(argument1,argument2,))
jobs.append(t)
t.start()
for t in jobs:
t.join()
您可以创建一个 ThreadPoolExecutor
并指定 max_workers=5
。
参见 here。
并且您可以使用 functools.partial
将您的函数转换为所需的 0 参数函数。
编辑: 当您 submit
给执行程序时,您可以将参数与函数名称一起传递。谢谢 Roland Smith,提醒我 partial
是个坏主意。有更好的方法。
在 Python 2 和 3 上,您可以使用 multiprocessing.ThreadPool
。这类似于 multiprocessing.Pool
,但使用线程而不是进程。
import multiprocessing
datasets = [(1,2,3), (4,5,6)] # Iterable of datasets.
def insertfn(data):
pass # shove data to SQL server
pool = multiprocessing.ThreadPool()
p.map(insertfn, datasets)
默认情况下,Pool
将创建与您的 CPU 拥有的内核一样多的工作线程。使用更多线程可能无济于事,因为它们将争夺 CPU 时间。
请注意,我已将数据分组到元组中。这是绕过池工作人员的一个参数限制的一种方法。
在 Python 3 你也可以使用 ThreadPoolExecutor
.
但是请注意,在具有全局解释器锁的 Python 实现(如 "standard" CPython)上,只有 一个 线程一次可以执行 Python 字节码。所以使用大量线程不会自动提高性能。线程可能有助于 I/O 绑定的操作。如果一个线程正在等待 I/O,另一个线程可以 运行.
首先请注意,您的代码没有按预期工作:它将 jobs
设置为一个空列表 每 循环一次,因此在循环之后只有你 join()
最后创建的线程。
因此,通过将 jobs=[]
移出循环来修复它。在那之后,你可以通过在 t.start()
:
if len(jobs) == 5:
for t in jobs:
t.join()
jobs = []
我个人会使用某种池(如其他答案所建议的那样),但很容易直接得到您的想法。