如何在不对性能时间产生负面影响的情况下正确处理 python 线程中的竞争条件
How do I properly deal with race conditions in python threading while not negatively impacting performance times
我想知道如何正确地在我的应用程序中实现线程化并在不对脚本性能时间产生负面影响的情况下阻止任何竞争条件。
我省略并简化了代码以便于阅读,但在结构上与下面看到的类似
run.py
from lib.DropSender import DropSender
drop_sender = DropSender( options )
drop_sender.start()
DropSender.py
from lib.Connect import Connection
import threading
import json
class DropSender:
def __init__( self , options = {} ):
self.system_online = True
# This is a Web Socket connection delivering messages
def on_message(self, message):
js = json.loads( message )
symbol = js[6]
connections = Connection( self, symbol )
connections.start()
Connect.py
import threading
import requests
import mysql.connector
from threading import Thread, Lock
class Connection( threading.Thread ):
def __init__( self , drop_sender, symbol ):
threading.Thread.__init__( self )
self.symbol = symbol
self.lock = Lock()
def run( self ):
self.users_list = self.getUsers( self.symbol )
if self.users_list["count"] > 0:
for u in self.users_list["data"]:
self.user_id = u["user_id"] # Example 1122
self.amount = u["amount"] # 923.40
t = Thread(target=self.makePurchase, args=(self.symbol, self.user_id, self.amount, ))
t.start()
# t.join()
我知道 join() 消除了竞争条件,但这又降低了脚本的执行速度,线程相互等待
def getUsers():
# MYSQL Call to get list of users who like this 'symbol'
my_users_arr = { "data" : data, "count" : count }
return my_users_arr
def makePurchase( self, symbol, user_id, amount ):
# Lock it up
self.lock.acquire()
-All sorts of race conditions happening here, even with the locks acquired-
# User ID = 1122
# Amount 884.00 (1122's User ID mixing up with another users amount, race condition)
# Release Lock
self.lock.release()
您对 connection.user_id
和 connection.amount
的使用看起来很可疑。只有一个连接对象,使用这两个字段,然后通过设置下一个线程立即覆盖。
您没有显示 makePurchase()
的代码,但不应期望 self
的这两个字段是正确的。
顺便说一句,我强烈推荐使用线程池。
from multiprocessing.pool import ThreadPool
with ThreadPool() as pool:
for ....:
pool.apply_async(func, args)
这会将线程数限制为您计算机上的 CPU 数,并确保在完成后清理所有线程。
我想知道如何正确地在我的应用程序中实现线程化并在不对脚本性能时间产生负面影响的情况下阻止任何竞争条件。
我省略并简化了代码以便于阅读,但在结构上与下面看到的类似
run.py
from lib.DropSender import DropSender
drop_sender = DropSender( options )
drop_sender.start()
DropSender.py
from lib.Connect import Connection
import threading
import json
class DropSender:
def __init__( self , options = {} ):
self.system_online = True
# This is a Web Socket connection delivering messages
def on_message(self, message):
js = json.loads( message )
symbol = js[6]
connections = Connection( self, symbol )
connections.start()
Connect.py
import threading
import requests
import mysql.connector
from threading import Thread, Lock
class Connection( threading.Thread ):
def __init__( self , drop_sender, symbol ):
threading.Thread.__init__( self )
self.symbol = symbol
self.lock = Lock()
def run( self ):
self.users_list = self.getUsers( self.symbol )
if self.users_list["count"] > 0:
for u in self.users_list["data"]:
self.user_id = u["user_id"] # Example 1122
self.amount = u["amount"] # 923.40
t = Thread(target=self.makePurchase, args=(self.symbol, self.user_id, self.amount, ))
t.start()
# t.join()
我知道 join() 消除了竞争条件,但这又降低了脚本的执行速度,线程相互等待
def getUsers():
# MYSQL Call to get list of users who like this 'symbol'
my_users_arr = { "data" : data, "count" : count }
return my_users_arr
def makePurchase( self, symbol, user_id, amount ):
# Lock it up
self.lock.acquire()
-All sorts of race conditions happening here, even with the locks acquired-
# User ID = 1122
# Amount 884.00 (1122's User ID mixing up with another users amount, race condition)
# Release Lock
self.lock.release()
您对 connection.user_id
和 connection.amount
的使用看起来很可疑。只有一个连接对象,使用这两个字段,然后通过设置下一个线程立即覆盖。
您没有显示 makePurchase()
的代码,但不应期望 self
的这两个字段是正确的。
顺便说一句,我强烈推荐使用线程池。
from multiprocessing.pool import ThreadPool
with ThreadPool() as pool:
for ....:
pool.apply_async(func, args)
这会将线程数限制为您计算机上的 CPU 数,并确保在完成后清理所有线程。