使用 python 的多处理星图的线程安全计数器

Thread safe counter using python's multiprocessing starmap

目前我正在尝试处理计算速度非常快的结果。起初我将每个模拟结果都插入到一个sqlite数据库中,但结果却是整个计算的瓶颈。所以我最终使用 cursor.executemany 而不是 cursor.execute,后者更快。

我现在的问题是我无法实现线程安全计数器。

executemany 任务应该是每 1000 次计算 运行。因此,我用 multiprocessing.Value 实现了一个初始化程序我也尝试了这个解决方案 (http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing) 但不知何故,计数器的某些值是重复的,最终导致 运行 经常或不经常执行 executemany 任务完全没有。

如果有人知道如何解决这个问题,我将不胜感激。

这是一个最小样本:

import multiprocessing, sqlite3
from multiprocessing import Value, Lock
from itertools import repeat

def worker(Testvalues, TotalValues):
    MP_counter.value += 1
    counter.increment()

    con = sqlite3.connect("Test.db", timeout=30.0)
    cur = con.cursor()
    # Minimum sample:
    Helper = list(range(5))
    Helper = [x * Testvalues for x in Helper]
    GList.append(Helper)

    Execute_Every = 10
    print("Counter class: %d" % (counter.value()))
    print("MP_counter: %d" % (MP_counter.value))

    if counter.value() % Execute_Every == 0 or counter.value() == TotalValues - 1:
        print("Execute query")
        print("Counter class: %d" % (counter.value()))
        print("MP_counter: %d" % (MP_counter.value))

        Helper = [tuple(row) for row in GList[:Execute_Every]]
        del GList[:Execute_Every]
        cur.executemany(
            "INSERT INTO Test (One, Two, Three, Four, Five) VALUES (?, ?, ?, ?, ?);", Helper)
        con.commit()

    con.close()

def setup(t, g, c):
    global MP_counter
    global GList
    global counter
    MP_counter = t
    GList = g
    counter = c

class Counter(object):
    def __init__(self, initval=0):
        self.val = Value('i', initval)
        self.lock = Lock()

    def increment(self):
        with self.lock:
            self.val.value += 1

    def value(self):
        with self.lock:
            return self.val.value

if __name__ == '__main__':
    m = multiprocessing.Manager()
    CPUS = multiprocessing.cpu_count()
    MP_counter = multiprocessing.Value('i', 0)
    GList = m.list([])
    thread_safe_counter = Counter(0)

    l = multiprocessing.Lock()
    WORKERS = multiprocessing.Pool(initializer=setup, initargs=[MP_counter, GList, thread_safe_counter],processes=CPUS)

    con = sqlite3.connect("Test.db", timeout=30.0)
    cur = con.cursor()
    cur.execute('PRAGMA journal_mode=wal')
    SQLCommand = "CREATE TABLE IF NOT EXISTS Test (One INT, Two INT, Three INT, Four INT, Five INT);"
    cur.execute(SQLCommand)
    con.close()

    TotalValues = 100
    Testvalues = list(range(TotalValues))

    WORKERS.starmap(worker, zip(Testvalues, repeat(TotalValues)))
    WORKERS.close()
    WORKERS.join()
    #Check if list is empty
    print(GList)

谢谢你们:)

您的计数器有一个 increment() 和一个 value() 方法,它们需要单独调用,因此为了确保安全,您必须在持有锁的同时调用这两个操作。您的 increment() 方法应该 return 递增后的新值,并且您应该使用它而无需进一步调用 value(),例如:

class Counter(object):
    def __init__(self, initval=0):
        self.val = Value('i', initval)
        self.lock = Lock()

    def increment(self):
        with self.lock:
            self.val.value += 1
            return self.val.value

...

def worker(Testvalues, TotalValues):
    counter_value = counter.increment()
    # use only counter_value from here on
    ...

此外,已经创建了一个 Value,默认值为 RLock,如果需要,可以在构造函数调用中使用不同的锁类型覆盖它。所以你真的不需要分配你自己的锁,你可以使用:

class Counter(object):
    def __init__(self, initval=0):
        self.val = Value('i', initval)
        # or Value('i', initval, lock=Lock())

    def increment(self):
        with self.val.get_lock():
            self.val.value += 1
            return self.val.value