python 多线程数据争用

python multithreading data race

我正在 python 2.7 上制作多线程系统。 基本上,它有 3 个线程和一个 singleton-class 共享数据。

红色箭头 - 调用;蓝色箭头 - access

每个线程在一个文件中都是独立的 class。文件 main.py 导入工作和通讯文件,以及共享数据。然后主线程在一个线程中调用 working class 并在另一个线程中调用 communication 。共享数据作为单例的唯一实例,在工作class和通信class.

的构造函数中传递

文件main.py

import communication
import Worker
import Data

app_data = Data.Instance()
#...........

SRV = communication.Server(app_data)
SRV.setDaemon(True)
SRV.start()

#...........

while True
    #...........
    # MUST BE locker.acquire()
    if condition1:
        if condition2:
            job = Worker(app_data, SRV.taskResultSendToSlaves, app_data.ip_table[app_data.cfg.MY_IP]['tasks'].pop())
            job.setDaemon(True)
            job.start()
    # MUST BE locker.release()

文件communication.py

class Server(threading.Thread):

    # .................

    def __init__(self, data):
        self.data = data
        # .................
        threading.Thread.__init__(self)

    def run(self):
        srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        srv.settimeout(self.data.cfg.TIMEOUT)
        srv.bind((self.my_addr, self.my_port))
        srv.listen(self.data.cfg.NUMBER_OF_CLIENTS)
        print "Start server"
        while True:
            # HANDLING MESSAGES FROM OTHER PC

    # .................

文件Worker.py

class Worker(threading.Thread):    
    def __init__(self, data, sender, taskname):
        self.data = data
        self.sender = sender
        self.taskname = taskname
        threading.Thread.__init__(self)

    def run(self):
        import thread
        self.data.complete_task.clear()
        tick_before = time.time()
        startupinfo = subprocess.STARTUPINFO()
        startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
        startupinfo.wShowWindow = subprocess.SW_HIDE
        p = subprocess.Popen(self.data.cfg.PATH_INTERPRETER + " " + self.data.cfg.PATH_TASKS + self.taskname, startupinfo=startupinfo, shell=False, stdout=subprocess.PIPE)
        job_result, err = p.communicate()
        tick_after = time.time()
        work_time = tick_after - tick_before  
        # MUST BE locker.acquire()      
        self.data.task_table[self.taskname]['status'] = 'complete'
        self.data.task_table[self.taskname]['result'] = job_result
        self.data.task_table[self.taskname]['time'] = work_time
        # MUST BE locker.release()
        logging.debug("%s task is done" % self.taskname)
        tr = threading.Thread(target=self.sender, name="SENDER", args=(self.taskname, ))
        tr.setDaemon(True)
        tr.start()
        tr.join()
        logging.debug("%s task is sent" % self.taskname)
        self.data.complete_task.set()
        thread.exit()

Singletone.py

class Singleton:

    def __init__(self, decorated):
        self._decorated = decorated

    def Instance(self):
        try:
            return self._instance
        except AttributeError:
            self._instance = self._decorated()
            return self._instance

def __call__(self):
    raise TypeError('Singletons must be accessed through `Instance()`.')

def __instancecheck__(self, inst):
    return isinstance(inst, self._decorated)

data.py

#-*- coding: utf-8 -*-
from singletone import Singleton
from configs import Configurations
import threading
import logging


@Singleton
class Data:

    def __init__(self):
        logging.basicConfig(format=u'%(filename)-10s[LINE:%(lineno)d] <%(funcName)-15s> # %(levelname)-8s [%(asctime)s]  %(message)s'.encode('cp1251', 'ignore'), level=logging.DEBUG, filename='mylog.log')
        logging.log(100, '='*120)
        self.cfg = Configurations()
        self.ip_table = self.getIPTable()
        self.task_table = self.getTaskTable()
        self.locker = threading.Lock()
        self.initialization = threading.Event()
        self.initialization.clear()
        self.identification = threading.Event()
        self.identification.clear()
        self.complete_task = threading.Event()
        self.complete_task.set()
        self.flag_of_close = False

    def __str__(self):
        return "\
        {0}\n\
        \n\
        {1}\n\
        \n\
        {2}\n\
        ".format(str(self.cfg), self.strIPTable(), self.strTaskTable())

    def strIPTable(self):
        #return str(self.ip_table)
        result = ["%s = %s" % (key, str(value)) for key, value in self.ip_table.items()]
        result.sort()
        return "\n\t\t".join(result)

    def strTaskTable(self):
        #return str(self.task_table)
        result = ["%s = %s" % (key, str(value)) for key, value in self.task_table.items()]
        result.sort()
        return "\n\t\t".join(result)

    def getIPTable(self):
        result = {}
        if self.cfg.IPS:
            result = dict((item.strip(), {'status': True, 'port': 8000, 'tasks': []}) for item in self.cfg.IPS.split(','))
            # result = dict((item.strip(), {'status': False, 'port': 8000, 'tasks': []}) for item in self.cfg.IPS.split(','))
        result[self.cfg.MY_IP] = {'status': True, 'port': 8000, 'tasks': []}
        return result

    def getTaskTable(self):
        result = {}
        if self.cfg.TASKS:
            result = dict((item.strip(), {'status': 'uncomplete', 'result': '', 'time': 0}) for item in self.cfg.TASKS.split(','))
        return result

    def getTotalCompleteTasks(self):
        result = 0
        for taskname in self.task_table.keys():
            if self.task_table[taskname]['status'] == 'complete':
                result += 1
        return result


if __name__ == '__main__':
    data = Data.Instance()
    print data

我从 Whosebug 偷来的单例

启动此系统后,有时会发生数据竞争。工作时和主线程同时读取共享数据。我想我们需要一个 threading.Lock 在这里。然后我犯了一个错误,我把 Lock 对象放在共享数据中并用它来分离访问。很快就明白了我的错误。

Filenames has changed, some pieces of code has removed.

但是现在我不知道我必须把 Lock 对象放在哪里,这样每个线程都可以轻松地以正确的方式访问和使用它。你能给我建议吗?

我的英语不是很好,请多多包涵。我希望你能理解我的问题...

P.S.

除此之外,我尝试在 classes 的构造函数中传递 Lock() 对象。我有同样的麻烦。应用程序掉落在访问数据的某个地方。而且我找不到它的确切位置。每次启动都会以 50% 的概率掉落应用程序。

我发现了这个错误。 是 Singleton class,但我不知道如何修复它。