From ca6f05ffedc22185046f574452f4cca5669c8337 Mon Sep 17 00:00:00 2001 From: Bastian Schroll Date: Sun, 7 Jan 2018 14:09:40 +0100 Subject: [PATCH] add decoders and tests --- boswatch/decoder/decoder.py | 52 +++++++++++++++++++ boswatch/decoder/fms.py | 74 ++++++++++++++++++++++++++ boswatch/decoder/pocsag.py | 83 ++++++++++++++++++++++++++++++ boswatch/decoder/zvei.py | 68 ++++++++++++++++++++++++ test/test_decoder.py | 100 ++++++++++++++++++++++++++++++++++++ 5 files changed, 377 insertions(+) create mode 100644 boswatch/decoder/decoder.py create mode 100644 boswatch/decoder/fms.py create mode 100644 boswatch/decoder/pocsag.py create mode 100644 boswatch/decoder/zvei.py create mode 100644 test/test_decoder.py diff --git a/boswatch/decoder/decoder.py b/boswatch/decoder/decoder.py new file mode 100644 index 0000000..54de6c3 --- /dev/null +++ b/boswatch/decoder/decoder.py @@ -0,0 +1,52 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""! + ____ ____ ______ __ __ __ _____ + / __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ / + / __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ < + / /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ / +/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/ + German BOS Information Script + by Bastian Schroll + +@file: decoder.py +@date: 06.01.2018 +@author: Bastian Schroll +@description: Some utils for the decoding +""" +import logging + +from boswatch.decoder.fms import Fms +from boswatch.decoder.pocsag import Pocsag +from boswatch.decoder.zvei import Zvei + +logging.debug("- %s loaded", __name__) + + +def getDecoder(data): + """!Choose the right decoder and return the new decoder object + + @param data: data to decode + @return Decoder object""" + logging.debug("search decoder") + if "FMS" in data: + return Fms() + elif "POCSAG" in data: + return Pocsag() + elif "ZVEI" in data: + return Zvei() + else: + logging.debug("no decoder found for: %s", data) + return DummyDecoder() + + +class DummyDecoder: + """!This dummy decoder class is needed because in case of + an getDecoder() with false data, we must return a decoder + object with an decode() method to prevent an error""" + def __init__(self): + pass + + @staticmethod + def decode(data): + return None diff --git a/boswatch/decoder/fms.py b/boswatch/decoder/fms.py new file mode 100644 index 0000000..8ae95d5 --- /dev/null +++ b/boswatch/decoder/fms.py @@ -0,0 +1,74 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""! + ____ ____ ______ __ __ __ _____ + / __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ / + / __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ < + / /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ / +/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/ + German BOS Information Script + by Bastian Schroll + +@file: fms.py +@date: 06.01.2018 +@author: Bastian Schroll +@description: Decoder class for fms +""" + +import logging +import re + +from boswatch.packet import packet + +logging.debug("- %s loaded", __name__) + + +class Fms: + """!FMS decoder class + + This class decodes FMS data. + First step is to validate the data and check if the format is correct. + In the last step a valid BOSWatch packet is created and returned""" + + def __init__(self): + """!Create a new instance""" + logging.debug("FMS decoder started") + + def decode(self, data): + """!Decodes FMS + + @param data: FMS for decoding + @return BOSWatch FMS packet or None""" + if "CRC correct" in data: + service = data[19] + country = data[36] + location = data[61:63] + vehicle = data[72:76] + status = data[84] + direction = data[101] + directionText = data[103:110] + tacticalInfo = data[114:117] + fms_id = service + country + location + vehicle + status + direction + + if re.search("[0-9a-f]{8}[0-9a-f]{1}[01]{1}", fms_id): + logging.debug("found valid FMS") + + bwPacket = packet.Packet() + bwPacket.setField("mode", "fms") + bwPacket.setField("fms", fms_id) + bwPacket.setField("service", service) + bwPacket.setField("country", country) + bwPacket.setField("location", location) + bwPacket.setField("vehicle", vehicle) + bwPacket.setField("status", status) + bwPacket.setField("direction", direction) + bwPacket.setField("cirectionText", directionText) + bwPacket.setField("tacticalInfo", tacticalInfo) + + logging.debug(bwPacket) + return bwPacket + + logging.warning("no valid data") + return None + logging.warning("CRC Error") + return None diff --git a/boswatch/decoder/pocsag.py b/boswatch/decoder/pocsag.py new file mode 100644 index 0000000..2cc1d45 --- /dev/null +++ b/boswatch/decoder/pocsag.py @@ -0,0 +1,83 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""! + ____ ____ ______ __ __ __ _____ + / __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ / + / __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ < + / /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ / +/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/ + German BOS Information Script + by Bastian Schroll + +@file: pocsag.py +@date: 06.01.2018 +@author: Bastian Schroll +@description: Decoder class for pocsag +""" + +import logging +import re + +from boswatch.packet import packet + +logging.debug("- %s loaded", __name__) + + +class Pocsag: + """!POCSAG decoder class + + This class decodes POCSAG data. + First step is to validate the data and check if the format is correct. + In the last step a valid BOSWatch packet is created and returned""" + + def __init__(self): + """!Create a new instance""" + logging.debug("POCSAG decoder started") + + def decode(self, data): + """!Decodes POCSAG + + @param data: POCSAG for decoding + @return BOSWatch POCSAG packet or None""" + bitrate = None + ric = None + subric = None + + if "POCSAG512:" in data: + bitrate = 512 + ric = data[20:27].replace(" ", "").zfill(7) + subric = str(int(data[39]) + 1) + + elif "POCSAG1200:" in data: + bitrate = 1200 + ric = data[21:28].replace(" ", "").zfill(7) + subric = str(int(data[40]) + 1) + + elif "POCSAG2400:" in data: + bitrate = 2400 + ric = data[21:28].replace(" ", "").zfill(7) + subric = str(int(data[40]) + 1) + + if re.search("[0-9]{7}", ric) and re.search("[1-4]{1}", subric): + if "Alpha:" in data: + message = data.split('Alpha: ')[1].strip() + message = message.replace('', '').replace('', '').replace('', '').replace('', '').strip() + else: + message = "" + subricText = subric.replace("1", "a").replace("2", "b").replace("3", "c").replace("4", "d") + + logging.debug("found valid POCSAG") + + bwPacket = packet.Packet() + bwPacket.setField("mode", "pocsag") + bwPacket.setField("bitrate", bitrate) + bwPacket.setField("ric", ric) + bwPacket.setField("subric", subric) + bwPacket.setField("subricText", subricText) + bwPacket.setField("message", message) + + logging.debug(bwPacket) + return bwPacket + + logging.warning("no valid data") + return None diff --git a/boswatch/decoder/zvei.py b/boswatch/decoder/zvei.py new file mode 100644 index 0000000..a9a467c --- /dev/null +++ b/boswatch/decoder/zvei.py @@ -0,0 +1,68 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""! + ____ ____ ______ __ __ __ _____ + / __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ / + / __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ < + / /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ / +/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/ + German BOS Information Script + by Bastian Schroll + +@file: zvei.py +@date: 05.01.2018 +@author: Bastian Schroll +@description: Decoder class for zvei +""" + +import logging +import re + +from boswatch.packet import packet + +logging.debug("- %s loaded", __name__) + + +class Zvei: + """!ZVEI decoder class + + This class decodes ZVEI data. + First step is to validate the data and check if the format is correct. + After that the double-tone-sign 'E' is replaced. + In the last step a valid BOSWatch packet is created and returned""" + + def __init__(self): + """!Create a new instance""" + logging.debug("ZVEI decoder started") + + def decode(self, data): + """!Decodes ZVEI + + @param data: ZVEI for decoding + @return BOSWatch ZVEI packet or None""" + if re.search("[0-9E]{5}", data[7:12]): + logging.debug("found valid ZVEI") + + bwPacket = packet.Packet() + bwPacket.setField("mode", "zvei") + bwPacket.setField("zvei", self._solveDoubleTone(data[7:12])) + + logging.debug(bwPacket) + return bwPacket + + logging.warning("no valid data") + return None + + @staticmethod + def _solveDoubleTone(data): + """!Remove the doubleTone sign (here its the 'E') + + @param data: ZVEI for double tone sign replacement + @return Double Tone replaced ZVEI""" + if "E" in data: + data_old = data + for i in range(1, len(data)): + if data[i] == "E": + data = data.replace("E", data[i - 1], 1) + logging.debug("solve doubleTone: %s -> %s", data_old, data) + return data diff --git a/test/test_decoder.py b/test/test_decoder.py new file mode 100644 index 0000000..5aba2dd --- /dev/null +++ b/test/test_decoder.py @@ -0,0 +1,100 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +"""! + ____ ____ ______ __ __ __ _____ + / __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ / + / __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ < + / /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ / +/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/ + German BOS Information Script + by Bastian Schroll + +@file: test_decoder.py +@date: 15.12.2017 +@author: Bastian Schroll +@description: Unittests for BOSWatch. File must be run as "pytest" unittest +""" + +from boswatch.decoder import decoder + + +class Test_decoder: + """!Unittests for the decoder""" + + def test_decoder_no_data(self): + """!Test a empty string""" + assert decoder.getDecoder("").decode("") is None + + def test_decoder_zvei_valid(self): + """!Test valid ZVEI""" + dec = decoder.getDecoder("ZVEI") + assert not dec.decode("ZVEI1: 12345") is None + assert not dec.decode("ZVEI1: 12838") is None + assert not dec.decode("ZVEI1: 34675") is None + + def test_decoder_zvei_doubleTone(self): + """!Test doubleTone included ZVEI""" + dec = decoder.getDecoder("ZVEI") + assert not dec.decode("ZVEI1: 6E789") is None + assert not dec.decode("ZVEI1: 975E7") is None + assert not dec.decode("ZVEI1: 2E87E") is None + + def test_decoder_zvei_invalid(self): + """Test invalid ZVEI""" + dec = decoder.getDecoder("ZVEI") + assert dec.decode("ZVEI1: 1245A") is None + assert dec.decode("ZVEI1: 1245") is None + assert dec.decode("ZVEI1: 135") is None + assert dec.decode("ZVEI1: 54") is None + assert dec.decode("ZVEI1: 54") is None + + def test_decoder_pocsag_valid(self): + """!Test valid POCSAG""" + dec = decoder.getDecoder("POCSAG") + assert not dec.decode("POCSAG512: Address: 1000000 Function: 0") is None + assert not dec.decode("POCSAG512: Address: 1000001 Function: 1") is None + assert not dec.decode("POCSAG1200: Address: 1000002 Function: 2") is None + assert not dec.decode("POCSAG2400: Address: 1000003 Function: 3") is None + + def test_decoder_pocsag_text(self): + """!Test POCSAG with text""" + dec = decoder.getDecoder("POCSAG") + assert not dec.decode("POCSAG512: Address: 1000000 Function: 0 Alpha: test") is None + assert not dec.decode("POCSAG512: Address: 1000001 Function: 1 Alpha: test") is None + assert not dec.decode("POCSAG1200: Address: 1000002 Function: 2 Alpha: test") is None + assert not dec.decode("POCSAG2400: Address: 1000003 Function: 3 Alpha: test") is None + + def test_decoder_pocsag_short(self): + """!Test short POCSAG""" + dec = decoder.getDecoder("POCSAG") + assert not dec.decode("POCSAG512: Address: 3 Function: 0 Alpha: test") is None + assert not dec.decode("POCSAG512: Address: 33 Function: 0 Alpha: test") is None + assert not dec.decode("POCSAG1200: Address: 333 Function: 0 Alpha: test") is None + assert not dec.decode("POCSAG1200: Address: 3333 Function: 0 Alpha: test") is None + assert not dec.decode("POCSAG2400: Address: 33333 Function: 0 Alpha: test") is None + assert not dec.decode("POCSAG2400: Address: 333333 Function: 0 Alpha: test") is None + assert not dec.decode("POCSAG2400: Address: 3333333 Function: 0 Alpha: test") is None + + def test_decoder_pocsag_invalid(self): + """!Test invalid POCSAG""" + dec = decoder.getDecoder("POCSAG") + assert dec.decode("POCSAG512: Address: 333333F Function: 0 Alpha: invalid") is None + assert dec.decode("POCSAG512: Address: 333333F Function: 1 Alpha: invalid") is None + assert dec.decode("POCSAG512: Address: 3333333 Function: 4 Alpha: invalid") is None + + def test_decoder_fms_valid(self): + """!Test valid FMS""" + dec = decoder.getDecoder("FMS") + assert not dec.decode("""FMS: 43f314170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Status 3=Einsatz Ab 0=FZG->LST 2=I (ohneNA,ohneSIGNAL)) CRC correct""") is None + assert not dec.decode("""FMS: 43f314170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Status 3=Einsatz Ab 1=LST->FZG 2=I (ohneNA,ohneSIGNAL)) CRC correct""") is None + assert not dec.decode("""FMS: 43f314170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Status 3=Einsatz Ab 0=FZG->LST 2=II (ohneNA,mit SIGNAL)) CRC correct""") is None + assert not dec.decode("""FMS: 43f314170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Status 3=Einsatz Ab 1=LST->FZG 2=III(mit NA,ohneSIGNAL)) CRC correct""") is None + assert not dec.decode("""FMS: 43f314170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Status 3=Einsatz Ab 0=FZG->LST 2=IV (mit NA,mit SIGNAL)) CRC correct""") is None + + def test_decoder_fms_invalid(self): + """!Test invalid FMS""" + dec = decoder.getDecoder("FMS") + assert dec.decode("""FMS: 14170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Status 3=Einsatz Ab 1=LST->FZG 2=III(mit NA,ohneSIGNAL)) CRC correct""") is None + assert dec.decode("""FMS: 43f314170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Sta 3=Einsatz Ab 0=FZG->LST 2=IV (mit NA,mit SIGNAL)) CRC correct""") is None + assert dec.decode("""FMS: 14170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Status 3=Einsatz Ab 1=LST->FZG 2=III(mit NA,ohneSIGNAL)) CRC incorrect""") is None + assert dec.decode("""FMS: 43f314170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Sta 3=Einsatz Ab 0=FZG->LST 2=IV (mit NA,mit SIGNAL)) CRC incorrect""") is None