使用多处理时未更新实例变量 Python

Instance variables not being updated Python when using Multiprocessing

我在更新变量方面遇到了一个不寻常的问题。我构建了一个简单的 class 对象来帮助我进行一些网络嗅探。我想做一个并行过程,它允许我 运行 一些网络测试并捕获使用 python 生成的流量,这样我就可以扩展程序来做一些令人惊奇的事情。我正在使用 scapy 的嗅探功能来帮助进行界面嗅探。

Scapy 的嗅探器允许您将一个函数传递给它自己的函数,该函数允许您创建一个 'stop sniffing' 条件。在我的例子中,我创建了函数 stop_filter 并且我希望通过简单地更新 self.stop_sniffing 实例变量来停止 Scapy 嗅探函数。我在下面展示了程序输出,它显示 self.stop_sniffing 在函数 stop 中设置为 True,但在 stop_filter 中打印时又设置回 False(或根本不更新) ].我不知道为什么会这样,也没有想到解决方案,因为这是一个很奇怪的问题。

如果有新眼光的人能看到我在这里做了什么疯狂的事情,将不胜感激!

from scapy.all import *
from multiprocessing import Process


class DatasetSniffer:
    def __init__(self, iface, local_dir='.'):
        self.iface = iface
        self.master = None
        self.local_dir = local_dir
        self.stop_sniffing = False # Never updates! why!?
        self.writer = PcapWriter(local_dir+"/master.pcap", append=True, sync=True)

    def stop_filter(self, p):
        # Note: 'p' gets passed in by Scapy function 'sniff'
        print self.stop_sniffing
        # Return 'True' to stop sniffer
        return self.stop_sniffing

    def sniff(self):
        sniff(store=0, prn=self.writer.write, iface=self.iface, stop_filter=self.stop_filter)

    def start(self):
        self.master = Process(target=self.sniff)
        self.master.start()

    def stop(self):
        self.stop_sniffing = True
        # Shows that self.stop_sniffing is 'True'
        print self.stop_sniffing
        self.master.join()


if __name__ == "__main__":
    interface = 'en3'
    sniffer = DatasetSniffer(interface)
    sniffer.start()
    #   some process
    time.sleep(5)
    sniffer.stop()

Shell 输出:

sudo python sniffing.py
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
False
True
False
False
False
False

问题

您在此示例代码中没有使用多个线程,您使用的是多个 进程

这里有两个不共享内存的独立进程:

  • 原流程
  • 一个新进程,由 multiprocessing.Process.start

    启动
    • 此进程将通过分叉原始进程开始,在分叉时创建其内存的副本。他们没有 "share" 记忆。

现在,当您在原始进程中调用 DatasetSniffer.stop 时,这不会改变新 ("master") 进程中 stop_sniffing 的值。

那怎么沟通呢?

使用multiprocessing时,您可以使用Pipe进行交流。像这样:

readable_pipe, writable_pipe = multiprocessing.Pipe(duplex=False)
process = Process(target=do_something)

现在,我们的原始进程可以通过写入管道来发送消息:

writable_pipe.send("stop")

而新进程可以使用以下方式检查消息:

if readable_pipe.poll():
    msg = readable_pipe.recv()

尝试将其应用到您的代码中。

感谢您的所有建议。经过一杯灵感,我设法敲定了这个剧本。可能是一种更好的方法来解决我的问题而无需进行太多更改。所以这段代码允许线程在 class 之外使用停止函数,从而允许所有异步任务使用 stop_filter.

在下面的 link 中找到此信息。希望这个 post 对其他人有用! http://www.tutorialspoint.com/python/python_multithreading.htm

干杯!

import threading
from scapy.all import *
from datetime import datetime

directory = str(datetime.now().strftime("%Y%m%d%H%M%S"))
os.makedirs(directory)

DatasetSnifferExit = 0

class DatasetSniffer(threading.Thread):
    def __init__(self, iface, local_dir='.', filename=str(datetime.now())):
        self.iface = iface
        self.filename = filename
        self.local_dir = local_dir
        self.stop_sniffing = False
        self.writer = PcapWriter(local_dir+"/"+filename+".pcap", append=True, sync=True)
        threading.Thread.__init__(self)

    def run(self):
        sniff_interface(self.writer.write, self.iface)


def stop_filter(p):
    if DatasetSnifferExit:
        return True
    else:
        return False


def sniff_interface(write, iface):
    sniff(store=0, prn=write, iface=iface, stop_filter=stop_filter)


if __name__ == "__main__":
    DatasetSnifferExit = False
    # Create new threads
    pcap1 = DatasetSniffer('en3', directory, "master")
    pcap2 = DatasetSniffer('en0', directory, "slave")

    # Start new Threads
    pcap1.start()
    pcap2.start()

    # Do stuff
    time.sleep(10)

    # Finished doing stuff
    DatasetSnifferExit = True