如何使用 Twisted 嗅探网络接口?

How to sniff a network interface with Twisted?

我需要在 Twisted 代码中从网络接口接收原始数据包。数据包不会有正确的 IP 或 MAC 地址,也不是有效的 headers,所以我需要原始的东西。

我已经尝试研究 twisted.pair,但我无法弄清楚如何使用它来获取原始界面。

通常,我会使用scapy.all.sniff。但是,这是阻塞的,所以我不能只将它与 Twisted 一起使用。 (我也不能使用 scapy.all.sniff 超时和 busy-loop,因为我不想丢失数据包。)

一个可能的解决方案是在线程中 运行 scapy.all.sniff 并在我收到数据包时以某种方式回调 Twisted。这似乎有点不雅(而且,我不知道该怎么做,因为我是一个 Twisted 初学者),但如果我找不到更好的东西,我可能会满足于此。

您可以 运行 分布式系统并通过中央排队系统传递数据。采用 Unix 哲学并创建一个单一的应用程序来完成一些任务并把它们做好。创建一个嗅探数据包的应用程序(你可以在这里使用 scapy 因为如果你阻止任何东西并不重要)然后将它们发送到队列(RabitMQ,Redis,SQS 等)并让另一个应用程序处理来自队列的数据包。这种方法应该让你头疼最少。

如果您需要 运行 单个应用程序中的所有内容,那么 threads/multiprocessing 是唯一的选择。但是您需要遵循一些设计模式。您还可以将以下代码分解为单独的函数并使用专用的排队系统。

from threading import Thread
from time import sleep
from twisted.internet import defer, reactor

class Sniffer(Thread):
    def __init__(self, _reactor, shared_queue):
        super().__init__()
        self.reactor = _reactor
        self.shared_queue = shared_queue

    def run(self):
        """
        Sniffer logic here
        """
        while True:
            self.reactor.callFromThread(self.shared_queue.put, 'hello world')
            sleep(5)

@defer.inlineCallbacks
def consume_from_queue(_id, _reactor, shared_queue):
    item = yield shared_queue.get()
    print(str(_id), item)
    _reactor.callLater(0, consume_from_queue, _id, _reactor, shared_queue)

def main():
    shared_queue = defer.DeferredQueue()
    sniffer = Sniffer(reactor, shared_queue)
    sniffer.daemon = True
    sniffer.start()

    workers = 4
    for i in range(workers):
        consume_from_queue(i+1, reactor, shared_queue)

    reactor.run()

main()

Sniffer class 在 Twisted 的控制之外开始。注意 sniffer.daemon = True,这是为了让线程在主线程停止时停止。如果它被设置为 False(默认值),那么只有当所有线程都结束时,应用程序才会退出。根据手头的任务,这可能总是可行,也可能不总是可行。如果您可以从嗅探中休息一下以检查线程事件,那么您可能能够以更安全的方式停止线程。

self.reactor.callFromThread(self.shared_queue.put, 'hello world') 是必需的,以便将项目放入队列中发生在主反应器线程中,而不是 Sniffer 执行的线程中。这样做的主要好处是来自线程的消息会有某种同步(假设您计划扩展到嗅探多个接口)。另外,我不确定 DeferredQueue 对象是线程安全的 :) 我把它们当作它们不是线程安全的。

由于在这种情况下 Twisted 不管理线程,因此由开发人员管理是至关重要的。注意 worker 循环和 consume_from_queue(i+1, reactor, shared_queue)。这个循环确保只有所需数量的工人在处理任务。在 consume_from_queue() 函数中,shared_queue.get() 将等待(非阻塞)直到一个项目被放入队列,打印该项目,然后安排另一个 consume_from_queue().