How do I properly deal with race conditions in Python threading without negatively impacting performance?

I would like to know how to implement threading correctly in my application and prevent race conditions without slowing the script down.

I have omitted and simplified parts of the code for readability, but the structure is similar to what you see below.

run.py

from lib.DropSender import DropSender

options = {}  # placeholder: the real options were omitted from the question
drop_sender = DropSender( options )
drop_sender.start()

DropSender.py

from lib.Connect import Connection
import threading
import json

class DropSender:

    def __init__( self , options = None ):
        # None instead of {} avoids sharing one mutable default dict across instances
        self.system_online = True

    # This is a Web Socket connection delivering messages
    def on_message(self, message):
        js = json.loads( message )
        symbol = js[6]
        connections = Connection( self, symbol )
        connections.start()

Connect.py

import threading
import requests
import mysql.connector
from threading import Thread, Lock

class Connection( threading.Thread ):

    def __init__( self , drop_sender, symbol ):
        threading.Thread.__init__( self )
        self.symbol = symbol
        self.lock = Lock()

    def run( self ):
        self.users_list = self.getUsers( self.symbol )
        if self.users_list["count"] > 0:
            for u in self.users_list["data"]:
                self.user_id = u["user_id"] # Example 1122
                self.amount  = u["amount"] # 923.40
                t = Thread(target=self.makePurchase, args=(self.symbol, self.user_id, self.amount, ))
                t.start()               
                # t.join()

I know that join() eliminates the race condition, but it also slows the script down, because the threads end up waiting on each other.

    def getUsers( self, symbol ):
        # MYSQL Call to get list of users who like this 'symbol'
        my_users_arr = { "data" : data, "count" : count }
        return my_users_arr

    
    def makePurchase( self, symbol, user_id, amount ):
        # Lock it up
        self.lock.acquire()

        # All sorts of race conditions happening here, even with the locks acquired
        # User ID = 1122
        # Amount 884.00 (1122's User ID mixing up with another user's amount, race condition)
        # Release Lock
        self.lock.release()

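Your use of connection.user_id and connection.amount looks suspicious. There is only one connection object, and those two fields are set for one thread and then immediately overwritten when the next thread is set up.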

You did not show the body of makePurchase(), but it should not rely on those two fields of self being correct.
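As a minimal sketch of that idea, the loop can keep user_id and amount in local values and hand them to each thread through args, so no thread ever reads a shared field on self. The make_purchase body, the results list and the sample users below are hypothetical stand-ins, not the asker's real code. Joining only after every thread has been started still lets them run concurrently.

```python
import threading

results = []                      # hypothetical sink standing in for the real purchase
results_lock = threading.Lock()   # guards the shared list only

def make_purchase(symbol, user_id, amount):
    # Every value arrives as an argument, so another thread cannot overwrite it.
    with results_lock:
        results.append((symbol, user_id, amount))

def run_purchases(symbol, users):
    threads = []
    for u in users:
        # The per-user values are bound into args when the thread is created.
        t = threading.Thread(
            target=make_purchase,
            args=(symbol, u["user_id"], u["amount"]),
        )
        t.start()
        threads.append(t)
    # Join only after all threads have been started, so their work overlaps.
    for t in threads:
        t.join()

users = [{"user_id": 1122, "amount": 923.40}, {"user_id": 3344, "amount": 884.00}]
run_purchases("ABC", users)
```

With this layout a lock is only needed around state that is genuinely shared (here the results list), not around the per-user arguments.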

By the way, I strongly recommend using a thread pool.

from multiprocessing.pool import ThreadPool

with ThreadPool() as pool:
    for ....:
        pool.apply_async(func, args)

This limits the number of threads to the number of CPUs on your machine and ensures that all threads are cleaned up once you are done.
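One caveat when using the pool as a context manager: leaving the with block calls terminate() on the pool, so tasks submitted with apply_async should be waited on before the block ends. A hedged sketch, where make_purchase and the sample users are hypothetical placeholders for the real work:

```python
from multiprocessing.pool import ThreadPool

def make_purchase(symbol, user_id, amount):
    # Hypothetical stand-in for the real purchase call; it just returns its inputs.
    return (symbol, user_id, amount)

users = [{"user_id": 1122, "amount": 923.40}, {"user_id": 3344, "amount": 884.00}]

with ThreadPool() as pool:
    # Submit one task per user; each call gets its own argument tuple.
    pending = [
        pool.apply_async(make_purchase, ("ABC", u["user_id"], u["amount"]))
        for u in users
    ]
    # get() blocks until the task finishes and re-raises any exception it raised.
    purchases = [p.get() for p in pending]
```

Collecting the AsyncResult objects and calling get() inside the block keeps the results in submission order and makes failures visible instead of silently lost.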