使用两个线程时来自 python 个队列的错误值
wrong values from python queues while using two threads
这里我使用了两个线程。一个线程创建一个数据帧并通过队列将其传递给另一个线程。另一个线程从队列中收集数据帧并将其附加到 csv 文件。在这里,当我 运行 代码时,有时我在整个数据框中得到了错误的值。我应该在代码中做任何更正吗?
这是我的代码:
import threading
from queue import Queue
import pandas as pd
import numpy as np
import time
q=Queue()
df = pd.DataFrame(columns =['1Hz','2Hz', '3Hz', '4Hz', '5Hz', '6Hz', '7Hz'])
def getpoints(q):
print (threading.current_thread().getName() + " starts id - {} ".format( threading.get_ident()))
global df
origin=0
end=1
while Active:
def get_values_for_frequency(freq):
omega = 2*np.pi*freq
t_vec = np.linspace(origin,end,num=1000)
y = np.sin(omega*t_vec)
return y
df['1Hz']=pd.Series(get_values_for_frequency(1))
df['2Hz']=pd.Series(get_values_for_frequency(2))
df['3Hz']=pd.Series(get_values_for_frequency(3))
df['4Hz']=pd.Series(get_values_for_frequency(4))
df['5Hz']=pd.Series(get_values_for_frequency(5))
df['6Hz']=pd.Series(get_values_for_frequency(6))
df['7Hz']=pd.Series(get_values_for_frequency(7))
origin=end
end=end+1
df = df.round(decimals = 7)
df=(2**15)*df
q.put(df) // queue sends the dataframe
print('yes') // print yes to acknowledge the no of times of dataframe sent
print(df)
time.sleep(1)
print("thread1 ended")
def ot(q):
global Active
print (threading.current_thread().getName()+ " starts id - {}".format(threading.get_ident()))
while (Active or not q.empty()):
t2receive=q.get() // queue receives values from thread 1 queue
print(t2receive)
if Active==False and q.empty():
break
time.sleep(2)
print("Thread2 ended")
if "__main__"==__name__:
print("MAIN THREAD id -", threading.get_ident())
global Active
Active=True
t1=threading.Thread(target=getpoints,args=(q,))
t2=threading.Thread(target=ot,args=(q,))
t1.start()
t2.start()
print("\ncurrently active threads -", threading.enumerate(),'\n')
input("press Enter to stop\n\n")
Active=False
print("Entered!\n")
t1.join()
t2.join()
print("\ncurrently active threads -", threading.enumerate(),'\n')
print("AGAIN MAIN THREAD id - ",threading.get_ident())
print("T1 Alive : ",t1.is_alive())
print("T2 Alive : ",t2.is_alive())
yes
1Hz 2Hz 3Hz 4Hz 5Hz 6Hz 7Hz
0 -0.000000 -0.000000 -0.000000 -0.000000 -0.000000 -0.000000 0.000000
1 206.091059 412.175565 618.243686 824.285594 1030.298010 1236.267827 1442.188493
2 412.175565 824.285594 1236.267827 1648.053453 2059.576934 2470.776013 2881.581875
3 618.243686 1236.267827 1853.849600 2470.776013 3086.817690 3701.764915 4315.391590
4 824.285594 1648.053453 2470.776013 3291.932262 4111.007744 4927.481446 5740.835635
.. ... ... ... ... ... ... ...
995 -824.285594 -1648.053453 -2470.776013 -3291.932262 -4111.007744 -4927.481446 -5740.835635
996 -618.243686 -1236.267827 -1853.849600 -2470.776013 -3086.817690 -3701.764915 -4315.391590
997 -412.175565 -824.285594 -1236.267827 -1648.053453 -2059.576934 -2470.776013 -2881.581875
998 -206.091059 -412.175565 -618.243686 -824.285594 -1030.298010 -1236.267827 -1442.188493
999 -0.000000 -0.000000 -0.000000 -0.000000 -0.000000 -0.000000 -0.000000
[1000 rows x 7 columns]
Entered!
1Hz 2Hz 3Hz 4Hz 5Hz 6Hz 7Hz
0 -7.347881e-16 -1.469576e-15 -2.204364e-15 -2.939152e-15 -1.077937e-14 -4.408728e-15 1.961911e-15
1 6.289433e-03 1.257862e-02 1.886730e-02 2.515525e-02 3.144219e-02 3.772789e-02 4.401210e-02
2 1.257862e-02 2.515525e-02 3.772789e-02 5.029457e-02 6.285329e-02 7.540206e-02 8.793891e-02
3 1.886730e-02 3.772789e-02 5.657505e-02 7.540206e-02 9.420224e-02 1.129689e-01 1.316953e-01
4 2.515525e-02 5.029457e-02 7.540206e-02 1.004618e-01 1.254580e-01 1.503748e-01 1.751964e-01
.. ... ... ... ... ... ... ...
995 -2.515525e-02 -5.029457e-02 -7.540206e-02 -1.004618e-01 -1.254580e-01 -1.503748e-01 -1.751964e-01
996 -1.886730e-02 -3.772789e-02 -5.657505e-02 -7.540206e-02 -9.420224e-02 -1.129689e-01 -1.316953e-01
997 -1.257862e-02 -2.515525e-02 -3.772789e-02 -5.029457e-02 -6.285329e-02 -7.540206e-02 -8.793891e-02
998 -6.289433e-03 -1.257862e-02 -1.886730e-02 -2.515525e-02 -3.144219e-02 -3.772789e-02 -4.401210e-02
999 -9.797174e-16 -1.959435e-15 -2.939152e-15 -3.918870e-15 -4.898587e-15 -5.878305e-15 -6.858022e-15
[1000 rows x 7 columns]
thread1 ended
1Hz 2Hz 3Hz 4Hz 5Hz 6Hz 7Hz
0 -0.000000 -0.000000 -0.000000 -0.000000 -0.000000 -0.000000 0.000000
1 206.091059 412.175565 618.243686 824.285594 1030.298010 1236.267827 1442.188493
2 412.175565 824.285594 1236.267827 1648.053453 2059.576934 2470.776013 2881.581875
3 618.243686 1236.267827 1853.849600 2470.776013 3086.817690 3701.764915 4315.391590
4 824.285594 1648.053453 2470.776013 3291.932262 4111.007744 4927.481446 5740.835635
以上是我在中间数据帧中得到错误值的示例输出。顶部和底部数据框是正确的。中间数据帧也应该像第一个数据帧(所有数据帧中的数据都相同)。
谁能解释一下我必须更改代码的内容?提前致谢
输出:
def getpoints(q,l):
print (threading.current_thread().getName() + " starts id - {} ".format( threading.get_ident()))
global df
origin=0
end=1
while Active:
def get_values_for_frequency(freq):
omega = 2*np.pi*freq
t_vec = np.linspace(origin,end,num=1000)
y = np.sin(omega*t_vec)
return y
df['1Hz']=pd.Series(get_values_for_frequency(1))
df['2Hz']=pd.Series(get_values_for_frequency(2))
df['3Hz']=pd.Series(get_values_for_frequency(3))
df['4Hz']=pd.Series(get_values_for_frequency(4))
df['5Hz']=pd.Series(get_values_for_frequency(5))
df['6Hz']=pd.Series(get_values_for_frequency(6))
df['7Hz']=pd.Series(get_values_for_frequency(7))
origin=end
end=end+1
df = df.round(decimals = 7)
df=(2**15)*df
q.put(df)
print('yes')
time.sleep(1)
print("thread1 ended")
def ot(q,l):
global Active
print (threading.current_thread().getName()+ " starts id - {}".format(threading.get_ident()))
while (Active or not q.empty()):
try:
t2receive=q.get()
t2receive.to_csv("mycsv.csv",mode='a',index=False,header=False)
except Queue.empty():
break
except Exception as err:
pass
time.sleep(1)
print("Thread2 ended")
在进一步研究代码之后,我认为问题不在于队列大小的增加,而在于全局 df,它一直被 getpoints 重用。
尝试将 q.put(df)
替换为 q.put(df.copy())
,它在队列中存储数据帧的副本,而不是“全局”副本,后者在收到时可能会被修改。
这里我使用了两个线程。一个线程创建一个数据帧并通过队列将其传递给另一个线程。另一个线程从队列中收集数据帧并将其附加到 csv 文件。在这里,当我 运行 代码时,有时我在整个数据框中得到了错误的值。我应该在代码中做任何更正吗?
这是我的代码:
import threading
from queue import Queue
import pandas as pd
import numpy as np
import time
q=Queue()
df = pd.DataFrame(columns =['1Hz','2Hz', '3Hz', '4Hz', '5Hz', '6Hz', '7Hz'])
def getpoints(q):
print (threading.current_thread().getName() + " starts id - {} ".format( threading.get_ident()))
global df
origin=0
end=1
while Active:
def get_values_for_frequency(freq):
omega = 2*np.pi*freq
t_vec = np.linspace(origin,end,num=1000)
y = np.sin(omega*t_vec)
return y
df['1Hz']=pd.Series(get_values_for_frequency(1))
df['2Hz']=pd.Series(get_values_for_frequency(2))
df['3Hz']=pd.Series(get_values_for_frequency(3))
df['4Hz']=pd.Series(get_values_for_frequency(4))
df['5Hz']=pd.Series(get_values_for_frequency(5))
df['6Hz']=pd.Series(get_values_for_frequency(6))
df['7Hz']=pd.Series(get_values_for_frequency(7))
origin=end
end=end+1
df = df.round(decimals = 7)
df=(2**15)*df
q.put(df) // queue sends the dataframe
print('yes') // print yes to acknowledge the no of times of dataframe sent
print(df)
time.sleep(1)
print("thread1 ended")
def ot(q):
global Active
print (threading.current_thread().getName()+ " starts id - {}".format(threading.get_ident()))
while (Active or not q.empty()):
t2receive=q.get() // queue receives values from thread 1 queue
print(t2receive)
if Active==False and q.empty():
break
time.sleep(2)
print("Thread2 ended")
if "__main__"==__name__:
print("MAIN THREAD id -", threading.get_ident())
global Active
Active=True
t1=threading.Thread(target=getpoints,args=(q,))
t2=threading.Thread(target=ot,args=(q,))
t1.start()
t2.start()
print("\ncurrently active threads -", threading.enumerate(),'\n')
input("press Enter to stop\n\n")
Active=False
print("Entered!\n")
t1.join()
t2.join()
print("\ncurrently active threads -", threading.enumerate(),'\n')
print("AGAIN MAIN THREAD id - ",threading.get_ident())
print("T1 Alive : ",t1.is_alive())
print("T2 Alive : ",t2.is_alive())
yes
1Hz 2Hz 3Hz 4Hz 5Hz 6Hz 7Hz
0 -0.000000 -0.000000 -0.000000 -0.000000 -0.000000 -0.000000 0.000000
1 206.091059 412.175565 618.243686 824.285594 1030.298010 1236.267827 1442.188493
2 412.175565 824.285594 1236.267827 1648.053453 2059.576934 2470.776013 2881.581875
3 618.243686 1236.267827 1853.849600 2470.776013 3086.817690 3701.764915 4315.391590
4 824.285594 1648.053453 2470.776013 3291.932262 4111.007744 4927.481446 5740.835635
.. ... ... ... ... ... ... ...
995 -824.285594 -1648.053453 -2470.776013 -3291.932262 -4111.007744 -4927.481446 -5740.835635
996 -618.243686 -1236.267827 -1853.849600 -2470.776013 -3086.817690 -3701.764915 -4315.391590
997 -412.175565 -824.285594 -1236.267827 -1648.053453 -2059.576934 -2470.776013 -2881.581875
998 -206.091059 -412.175565 -618.243686 -824.285594 -1030.298010 -1236.267827 -1442.188493
999 -0.000000 -0.000000 -0.000000 -0.000000 -0.000000 -0.000000 -0.000000
[1000 rows x 7 columns]
Entered!
1Hz 2Hz 3Hz 4Hz 5Hz 6Hz 7Hz
0 -7.347881e-16 -1.469576e-15 -2.204364e-15 -2.939152e-15 -1.077937e-14 -4.408728e-15 1.961911e-15
1 6.289433e-03 1.257862e-02 1.886730e-02 2.515525e-02 3.144219e-02 3.772789e-02 4.401210e-02
2 1.257862e-02 2.515525e-02 3.772789e-02 5.029457e-02 6.285329e-02 7.540206e-02 8.793891e-02
3 1.886730e-02 3.772789e-02 5.657505e-02 7.540206e-02 9.420224e-02 1.129689e-01 1.316953e-01
4 2.515525e-02 5.029457e-02 7.540206e-02 1.004618e-01 1.254580e-01 1.503748e-01 1.751964e-01
.. ... ... ... ... ... ... ...
995 -2.515525e-02 -5.029457e-02 -7.540206e-02 -1.004618e-01 -1.254580e-01 -1.503748e-01 -1.751964e-01
996 -1.886730e-02 -3.772789e-02 -5.657505e-02 -7.540206e-02 -9.420224e-02 -1.129689e-01 -1.316953e-01
997 -1.257862e-02 -2.515525e-02 -3.772789e-02 -5.029457e-02 -6.285329e-02 -7.540206e-02 -8.793891e-02
998 -6.289433e-03 -1.257862e-02 -1.886730e-02 -2.515525e-02 -3.144219e-02 -3.772789e-02 -4.401210e-02
999 -9.797174e-16 -1.959435e-15 -2.939152e-15 -3.918870e-15 -4.898587e-15 -5.878305e-15 -6.858022e-15
[1000 rows x 7 columns]
thread1 ended
1Hz 2Hz 3Hz 4Hz 5Hz 6Hz 7Hz
0 -0.000000 -0.000000 -0.000000 -0.000000 -0.000000 -0.000000 0.000000
1 206.091059 412.175565 618.243686 824.285594 1030.298010 1236.267827 1442.188493
2 412.175565 824.285594 1236.267827 1648.053453 2059.576934 2470.776013 2881.581875
3 618.243686 1236.267827 1853.849600 2470.776013 3086.817690 3701.764915 4315.391590
4 824.285594 1648.053453 2470.776013 3291.932262 4111.007744 4927.481446 5740.835635
以上是我在中间数据帧中得到错误值的示例输出。顶部和底部数据框是正确的。中间数据帧也应该像第一个数据帧(所有数据帧中的数据都相同)。
谁能解释一下我必须更改代码的内容?提前致谢
输出:
def getpoints(q,l):
print (threading.current_thread().getName() + " starts id - {} ".format( threading.get_ident()))
global df
origin=0
end=1
while Active:
def get_values_for_frequency(freq):
omega = 2*np.pi*freq
t_vec = np.linspace(origin,end,num=1000)
y = np.sin(omega*t_vec)
return y
df['1Hz']=pd.Series(get_values_for_frequency(1))
df['2Hz']=pd.Series(get_values_for_frequency(2))
df['3Hz']=pd.Series(get_values_for_frequency(3))
df['4Hz']=pd.Series(get_values_for_frequency(4))
df['5Hz']=pd.Series(get_values_for_frequency(5))
df['6Hz']=pd.Series(get_values_for_frequency(6))
df['7Hz']=pd.Series(get_values_for_frequency(7))
origin=end
end=end+1
df = df.round(decimals = 7)
df=(2**15)*df
q.put(df)
print('yes')
time.sleep(1)
print("thread1 ended")
def ot(q,l):
global Active
print (threading.current_thread().getName()+ " starts id - {}".format(threading.get_ident()))
while (Active or not q.empty()):
try:
t2receive=q.get()
t2receive.to_csv("mycsv.csv",mode='a',index=False,header=False)
except Queue.empty():
break
except Exception as err:
pass
time.sleep(1)
print("Thread2 ended")
在进一步研究代码之后,我认为问题不在于队列大小的增加,而在于全局 df,它一直被 getpoints 重用。
尝试将 q.put(df)
替换为 q.put(df.copy())
,它在队列中存储数据帧的副本,而不是“全局”副本,后者在收到时可能会被修改。