如何使用 Twisted 嗅探网络接口?
How to sniff a network interface with Twisted?
我需要在 Twisted 代码中从网络接口接收原始数据包。数据包不会有正确的 IP 或 MAC 地址,也不是有效的 headers,所以我需要原始的东西。
我已经尝试研究 twisted.pair
,但我无法弄清楚如何使用它来获取原始界面。
通常,我会使用scapy.all.sniff
。但是,这是阻塞的,所以我不能只将它与 Twisted 一起使用。 (我也不能使用 scapy.all.sniff
超时和 busy-loop,因为我不想丢失数据包。)
一个可能的解决方案是在线程中 运行 scapy.all.sniff
并在我收到数据包时以某种方式回调 Twisted。这似乎有点不雅(而且,我不知道该怎么做,因为我是一个 Twisted 初学者),但如果我找不到更好的东西,我可能会满足于此。
您可以 运行 分布式系统并通过中央排队系统传递数据。采用 Unix 哲学并创建一个单一的应用程序来完成一些任务并把它们做好。创建一个嗅探数据包的应用程序(你可以在这里使用 scapy
因为如果你阻止任何东西并不重要)然后将它们发送到队列(RabitMQ,Redis,SQS 等)并让另一个应用程序处理来自队列的数据包。这种方法应该让你头疼最少。
如果您需要 运行 单个应用程序中的所有内容,那么 threads/multiprocessing 是唯一的选择。但是您需要遵循一些设计模式。您还可以将以下代码分解为单独的函数并使用专用的排队系统。
from threading import Thread
from time import sleep
from twisted.internet import defer, reactor
class Sniffer(Thread):
def __init__(self, _reactor, shared_queue):
super().__init__()
self.reactor = _reactor
self.shared_queue = shared_queue
def run(self):
"""
Sniffer logic here
"""
while True:
self.reactor.callFromThread(self.shared_queue.put, 'hello world')
sleep(5)
@defer.inlineCallbacks
def consume_from_queue(_id, _reactor, shared_queue):
item = yield shared_queue.get()
print(str(_id), item)
_reactor.callLater(0, consume_from_queue, _id, _reactor, shared_queue)
def main():
shared_queue = defer.DeferredQueue()
sniffer = Sniffer(reactor, shared_queue)
sniffer.daemon = True
sniffer.start()
workers = 4
for i in range(workers):
consume_from_queue(i+1, reactor, shared_queue)
reactor.run()
main()
Sniffer
class 在 Twisted 的控制之外开始。注意 sniffer.daemon = True
,这是为了让线程在主线程停止时停止。如果它被设置为 False
(默认值),那么只有当所有线程都结束时,应用程序才会退出。根据手头的任务,这可能总是可行,也可能不总是可行。如果您可以从嗅探中休息一下以检查线程事件,那么您可能能够以更安全的方式停止线程。
self.reactor.callFromThread(self.shared_queue.put, 'hello world')
是必需的,以便将项目放入队列中发生在主反应器线程中,而不是 Sniffer
执行的线程中。这样做的主要好处是来自线程的消息会有某种同步(假设您计划扩展到嗅探多个接口)。另外,我不确定 DeferredQueue
对象是线程安全的 :) 我把它们当作它们不是线程安全的。
由于在这种情况下 Twisted 不管理线程,因此由开发人员管理是至关重要的。注意 worker
循环和 consume_from_queue(i+1, reactor, shared_queue)
。这个循环确保只有所需数量的工人在处理任务。在 consume_from_queue()
函数中,shared_queue.get()
将等待(非阻塞)直到一个项目被放入队列,打印该项目,然后安排另一个 consume_from_queue()
.
我需要在 Twisted 代码中从网络接口接收原始数据包。数据包不会有正确的 IP 或 MAC 地址,也不是有效的 headers,所以我需要原始的东西。
我已经尝试研究 twisted.pair
,但我无法弄清楚如何使用它来获取原始界面。
通常,我会使用scapy.all.sniff
。但是,这是阻塞的,所以我不能只将它与 Twisted 一起使用。 (我也不能使用 scapy.all.sniff
超时和 busy-loop,因为我不想丢失数据包。)
一个可能的解决方案是在线程中 运行 scapy.all.sniff
并在我收到数据包时以某种方式回调 Twisted。这似乎有点不雅(而且,我不知道该怎么做,因为我是一个 Twisted 初学者),但如果我找不到更好的东西,我可能会满足于此。
您可以 运行 分布式系统并通过中央排队系统传递数据。采用 Unix 哲学并创建一个单一的应用程序来完成一些任务并把它们做好。创建一个嗅探数据包的应用程序(你可以在这里使用 scapy
因为如果你阻止任何东西并不重要)然后将它们发送到队列(RabitMQ,Redis,SQS 等)并让另一个应用程序处理来自队列的数据包。这种方法应该让你头疼最少。
如果您需要 运行 单个应用程序中的所有内容,那么 threads/multiprocessing 是唯一的选择。但是您需要遵循一些设计模式。您还可以将以下代码分解为单独的函数并使用专用的排队系统。
from threading import Thread
from time import sleep
from twisted.internet import defer, reactor
class Sniffer(Thread):
def __init__(self, _reactor, shared_queue):
super().__init__()
self.reactor = _reactor
self.shared_queue = shared_queue
def run(self):
"""
Sniffer logic here
"""
while True:
self.reactor.callFromThread(self.shared_queue.put, 'hello world')
sleep(5)
@defer.inlineCallbacks
def consume_from_queue(_id, _reactor, shared_queue):
item = yield shared_queue.get()
print(str(_id), item)
_reactor.callLater(0, consume_from_queue, _id, _reactor, shared_queue)
def main():
shared_queue = defer.DeferredQueue()
sniffer = Sniffer(reactor, shared_queue)
sniffer.daemon = True
sniffer.start()
workers = 4
for i in range(workers):
consume_from_queue(i+1, reactor, shared_queue)
reactor.run()
main()
Sniffer
class 在 Twisted 的控制之外开始。注意 sniffer.daemon = True
,这是为了让线程在主线程停止时停止。如果它被设置为 False
(默认值),那么只有当所有线程都结束时,应用程序才会退出。根据手头的任务,这可能总是可行,也可能不总是可行。如果您可以从嗅探中休息一下以检查线程事件,那么您可能能够以更安全的方式停止线程。
self.reactor.callFromThread(self.shared_queue.put, 'hello world')
是必需的,以便将项目放入队列中发生在主反应器线程中,而不是 Sniffer
执行的线程中。这样做的主要好处是来自线程的消息会有某种同步(假设您计划扩展到嗅探多个接口)。另外,我不确定 DeferredQueue
对象是线程安全的 :) 我把它们当作它们不是线程安全的。
由于在这种情况下 Twisted 不管理线程,因此由开发人员管理是至关重要的。注意 worker
循环和 consume_from_queue(i+1, reactor, shared_queue)
。这个循环确保只有所需数量的工人在处理任务。在 consume_from_queue()
函数中,shared_queue.get()
将等待(非阻塞)直到一个项目被放入队列,打印该项目,然后安排另一个 consume_from_queue()
.