使用多处理时未更新实例变量 Python
Instance variables not being updated Python when using Multiprocessing
我在更新变量方面遇到了一个不寻常的问题。我构建了一个简单的 class 对象来帮助我进行一些网络嗅探。我想做一个并行过程,它允许我 运行 一些网络测试并捕获使用 python 生成的流量,这样我就可以扩展程序来做一些令人惊奇的事情。我正在使用 scapy 的嗅探功能来帮助进行界面嗅探。
Scapy 的嗅探器允许您将一个函数传递给它自己的函数,该函数允许您创建一个 'stop sniffing' 条件。在我的例子中,我创建了函数 stop_filter
并且我希望通过简单地更新 self.stop_sniffing
实例变量来停止 Scapy 嗅探函数。我在下面展示了程序输出,它显示 self.stop_sniffing
在函数 stop
中设置为 True,但在 stop_filter
中打印时又设置回 False(或根本不更新) ].我不知道为什么会这样,也没有想到解决方案,因为这是一个很奇怪的问题。
如果有新眼光的人能看到我在这里做了什么疯狂的事情,将不胜感激!
from scapy.all import *
from multiprocessing import Process
class DatasetSniffer:
def __init__(self, iface, local_dir='.'):
self.iface = iface
self.master = None
self.local_dir = local_dir
self.stop_sniffing = False # Never updates! why!?
self.writer = PcapWriter(local_dir+"/master.pcap", append=True, sync=True)
def stop_filter(self, p):
# Note: 'p' gets passed in by Scapy function 'sniff'
print self.stop_sniffing
# Return 'True' to stop sniffer
return self.stop_sniffing
def sniff(self):
sniff(store=0, prn=self.writer.write, iface=self.iface, stop_filter=self.stop_filter)
def start(self):
self.master = Process(target=self.sniff)
self.master.start()
def stop(self):
self.stop_sniffing = True
# Shows that self.stop_sniffing is 'True'
print self.stop_sniffing
self.master.join()
if __name__ == "__main__":
interface = 'en3'
sniffer = DatasetSniffer(interface)
sniffer.start()
# some process
time.sleep(5)
sniffer.stop()
Shell 输出:
sudo python sniffing.py
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
True
False
False
False
False
问题
您在此示例代码中没有使用多个线程,您使用的是多个 进程 。
这里有两个不共享内存的独立进程:
- 原流程
一个新进程,由 multiprocessing.Process.start
启动
- 此进程将通过分叉原始进程开始,在分叉时创建其内存的副本。他们没有 "share" 记忆。
现在,当您在原始进程中调用 DatasetSniffer.stop
时,这不会改变新 ("master") 进程中 stop_sniffing
的值。
那怎么沟通呢?
使用multiprocessing
时,您可以使用Pipe
进行交流。像这样:
readable_pipe, writable_pipe = multiprocessing.Pipe(duplex=False)
process = Process(target=do_something)
现在,我们的原始进程可以通过写入管道来发送消息:
writable_pipe.send("stop")
而新进程可以使用以下方式检查消息:
if readable_pipe.poll():
msg = readable_pipe.recv()
尝试将其应用到您的代码中。
感谢您的所有建议。经过一杯灵感,我设法敲定了这个剧本。可能是一种更好的方法来解决我的问题而无需进行太多更改。所以这段代码允许线程在 class 之外使用停止函数,从而允许所有异步任务使用 stop_filter
.
在下面的 link 中找到此信息。希望这个 post 对其他人有用!
http://www.tutorialspoint.com/python/python_multithreading.htm
干杯!
import threading
from scapy.all import *
from datetime import datetime
directory = str(datetime.now().strftime("%Y%m%d%H%M%S"))
os.makedirs(directory)
DatasetSnifferExit = 0
class DatasetSniffer(threading.Thread):
def __init__(self, iface, local_dir='.', filename=str(datetime.now())):
self.iface = iface
self.filename = filename
self.local_dir = local_dir
self.stop_sniffing = False
self.writer = PcapWriter(local_dir+"/"+filename+".pcap", append=True, sync=True)
threading.Thread.__init__(self)
def run(self):
sniff_interface(self.writer.write, self.iface)
def stop_filter(p):
if DatasetSnifferExit:
return True
else:
return False
def sniff_interface(write, iface):
sniff(store=0, prn=write, iface=iface, stop_filter=stop_filter)
if __name__ == "__main__":
DatasetSnifferExit = False
# Create new threads
pcap1 = DatasetSniffer('en3', directory, "master")
pcap2 = DatasetSniffer('en0', directory, "slave")
# Start new Threads
pcap1.start()
pcap2.start()
# Do stuff
time.sleep(10)
# Finished doing stuff
DatasetSnifferExit = True
我在更新变量方面遇到了一个不寻常的问题。我构建了一个简单的 class 对象来帮助我进行一些网络嗅探。我想做一个并行过程,它允许我 运行 一些网络测试并捕获使用 python 生成的流量,这样我就可以扩展程序来做一些令人惊奇的事情。我正在使用 scapy 的嗅探功能来帮助进行界面嗅探。
Scapy 的嗅探器允许您将一个函数传递给它自己的函数,该函数允许您创建一个 'stop sniffing' 条件。在我的例子中,我创建了函数 stop_filter
并且我希望通过简单地更新 self.stop_sniffing
实例变量来停止 Scapy 嗅探函数。我在下面展示了程序输出,它显示 self.stop_sniffing
在函数 stop
中设置为 True,但在 stop_filter
中打印时又设置回 False(或根本不更新) ].我不知道为什么会这样,也没有想到解决方案,因为这是一个很奇怪的问题。
如果有新眼光的人能看到我在这里做了什么疯狂的事情,将不胜感激!
from scapy.all import *
from multiprocessing import Process
class DatasetSniffer:
def __init__(self, iface, local_dir='.'):
self.iface = iface
self.master = None
self.local_dir = local_dir
self.stop_sniffing = False # Never updates! why!?
self.writer = PcapWriter(local_dir+"/master.pcap", append=True, sync=True)
def stop_filter(self, p):
# Note: 'p' gets passed in by Scapy function 'sniff'
print self.stop_sniffing
# Return 'True' to stop sniffer
return self.stop_sniffing
def sniff(self):
sniff(store=0, prn=self.writer.write, iface=self.iface, stop_filter=self.stop_filter)
def start(self):
self.master = Process(target=self.sniff)
self.master.start()
def stop(self):
self.stop_sniffing = True
# Shows that self.stop_sniffing is 'True'
print self.stop_sniffing
self.master.join()
if __name__ == "__main__":
interface = 'en3'
sniffer = DatasetSniffer(interface)
sniffer.start()
# some process
time.sleep(5)
sniffer.stop()
Shell 输出:
sudo python sniffing.py
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
True
False
False
False
False
问题
您在此示例代码中没有使用多个线程,您使用的是多个 进程 。
这里有两个不共享内存的独立进程:
- 原流程
一个新进程,由
启动multiprocessing.Process.start
- 此进程将通过分叉原始进程开始,在分叉时创建其内存的副本。他们没有 "share" 记忆。
现在,当您在原始进程中调用 DatasetSniffer.stop
时,这不会改变新 ("master") 进程中 stop_sniffing
的值。
那怎么沟通呢?
使用multiprocessing
时,您可以使用Pipe
进行交流。像这样:
readable_pipe, writable_pipe = multiprocessing.Pipe(duplex=False)
process = Process(target=do_something)
现在,我们的原始进程可以通过写入管道来发送消息:
writable_pipe.send("stop")
而新进程可以使用以下方式检查消息:
if readable_pipe.poll():
msg = readable_pipe.recv()
尝试将其应用到您的代码中。
感谢您的所有建议。经过一杯灵感,我设法敲定了这个剧本。可能是一种更好的方法来解决我的问题而无需进行太多更改。所以这段代码允许线程在 class 之外使用停止函数,从而允许所有异步任务使用 stop_filter
.
在下面的 link 中找到此信息。希望这个 post 对其他人有用! http://www.tutorialspoint.com/python/python_multithreading.htm
干杯!
import threading
from scapy.all import *
from datetime import datetime
directory = str(datetime.now().strftime("%Y%m%d%H%M%S"))
os.makedirs(directory)
DatasetSnifferExit = 0
class DatasetSniffer(threading.Thread):
def __init__(self, iface, local_dir='.', filename=str(datetime.now())):
self.iface = iface
self.filename = filename
self.local_dir = local_dir
self.stop_sniffing = False
self.writer = PcapWriter(local_dir+"/"+filename+".pcap", append=True, sync=True)
threading.Thread.__init__(self)
def run(self):
sniff_interface(self.writer.write, self.iface)
def stop_filter(p):
if DatasetSnifferExit:
return True
else:
return False
def sniff_interface(write, iface):
sniff(store=0, prn=write, iface=iface, stop_filter=stop_filter)
if __name__ == "__main__":
DatasetSnifferExit = False
# Create new threads
pcap1 = DatasetSniffer('en3', directory, "master")
pcap2 = DatasetSniffer('en0', directory, "slave")
# Start new Threads
pcap1.start()
pcap2.start()
# Do stuff
time.sleep(10)
# Finished doing stuff
DatasetSnifferExit = True