使用 python 的多处理星图的线程安全计数器
Thread safe counter using python's multiprocessing starmap
目前我正在尝试处理计算速度非常快的结果。起初我将每个模拟结果都插入到一个sqlite数据库中,但结果却是整个计算的瓶颈。所以我最终使用 cursor.executemany 而不是 cursor.execute,后者更快。
我现在的问题是我无法实现线程安全计数器。
executemany 任务应该是每 1000 次计算 运行。因此,我用 multiprocessing.Value 实现了一个初始化程序我也尝试了这个解决方案 (http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing) 但不知何故,计数器的某些值是重复的,最终导致 运行 经常或不经常执行 executemany 任务完全没有。
如果有人知道如何解决这个问题,我将不胜感激。
这是一个最小样本:
import multiprocessing, sqlite3
from multiprocessing import Value, Lock
from itertools import repeat
def worker(Testvalues, TotalValues):
MP_counter.value += 1
counter.increment()
con = sqlite3.connect("Test.db", timeout=30.0)
cur = con.cursor()
# Minimum sample:
Helper = list(range(5))
Helper = [x * Testvalues for x in Helper]
GList.append(Helper)
Execute_Every = 10
print("Counter class: %d" % (counter.value()))
print("MP_counter: %d" % (MP_counter.value))
if counter.value() % Execute_Every == 0 or counter.value() == TotalValues - 1:
print("Execute query")
print("Counter class: %d" % (counter.value()))
print("MP_counter: %d" % (MP_counter.value))
Helper = [tuple(row) for row in GList[:Execute_Every]]
del GList[:Execute_Every]
cur.executemany(
"INSERT INTO Test (One, Two, Three, Four, Five) VALUES (?, ?, ?, ?, ?);", Helper)
con.commit()
con.close()
def setup(t, g, c):
global MP_counter
global GList
global counter
MP_counter = t
GList = g
counter = c
class Counter(object):
def __init__(self, initval=0):
self.val = Value('i', initval)
self.lock = Lock()
def increment(self):
with self.lock:
self.val.value += 1
def value(self):
with self.lock:
return self.val.value
if __name__ == '__main__':
m = multiprocessing.Manager()
CPUS = multiprocessing.cpu_count()
MP_counter = multiprocessing.Value('i', 0)
GList = m.list([])
thread_safe_counter = Counter(0)
l = multiprocessing.Lock()
WORKERS = multiprocessing.Pool(initializer=setup, initargs=[MP_counter, GList, thread_safe_counter],processes=CPUS)
con = sqlite3.connect("Test.db", timeout=30.0)
cur = con.cursor()
cur.execute('PRAGMA journal_mode=wal')
SQLCommand = "CREATE TABLE IF NOT EXISTS Test (One INT, Two INT, Three INT, Four INT, Five INT);"
cur.execute(SQLCommand)
con.close()
TotalValues = 100
Testvalues = list(range(TotalValues))
WORKERS.starmap(worker, zip(Testvalues, repeat(TotalValues)))
WORKERS.close()
WORKERS.join()
#Check if list is empty
print(GList)
谢谢你们:)
您的计数器有一个 increment()
和一个 value()
方法,它们需要单独调用,因此为了确保安全,您必须在持有锁的同时调用这两个操作。您的 increment()
方法应该 return 递增后的新值,并且您应该使用它而无需进一步调用 value()
,例如:
class Counter(object):
def __init__(self, initval=0):
self.val = Value('i', initval)
self.lock = Lock()
def increment(self):
with self.lock:
self.val.value += 1
return self.val.value
...
def worker(Testvalues, TotalValues):
counter_value = counter.increment()
# use only counter_value from here on
...
此外,已经创建了一个 Value
,默认值为 RLock
,如果需要,可以在构造函数调用中使用不同的锁类型覆盖它。所以你真的不需要分配你自己的锁,你可以使用:
class Counter(object):
def __init__(self, initval=0):
self.val = Value('i', initval)
# or Value('i', initval, lock=Lock())
def increment(self):
with self.val.get_lock():
self.val.value += 1
return self.val.value
目前我正在尝试处理计算速度非常快的结果。起初我将每个模拟结果都插入到一个sqlite数据库中,但结果却是整个计算的瓶颈。所以我最终使用 cursor.executemany 而不是 cursor.execute,后者更快。
我现在的问题是我无法实现线程安全计数器。
executemany 任务应该是每 1000 次计算 运行。因此,我用 multiprocessing.Value 实现了一个初始化程序我也尝试了这个解决方案 (http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing) 但不知何故,计数器的某些值是重复的,最终导致 运行 经常或不经常执行 executemany 任务完全没有。
如果有人知道如何解决这个问题,我将不胜感激。
这是一个最小样本:
import multiprocessing, sqlite3
from multiprocessing import Value, Lock
from itertools import repeat
def worker(Testvalues, TotalValues):
MP_counter.value += 1
counter.increment()
con = sqlite3.connect("Test.db", timeout=30.0)
cur = con.cursor()
# Minimum sample:
Helper = list(range(5))
Helper = [x * Testvalues for x in Helper]
GList.append(Helper)
Execute_Every = 10
print("Counter class: %d" % (counter.value()))
print("MP_counter: %d" % (MP_counter.value))
if counter.value() % Execute_Every == 0 or counter.value() == TotalValues - 1:
print("Execute query")
print("Counter class: %d" % (counter.value()))
print("MP_counter: %d" % (MP_counter.value))
Helper = [tuple(row) for row in GList[:Execute_Every]]
del GList[:Execute_Every]
cur.executemany(
"INSERT INTO Test (One, Two, Three, Four, Five) VALUES (?, ?, ?, ?, ?);", Helper)
con.commit()
con.close()
def setup(t, g, c):
global MP_counter
global GList
global counter
MP_counter = t
GList = g
counter = c
class Counter(object):
def __init__(self, initval=0):
self.val = Value('i', initval)
self.lock = Lock()
def increment(self):
with self.lock:
self.val.value += 1
def value(self):
with self.lock:
return self.val.value
if __name__ == '__main__':
m = multiprocessing.Manager()
CPUS = multiprocessing.cpu_count()
MP_counter = multiprocessing.Value('i', 0)
GList = m.list([])
thread_safe_counter = Counter(0)
l = multiprocessing.Lock()
WORKERS = multiprocessing.Pool(initializer=setup, initargs=[MP_counter, GList, thread_safe_counter],processes=CPUS)
con = sqlite3.connect("Test.db", timeout=30.0)
cur = con.cursor()
cur.execute('PRAGMA journal_mode=wal')
SQLCommand = "CREATE TABLE IF NOT EXISTS Test (One INT, Two INT, Three INT, Four INT, Five INT);"
cur.execute(SQLCommand)
con.close()
TotalValues = 100
Testvalues = list(range(TotalValues))
WORKERS.starmap(worker, zip(Testvalues, repeat(TotalValues)))
WORKERS.close()
WORKERS.join()
#Check if list is empty
print(GList)
谢谢你们:)
您的计数器有一个 increment()
和一个 value()
方法,它们需要单独调用,因此为了确保安全,您必须在持有锁的同时调用这两个操作。您的 increment()
方法应该 return 递增后的新值,并且您应该使用它而无需进一步调用 value()
,例如:
class Counter(object):
def __init__(self, initval=0):
self.val = Value('i', initval)
self.lock = Lock()
def increment(self):
with self.lock:
self.val.value += 1
return self.val.value
...
def worker(Testvalues, TotalValues):
counter_value = counter.increment()
# use only counter_value from here on
...
此外,已经创建了一个 Value
,默认值为 RLock
,如果需要,可以在构造函数调用中使用不同的锁类型覆盖它。所以你真的不需要分配你自己的锁,你可以使用:
class Counter(object):
def __init__(self, initval=0):
self.val = Value('i', initval)
# or Value('i', initval, lock=Lock())
def increment(self):
with self.val.get_lock():
self.val.value += 1
return self.val.value