Struct Versioning

What is Struct Versioning

If a binary file format changes over the course of development, you can version the struct to conditionally parse or skip certain properties, which is useful for keeping a parser compatible with older files. When designing a struct for a new project, it is highly recommended to include some versioning information at the start of the file itself as a future-proofing measure. For this tutorial, we'll look at how to modify the PcapFile definition from the previous tutorial to include some extra fields conditionally, based on the magic_number property.

The Modified PCAP

In one variation of the PCAP format, the header includes the following extra fields:

class PcapHeader(BaseStruct):
    # @formatter:off
    magic_number: bytes     = Retriever(Bytes[4], default = b"\xa1\xb2\xc3\xd4")
    version_major: int      = Retriever(u16,      default = 2)
    version_minor: int      = Retriever(u16,      default = 4)
    timezone: int           = Retriever(u32,      default = 0)
    timestamp_accuracy: int = Retriever(u32,      default = 0)
    snap_length: int        = Retriever(u32,      default = 0)
    link_layer_type: int    = Retriever(u32,      default = 1)

    # new fields:
    interface_index: int    = Retriever(u32,       default = 0)
    protocol: int           = Retriever(u16,       default = 0)
    packet_type: int        = Retriever(u8,        default = 0)
    padding: bytes          = Retriever(Bytes[1],  default = b"\x00")
    # @formatter:on

These extra fields are present only if the magic_number property is b"\xa1\xb2\xcd\x34" (note the last two bytes) instead of b"\xa1\xb2\xc3\xd4".

But if we just include the extra fields like this, it will break the parser for any files which do not contain that extra piece of information in the header. We need some way to signal to BFP that these should be [de]serialized conditionally. Let's specify a min_ver to tell BFP a lower bound on the struct version where these properties are included:

class PcapHeader(BaseStruct):
    # @formatter:off
    magic_number: bytes     = Retriever(Bytes[4],                             default = b"\xa1\xb2\xc3\xd4")
    version_major: int      = Retriever(u16,                                  default = 2)
    version_minor: int      = Retriever(u16,                                  default = 4)
    timezone: int           = Retriever(u32,                                  default = 0)
    timestamp_accuracy: int = Retriever(u32,                                  default = 0)
    snap_length: int        = Retriever(u32,                                  default = 0)
    link_layer_type: int    = Retriever(u32,                                  default = 1)

    interface_index: int    = Retriever(u32,      min_ver = Version(2, 4, 1), default = 0)
    protocol: int           = Retriever(u16,      min_ver = Version(2, 4, 1), default = 0)
    packet_type: int        = Retriever(u8,       min_ver = Version(2, 4, 1), default = 0)
    padding: bytes          = Retriever(Bytes[1], min_ver = Version(2, 4, 1), default = b"\x00")
    # @formatter:on

v2.4.1

Such a version number does not actually exist; it was chosen arbitrarily, since the real format uses the magic number for versioning.

Maximum Versions

We can similarly also specify a max_ver which sets an upper bound on the struct version where these properties are included.
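
To make the bounds concrete, here is a minimal standalone sketch of the check. It assumes that versions compare like tuples and that both bounds are inclusive, which is how min_ver and max_ver are used in this tutorial. The is_supported helper is hypothetical and not part of BFP; it only illustrates the idea:

```python
from typing import Optional, Tuple

Ver = Tuple[int, ...]


def is_supported(ver: Ver, min_ver: Optional[Ver] = None, max_ver: Optional[Ver] = None) -> bool:
    """Return True if `ver` lies within the (inclusive) bounds that are set."""
    if min_ver is not None and ver < min_ver:
        return False  # struct version is older than the lower bound
    if max_ver is not None and ver > max_ver:
        return False  # struct version is newer than the upper bound
    return True


# A property with min_ver = (2, 4, 1) is skipped for a v2.4 struct ...
skipped_in_24 = not is_supported((2, 4), min_ver=(2, 4, 1))
# ... and included for a v2.4.1 struct.
included_in_241 = is_supported((2, 4, 1), min_ver=(2, 4, 1))
```

Python compares tuples element by element, so (2, 4) sorts before (2, 4, 1); whether BFP's Version treats a missing component the same way is an assumption of this sketch.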

Determining the Struct Version During Deserialization

Now when we deserialize a file, we also need to set a struct version so that BFP can make use of the versioning information we just added above. This is done by overriding the _get_version function in a struct:

class PcapFile(BaseStruct):
    header: PcapHeader      = Retriever(PcapHeader,   default_factory = PcapHeader)
    packets: list[Packet]   = Retriever(Tail[Packet], default_factory = lambda _ver: [])

    @classmethod
    def _get_version(cls, stream: ByteStream, ver: Version = Version(0)):
        bytes_ = stream.peek(8)
        magic, ver_bytes = bytes_[:4], bytes_[4:]
        major, minor = u16.from_bytes(ver_bytes[:2]), u16.from_bytes(ver_bytes[2:])

        if major != 2 or minor != 4:
            raise VersionError(f"Unrecognised version v{major}.{minor} for Pcap file")

        if magic == b"\xd4\xc3\xb2\xa1":
            return Version(2, 4)
        if magic == b"\x34\xcd\xb2\xa1":
            return Version(2, 4, 1)

        raise VersionError(f"Unrecognised magic_number {magic[::-1]!r} for pcap file")

This function must determine a struct version from the ByteStream and return it, or raise an error. There are a few things to note:

  • We raise an error if we see any other version than v2.4 since that's the only one we know how to parse
  • We also raise an error if the magic number is not one of the ones we recognise
  • We arbitrarily assign a higher version, v2.4.1, when the magic number is b"\xa1\xb2\xcd\x34". Note that since this file is stored in little endian and ByteStream returns raw bytes, the magic number's bytes appear in reverse order in the stream, so the literals we compare against are reversed too.
  • When a version is set this way, it is recursively also set for all sub-structs unless they also override _get_version for themselves
  • When you override the _get_version function in a sub struct, the second argument ver is given the version of the parent struct
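
The same detection logic can be sketched without BFP, using only the standard library's struct module. This is an illustration, assuming the first 8 bytes of a little-endian file are a u32 magic number followed by two u16 version fields. The detect_version helper is hypothetical, not a BFP function:

```python
import struct

STANDARD_MAGIC = 0xA1B2C3D4
MODIFIED_MAGIC = 0xA1B2CD34


def detect_version(prefix: bytes) -> tuple:
    """Map the first 8 bytes of a little-endian PCAP file to a struct version."""
    if len(prefix) < 8:
        raise ValueError("need at least 8 bytes to detect the version")

    # "<IHH" = little-endian u32, u16, u16. Unpacking the magic number as a
    # little-endian integer undoes the on-disk byte reversal for us.
    magic, major, minor = struct.unpack("<IHH", prefix[:8])

    if (major, minor) != (2, 4):
        raise ValueError(f"unrecognised version v{major}.{minor}")
    if magic == STANDARD_MAGIC:
        return (2, 4)
    if magic == MODIFIED_MAGIC:
        return (2, 4, 1)  # the arbitrary version used in this tutorial
    raise ValueError(f"unrecognised magic number {magic:#010x}")


standard = detect_version(bytes.fromhex("d4c3b2a1") + struct.pack("<HH", 2, 4))
modified = detect_version(bytes.fromhex("34cdb2a1") + struct.pack("<HH", 2, 4))
```

Here the comparison is made on integers rather than on reversed byte literals, which avoids having to reverse the bytes by hand.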

peek vs get

As a rule of thumb, you should never consume any bytes from the stream in _get_version: always use stream.peek(n) rather than stream.get(n).
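
The likely reason is that the struct's own retrievers still need to read those same bytes afterwards (magic_number is the first field of PcapHeader). The difference can be sketched with a standard-library io.BytesIO. This is only an analogy, not BFP's ByteStream, and the peek helper here is hypothetical:

```python
import io


def peek(stream: io.BytesIO, n: int) -> bytes:
    """Read n bytes, then rewind so the stream position is unchanged."""
    pos = stream.tell()
    data = stream.read(n)
    stream.seek(pos)
    return data


stream = io.BytesIO(b"\xd4\xc3\xb2\xa1\x02\x00\x04\x00rest-of-file")

header_prefix = peek(stream, 8)      # inspect magic number + version fields
position_after_peek = stream.tell()  # the position has not moved
first_field = stream.read(4)         # the magic number can still be parsed normally
```

Had the version check used a plain read instead, the parser would have started 8 bytes into the header and misread every field after that.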

With this, we're done! Now you'll be able to [de]serialize PCAP files with the special magic_number! Notably, you can still [de]serialize the standard PCAP file as well, since the extra fields in the PcapHeader are only [de]serialized when the special magic_number is present.

You can check if you can still read the old format:

test = PcapFile.from_file(r"ipv4frags.pcap")
print(test.ver) # prints v2.4

Additionally, if you try to access one of the extra fields in a version where it is not supported, you'll be met with an error:

print(test.header.protocol)

Yields:

Traceback (most recent call last):
  File "/path/to/code.py", line 63, in <module>
    print(test.header.protocol)
errors.VersionError: 'protocol' is not supported in struct version v2.4

Default Initialization

While the definition above can properly [de]serialize data from existing files, it will not be able to correctly create default instances with the new version of the struct. To properly support default initialization, we must also tell BFP which version to use for this struct when it is default initialized. This can be done by overriding the __new__ function in a struct:

from typing import cast

class PcapFile(BaseStruct):
    header: PcapHeader      = Retriever(PcapHeader,   default_factory = PcapHeader)
    packets: list[Packet]   = Retriever(Tail[Packet], default_factory = lambda _ver: [])

    @classmethod
    def _get_version(cls, stream: ByteStream, ver: Version = Version(0)):
        ...

    def __new__(cls, ver: Version = Version(2, 4), init_defaults: bool = True, **retriever_inits):
        self = cast(PcapFile, super().__new__(cls, ver, init_defaults, **retriever_inits))
        if ver == Version(2, 4, 1):
            self.header.magic_number = b"\xa1\xb2\xcd\x34"
        return self

Let's break down what's going on here:

  1. We're changing the default of ver to Version(2, 4) in the constructor. This means that when someone creates a default instance using PcapFile(), its version will be initialized to v2.4.
  2. Since we want people to be able to create defaults of a different version by using PcapFile(ver = Version(2, 4, 1)), we need to fix the magic_number manually so that it is serialized correctly when we write this new instance to a file.
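
To see why the magic number has to match the version, consider what a default header looks like on disk in each case. The following standalone sketch uses the standard library's struct module rather than BFP. The default_header_bytes helper is hypothetical, and it assumes the little-endian field layout of PcapHeader from this tutorial, with the magic number written as a little-endian u32:

```python
import struct


def default_header_bytes(ver: tuple = (2, 4)) -> bytes:
    """Build the bytes of a default PCAP header for the given struct version."""
    modified = ver >= (2, 4, 1)
    magic = 0xA1B2CD34 if modified else 0xA1B2C3D4

    # magic, version_major, version_minor, timezone,
    # timestamp_accuracy, snap_length, link_layer_type
    header = struct.pack("<IHHIIII", magic, 2, 4, 0, 0, 0, 1)

    if modified:
        # interface_index (u32), protocol (u16), packet_type (u8), padding (1 byte)
        header += struct.pack("<IHBx", 0, 0, 0)
    return header


standard_header = default_header_bytes()           # 24 bytes
modified_header = default_header_bytes((2, 4, 1))  # 32 bytes
```

A reader that sees the standard magic number expects a 24-byte header, so a 32-byte header written with the wrong magic number would be misparsed. That is the mismatch the __new__ override guards against.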

Different Defaults Across Versions

You can also use min_ver and max_ver to define a separate property for each version, each with its own default:

class PcapHeader(BaseStruct):
    # @formatter:off
    magic_number_24: bytes      = Retriever(Bytes[4],                             max_ver = Version(2, 4),  default = b"\xa1\xb2\xc3\xd4")
    magic_number_241: bytes     = Retriever(Bytes[4], min_ver = Version(2, 4, 1),                           default = b"\xa1\xb2\xcd\x34")

This means that we no longer need special logic in the constructor to fix the magic number for us. The downside is that the struct is more awkward to use, since the property you need to access now changes depending on the file version. BFP offers a way to combine such properties into one using a RetrieverCombiner, which is covered in the Advanced Retrievers section of this tutorial.

Note that the choice between implementing correct defaults for different versions with separate properties or with special logic in the constructor is up to the struct designer; BFP itself prefers neither method over the other.

PCAP Variants

There are more variations of the PCAP format based on different magic_number values. Some very old files may also have a version number of 2.3 or even 2.2. You can read about other PCAP file variations here. Implementing a parser that can work with all these variations is left as an exercise for the reader.

The Code

Here's the completed code in all its glory:

from typing import cast

from bfp_rs import BaseStruct, Retriever, Version, ByteStream, ret
from bfp_rs.combinators import set_repeat
from bfp_rs.errors import VersionError
from bfp_rs.types.le import Bytes, u16, u32, Tail, u8


class Packet(BaseStruct):
    # @formatter:off
    timestamp_seconds: int       = Retriever(u32,      default = 0)
    timestamp_micro_seconds: int = Retriever(u32,      default = 0)
    captured_length: int         = Retriever(u32,      default = 0, on_read = lambda: [set_repeat(ret(Packet.data)).from_(Packet.captured_length)])
    original_length: int         = Retriever(u32,      default = 0)
    data: list[bytes]            = Retriever(Bytes[1], default = b"\x00")
    # @formatter:on

class PcapHeader(BaseStruct):
    # @formatter:off
    magic_number: bytes     = Retriever(Bytes[4],                             default = b"\xa1\xb2\xc3\xd4")
    version_major: int      = Retriever(u16,                                  default = 2)
    version_minor: int      = Retriever(u16,                                  default = 4)
    timezone: int           = Retriever(u32,                                  default = 0)
    timestamp_accuracy: int = Retriever(u32,                                  default = 0)
    snap_length: int        = Retriever(u32,                                  default = 0)
    link_layer_type: int    = Retriever(u32,                                  default = 1)

    interface_index: int    = Retriever(u32,      min_ver = Version(2, 4, 1), default = 0)
    protocol: int           = Retriever(u16,      min_ver = Version(2, 4, 1), default = 0)
    packet_type: int        = Retriever(u8,       min_ver = Version(2, 4, 1), default = 0)
    padding: bytes          = Retriever(Bytes[1], min_ver = Version(2, 4, 1), default = b"\x00")
    # @formatter:on

class PcapFile(BaseStruct):
    header: PcapHeader      = Retriever(PcapHeader,   default_factory = PcapHeader)
    packets: list[Packet]   = Retriever(Tail[Packet], default_factory = lambda _ver: [])

    @classmethod
    def _get_version(cls, stream: ByteStream, ver: Version = Version(0)) -> Version:
        bytes_ = stream.peek(8)
        magic, ver_bytes = bytes_[:4], bytes_[4:]
        major, minor = u16.from_bytes(ver_bytes[:2]), u16.from_bytes(ver_bytes[2:])

        if major != 2 or minor != 4:
            raise VersionError(f"Unrecognised version v{major}.{minor} for Pcap file")

        if magic == b"\xd4\xc3\xb2\xa1":
            return Version(2, 4)
        if magic == b"\x34\xcd\xb2\xa1":
            return Version(2, 4, 1)

        raise VersionError(f"Unrecognised magic_number {magic[::-1]!r} for pcap file")

    def __new__(cls, ver: Version = Version(2, 4), init_defaults: bool = True, **retriever_inits):
        self = cast(PcapFile, super().__new__(cls, ver, init_defaults, **retriever_inits))
        if ver == Version(2, 4, 1):
            self.header.magic_number = b"\xa1\xb2\xcd\x34"
        return self

test = PcapFile.from_file(r"ipv4frags.pcap")
print(test.ver)

print(test.header.protocol)