scapy.all.sniff incorrectly adds 4 bytes of additional data to the captured UDP payload.
When the same data is received through a socket, or captured with Wireshark, these 4 bytes do not appear.
Here is a full test case to reproduce the issue (run it with 2 interfaces connected by a simple cable):
#!/usr/bin/env python3
import contextlib
import os
import socket
import threading
import unittest
import scapy.all
# Interface the test frames are sent from; its MAC address is used as the
# Ethernet source address.
ITF_SRC = "eth6"
# Interface the frames are addressed to; its MAC address is used as the
# Ethernet destination address and the sniffer listens on it.
ITF_DST = "eth5"
# IPv6 source and destination addresses placed in the test frames; the
# receiving UDP socket binds to IP_DST.
IP_SRC = "2014::1"
IP_DST = "2015::1"
# UDP port used as both the source and the destination port.
UDP_PORT = 500
class SniffThread(threading.Thread):
    """Thread that runs ``scapy.all.sniff`` and keeps the packets it returns.

    The positional and keyword arguments given to the constructor are not
    passed to ``threading.Thread``; they are stored and forwarded unchanged
    to ``scapy.all.sniff`` when the thread runs.

    Attributes:
        args: Positional arguments forwarded to ``scapy.all.sniff``.
        kwargs: Keyword arguments forwarded to ``scapy.all.sniff``.
        received: List extended with the sniffed packets once the capture
            returns.
    """

    def __init__(self, *args, **kwargs):
        """Store the arguments to forward to ``scapy.all.sniff``."""
        self.args = args
        self.kwargs = kwargs
        # Filled by run() in the worker thread; callers should join() the
        # thread before reading it.
        self.received = []
        super().__init__()

    def run(self):
        """Sniff with the stored arguments and collect the captured packets."""
        self.received.extend(scapy.all.sniff(*self.args, **self.kwargs))
class TestScapyUdpCaptureIssue(unittest.TestCase):
    """Reproduce the extra bytes seen on UDP payloads captured with sniff.

    The same Ether / IPv6 / UDP frame is sent from ITF_SRC to ITF_DST and
    checked on reception twice: once through a regular UDP socket and once
    through ``scapy.all.sniff``. Each test runs with a 25-byte and a
    24-byte random payload.
    """

    @staticmethod
    def _build_frame(payload_udp):
        """Return the Ether / IPv6 / UDP frame carrying *payload_udp*."""
        eth = scapy.all.Ether(src=scapy.all.get_if_hwaddr(ITF_SRC),
                              dst=scapy.all.get_if_hwaddr(ITF_DST))
        ip = scapy.all.IPv6(src=IP_SRC, dst=IP_DST)
        udp = scapy.all.UDP(sport=UDP_PORT, dport=UDP_PORT)
        return eth / ip / udp / payload_udp

    def test_captureSocket(self):
        """The payload received through a UDP socket equals the one sent."""
        for udp_payload_size in (25, 24):
            # subTest reports every payload size, even after a failure.
            with self.subTest(udp_payload_size=udp_payload_size):
                # build udp frame
                payload_udp = os.urandom(udp_payload_size)
                frame = self._build_frame(payload_udp)
                # listen for future response
                with contextlib.closing(
                        socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)) as sckt_rec:
                    sckt_rec.settimeout(1)
                    sckt_rec.bind((IP_DST, UDP_PORT))
                    # send frame
                    scapy.all.sendp(frame, iface=ITF_SRC, verbose=False)
                    # check response
                    try:
                        received, address = sckt_rec.recvfrom(16384)
                    except socket.timeout as e:
                        self.fail("%s: %s (udp_payload_size = %u)"
                                  % (e.__class__.__name__, e, udp_payload_size))
                    self.assertEqual(address, (IP_SRC, UDP_PORT, 0, 0),
                                     "udp_payload_size = %u" % (udp_payload_size))
                    self.assertSequenceEqual(received, payload_udp,
                                             "udp_payload_size = %u" % (udp_payload_size))

    def test_captureSniff(self):
        """The frame captured by ``scapy.all.sniff`` equals the one sent."""
        # Map next-header names to their numeric codes once, outside the loop.
        ipv6nh_codes = {name: code for code, name in scapy.all.ipv6nh.items()}
        ignored_nh = (ipv6nh_codes["ICMPv6"],
                      ipv6nh_codes["Hop-by-Hop Option Header"])

        def is_candidate(pkt):
            # Frames without an IPv6 layer are skipped instead of indexed:
            # an exception raised here would end the capture in the sniff
            # thread and make the test fail for an unrelated reason.
            return (scapy.all.IPv6 in pkt
                    and pkt[scapy.all.IPv6].nh not in ignored_nh)

        for udp_payload_size in (25, 24):
            # subTest reports every payload size, even after a failure.
            with self.subTest(udp_payload_size=udp_payload_size):
                # build udp frame
                payload_udp = os.urandom(udp_payload_size)
                frame = self._build_frame(payload_udp)
                # listen for future response
                sniff_thread = SniffThread(timeout=1,
                                           iface=ITF_DST,
                                           lfilter=is_candidate)
                sniff_thread.start()
                # send frame
                scapy.all.sendp(frame, iface=ITF_SRC, verbose=False)
                sniff_thread.join()
                # build expected response (a fresh packet, independent of
                # the object that was handed to sendp)
                expected = self._build_frame(payload_udp)
                # check response
                self.assertEqual(len(sniff_thread.received), 1,
                                 "udp_payload_size = %u" % (udp_payload_size))  # fails if udp_payload_size == 24
                self.assertSequenceEqual(bytes(sniff_thread.received[0]),
                                         bytes(expected),
                                         "udp_payload_size = %u" % (udp_payload_size))
# Run the test cases when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()