Eigentlich sollte es dank Protobuf heute nicht mehr vorkommen, dass binäre Daten manuell serialisiert werden. „Eigentlich“. Ich musste im folgenden Beispiel Binärdaten dekodieren, die von Fahrzeugsensoren in Dateien geschrieben wurden. Zum Glück lag eine Beschreibung der Kodierung vor. Die Herausforderung war, dass die Daten aus verschiedenen Sensoren übermittelt wurden und alle im Moment ihres Eintreffens in eine Binärdatei geschrieben wurden. Welcher Sensor nun gerade einen Eintrag schrieb, musste anhand des führenden Bytes ermittelt werden.
Weiterer Glücksfall: zwischen den verschiedenen Einträgen gab es zwei wiederkehrende Begrenzungsbytes. Eine Nachricht eines Sensors folgt damit folgendem Schema:
<Begrenzungsbyte><Begrenzungsbyte><Identifikationsbyte><Informationsbytes unterschiedlicher Länge>
Um das Ganze weiter zu würzen, lieferten einige Sensoren statt binärer Daten auch ASCII Zeichen – sprich puren Text.
Python Dictionaries
Die Entscheidung, das Problem mit Python zu lösen war einfach. Zum einen lassen sich Binärdaten mit Python sehr einfach dekodieren. Des Weiteren bietet Python mit seinen Dictionaries die notwendigen Mittel, die richtigen Sensorinformationen zu „jagen“.
Das notwendige Python Modul zur De-serialisierung von Binärdaten trägt den Namen struct
. Mit diesem Modul lassen sich die einzelnen Bytes entsprechend ihres Typs dekodieren. Jeder Datentyp wird durch ein entsprechendes Formatzeichen repräsentiert. Entspricht das gelesene Byte beispielsweise einem Textzeichen (Character), kann es in struct durch das Formatzeichen c repräsentiert werden. Integerwerte werden durch das i, vier Byte durch ein l (für Long) repräsentiert und so weiter. Sollen beispielsweise sechs Bytes dekodiert werden, von denen die ersten beiden Zeichen darstellen und die letzten vier ein Integer, lässt sich das im struct Modul folgendermaßen repräsentieren:
cci
Die komplette Liste der Formate ist in der Dokumentation zum Modul zu finden. Zur Konvertierung der Bytes muss die Byte Order berücksichtigt werden. Die Byte Reihenfolge oder Endianess gibt an, welches Byte zuerst gespeichert wird. In Schnittstellen ist dies häufig das Höchstwertige, in welchem Fall man von einer Big-Endian Reihenfolge spricht. Bei einer unkonventionellen Schnittstelle können jedoch auch Little-Endian oder beide Typen gemeinsam vorkommen. Dies ist der Dokumentation zu entnehmen. Liegt diese nicht vor, ist ausprobieren angesagt. Im struct Modul von Python kann die Endianess mittels Zeiger vor den Formaten definiert werden. Die im vorliegenden Beispiel vorkommenden Bytesequenzen wurden als Python Dictionaries definiert. Zur Illustration anbei Beispiele:
>H6h # 14 big-endian Bytes. Das erste ist unsigned Short (2 Bytes), es folgen 6 Short >H4c # 6 big-endian Bytes. Ein unsigned Short, 4 Character <5ci8c2i # 25 little-endian Bytes: Fünf character, ein Integer (= vier Bytes), acht Character, zwei mal vier Bytes für zwei Integer. # Als Python Liste: struct_info = ['>H6h', '>H4c', '<5ci8c2i', '<5ci8c2i', '>H']
Jede Bytesequenz wurde durch ein Informationsbyte am Beginn der Sequenz als Identifikator gekennzeichnet. Somit ließ sich folgendes Dictionary erstellen:
msg_bytes = [2, 3, 5, 6, 7] struct_info = ['>H6h', '>H4c', '<5ci8c2i', '<5ci8c2i', '>H'] sdict = dict(zip(msg_bytes_bin, struct_info))
The Hunting of the Byte
Nun beginnt die Jagd nach den Bytes. Anhand der Begrenzungsbytes können die ankommenden Daten in Stücke zerschnitten werden, die dann entsprechend der Informationen in den Dictionaries dekodiert werden.
# Function to get chunk to process and message id def getchunk(bytedata, startposition = 0, header = b'\xaa\x55'): try: pos = bytedata.index(header, startposition) curr_chunk = bytedata[pos + len(header) : bytedata.index(header, pos + len(header))] if len(curr_chunk) == 0: curr_chunk = b' ' except ValueError as e: # print(e) curr_chunk = b' ' return (curr_chunk) # Durch das File lesen with open(f, mode = 'rb') as hx: hx_data = hx.read() hx.close() pos = 0 while pos <= len(hx_data): infos = [f, pos] chunk = getchunk(hx_data, startposition = pos) pos = pos + len(chunk) hdat = list(struct.unpack(av_dict[chunk[0]], chunk[1:]))
In der Funktion getchunk werden alle Bytes sequentiell ermittelt, die zwischen zwei Begrenzungsbytes zu finden sind. Des weiteren wird ein Counter hochgezählt, so dass die Position im gesamten File jederzeit bekannt ist. Einige Funktionen, die für Strings verwendet werden, können in derselben Weise auch für Bytes angewandt werden (siehe z.B. index
, Zeile 4).
Im Iterator ab Zeile 18 wird jedes Teilstück durchgearbeitet. Anhand des ersten Bytes in jedem Teilstück kann aus dem Dictionary die korrekte Dekodierungssequenz ermittelt werden. Das Ergebnis (hdat) kann dann weggeschrieben werden (hier nicht dargestellt).
Ein guter Einstieg in die Arbeit mit Binärdaten (Python2 basiert), bietet diese Programmbeschreibung auf der Seite des Data Visualization Labs.
[…] Dieser Beitrag findet sich jetzt unter http://blog.dsquare.de/binaerdaten-mit-python-3-auslesen/ […]