gpt4 book ai didi

python - 多线程Python程序中高效的抓包方式

转载 作者:太空宇宙 更新时间:2023-11-04 06:05:53 27 4
gpt4 key购买 nike

我正在使用 Python 实现嗅探器。我需要所有数据包详细信息(VLAN 等),所以我使用的是 RAW 套接字。在我所有的以太网接口(interface)上都有几个嗅探器作为恶魔工作,所以每个都在不同的线程中。

除了我添加数据包的全局变量(如以下代码所示)之外,是否有更好的方法在我的主线程中获取结果?

我尝试了 Queue,但除了增加了复杂性之外没有发现任何特别的好处。

import socket, threading

def ReceivePackets():
soc = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(0x0003))
while 1:
packet = soc.recv(2000)
buffer.append(packet)

def ReadSniffer():
result = list(buffer)
#Clear buffer to start sniffing from scratch after reading
del buffer[:]
return result

我在程序开始时在所有以太网接口(interface)上启动嗅探器,并在需要数据时读取相应的全局缓冲区。

buffer = []
t = threading.Thread(target = ReceivePackets)
t.daemon = True
t.start()
...
packets = ReadSniffer()

最佳答案

对于套接字,我总是得到不一致的结果,因为我有各种数据包大小不同的数据包。套接字可能会捕获我发送几次的所有数据包,但最终它会丢失一些数据包。

我将 pcapy 接口(interface)转移到 libcap 库而不是原始套接字,它可以完美且非常可靠地工作。

我将 Sniffer 类实现为 threading.Thread 的子类,因此它在一个单独的线程中启动。实现是这样的:

class Sniffer(threading.Thread):

def __init__(self, port):
...
threading.Thread.__init__(self)
#Packets will be stored here
self.result = []
#Not worry about closing the sniffer thread
self.daemon = True

#Invoked when I start my thread
def run(self):
max_bytes = 16000
#Important to capture broken packets as I need them
promiscuous = True
read_timeout = 100
pc = pcapy.open_live(str(self.port), max_bytes, promiscuous, read_timeout)
pc.loop(-1, self.recv_pkts)

#Do when the packet arrives
def recv_pkts(self, hdr, packet):
packetHex = binascii.hexlify(packet)
self.result.append(packetHex)
return self.result

要与主程序并行启动嗅探器线程:

 sniffer1 = Sniffer(eth1)
#Start separate thread
sniffer1.start()

关于python - 多线程Python程序中高效的抓包方式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22102189/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com