add decoders and tests

This commit is contained in:
Bastian Schroll 2018-01-07 14:09:40 +01:00
parent 766ba810ad
commit ca6f05ffed
5 changed files with 377 additions and 0 deletions

View file

@ -0,0 +1,52 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: decoder.py
@date: 06.01.2018
@author: Bastian Schroll
@description: Some utils for the decoding
"""
import logging
from boswatch.decoder.fms import Fms
from boswatch.decoder.pocsag import Pocsag
from boswatch.decoder.zvei import Zvei
logging.debug("- %s loaded", __name__)
def getDecoder(data):
"""!Choose the right decoder and return the new decoder object
@param data: data to decode
@return Decoder object"""
logging.debug("search decoder")
if "FMS" in data:
return Fms()
elif "POCSAG" in data:
return Pocsag()
elif "ZVEI" in data:
return Zvei()
else:
logging.debug("no decoder found for: %s", data)
return DummyDecoder()
class DummyDecoder:
"""!This dummy decoder class is needed because in case of
an getDecoder() with false data, we must return a decoder
object with an decode() method to prevent an error"""
def __init__(self):
pass
@staticmethod
def decode(data):
return None

74
boswatch/decoder/fms.py Normal file
View file

@ -0,0 +1,74 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: fms.py
@date: 06.01.2018
@author: Bastian Schroll
@description: Decoder class for fms
"""
import logging
import re
from boswatch.packet import packet
logging.debug("- %s loaded", __name__)
class Fms:
"""!FMS decoder class
This class decodes FMS data.
First step is to validate the data and check if the format is correct.
In the last step a valid BOSWatch packet is created and returned"""
def __init__(self):
"""!Create a new instance"""
logging.debug("FMS decoder started")
def decode(self, data):
"""!Decodes FMS
@param data: FMS for decoding
@return BOSWatch FMS packet or None"""
if "CRC correct" in data:
service = data[19]
country = data[36]
location = data[61:63]
vehicle = data[72:76]
status = data[84]
direction = data[101]
directionText = data[103:110]
tacticalInfo = data[114:117]
fms_id = service + country + location + vehicle + status + direction
if re.search("[0-9a-f]{8}[0-9a-f]{1}[01]{1}", fms_id):
logging.debug("found valid FMS")
bwPacket = packet.Packet()
bwPacket.setField("mode", "fms")
bwPacket.setField("fms", fms_id)
bwPacket.setField("service", service)
bwPacket.setField("country", country)
bwPacket.setField("location", location)
bwPacket.setField("vehicle", vehicle)
bwPacket.setField("status", status)
bwPacket.setField("direction", direction)
bwPacket.setField("cirectionText", directionText)
bwPacket.setField("tacticalInfo", tacticalInfo)
logging.debug(bwPacket)
return bwPacket
logging.warning("no valid data")
return None
logging.warning("CRC Error")
return None

View file

@ -0,0 +1,83 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: pocsag.py
@date: 06.01.2018
@author: Bastian Schroll
@description: Decoder class for pocsag
"""
import logging
import re
from boswatch.packet import packet
logging.debug("- %s loaded", __name__)
class Pocsag:
"""!POCSAG decoder class
This class decodes POCSAG data.
First step is to validate the data and check if the format is correct.
In the last step a valid BOSWatch packet is created and returned"""
def __init__(self):
"""!Create a new instance"""
logging.debug("POCSAG decoder started")
def decode(self, data):
"""!Decodes POCSAG
@param data: POCSAG for decoding
@return BOSWatch POCSAG packet or None"""
bitrate = None
ric = None
subric = None
if "POCSAG512:" in data:
bitrate = 512
ric = data[20:27].replace(" ", "").zfill(7)
subric = str(int(data[39]) + 1)
elif "POCSAG1200:" in data:
bitrate = 1200
ric = data[21:28].replace(" ", "").zfill(7)
subric = str(int(data[40]) + 1)
elif "POCSAG2400:" in data:
bitrate = 2400
ric = data[21:28].replace(" ", "").zfill(7)
subric = str(int(data[40]) + 1)
if re.search("[0-9]{7}", ric) and re.search("[1-4]{1}", subric):
if "Alpha:" in data:
message = data.split('Alpha: ')[1].strip()
message = message.replace('<NUL><NUL>', '').replace('<NUL>', '').replace('<NUL', '').replace('< NUL>', '').replace('<EOT>', '').strip()
else:
message = ""
subricText = subric.replace("1", "a").replace("2", "b").replace("3", "c").replace("4", "d")
logging.debug("found valid POCSAG")
bwPacket = packet.Packet()
bwPacket.setField("mode", "pocsag")
bwPacket.setField("bitrate", bitrate)
bwPacket.setField("ric", ric)
bwPacket.setField("subric", subric)
bwPacket.setField("subricText", subricText)
bwPacket.setField("message", message)
logging.debug(bwPacket)
return bwPacket
logging.warning("no valid data")
return None

68
boswatch/decoder/zvei.py Normal file
View file

@ -0,0 +1,68 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: zvei.py
@date: 05.01.2018
@author: Bastian Schroll
@description: Decoder class for zvei
"""
import logging
import re
from boswatch.packet import packet
logging.debug("- %s loaded", __name__)
class Zvei:
"""!ZVEI decoder class
This class decodes ZVEI data.
First step is to validate the data and check if the format is correct.
After that the double-tone-sign 'E' is replaced.
In the last step a valid BOSWatch packet is created and returned"""
def __init__(self):
"""!Create a new instance"""
logging.debug("ZVEI decoder started")
def decode(self, data):
"""!Decodes ZVEI
@param data: ZVEI for decoding
@return BOSWatch ZVEI packet or None"""
if re.search("[0-9E]{5}", data[7:12]):
logging.debug("found valid ZVEI")
bwPacket = packet.Packet()
bwPacket.setField("mode", "zvei")
bwPacket.setField("zvei", self._solveDoubleTone(data[7:12]))
logging.debug(bwPacket)
return bwPacket
logging.warning("no valid data")
return None
@staticmethod
def _solveDoubleTone(data):
"""!Remove the doubleTone sign (here its the 'E')
@param data: ZVEI for double tone sign replacement
@return Double Tone replaced ZVEI"""
if "E" in data:
data_old = data
for i in range(1, len(data)):
if data[i] == "E":
data = data.replace("E", data[i - 1], 1)
logging.debug("solve doubleTone: %s -> %s", data_old, data)
return data

100
test/test_decoder.py Normal file
View file

@ -0,0 +1,100 @@
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""!
____ ____ ______ __ __ __ _____
/ __ )/ __ \/ ___/ | / /___ _/ /______/ /_ |__ /
/ __ / / / /\__ \| | /| / / __ `/ __/ ___/ __ \ /_ <
/ /_/ / /_/ /___/ /| |/ |/ / /_/ / /_/ /__/ / / / ___/ /
/_____/\____//____/ |__/|__/\__,_/\__/\___/_/ /_/ /____/
German BOS Information Script
by Bastian Schroll
@file: test_decoder.py
@date: 15.12.2017
@author: Bastian Schroll
@description: Unittests for BOSWatch. File must be run as "pytest" unittest
"""
from boswatch.decoder import decoder
class Test_decoder:
"""!Unittests for the decoder"""
def test_decoder_no_data(self):
"""!Test a empty string"""
assert decoder.getDecoder("").decode("") is None
def test_decoder_zvei_valid(self):
"""!Test valid ZVEI"""
dec = decoder.getDecoder("ZVEI")
assert not dec.decode("ZVEI1: 12345") is None
assert not dec.decode("ZVEI1: 12838") is None
assert not dec.decode("ZVEI1: 34675") is None
def test_decoder_zvei_doubleTone(self):
"""!Test doubleTone included ZVEI"""
dec = decoder.getDecoder("ZVEI")
assert not dec.decode("ZVEI1: 6E789") is None
assert not dec.decode("ZVEI1: 975E7") is None
assert not dec.decode("ZVEI1: 2E87E") is None
def test_decoder_zvei_invalid(self):
"""Test invalid ZVEI"""
dec = decoder.getDecoder("ZVEI")
assert dec.decode("ZVEI1: 1245A") is None
assert dec.decode("ZVEI1: 1245") is None
assert dec.decode("ZVEI1: 135") is None
assert dec.decode("ZVEI1: 54") is None
assert dec.decode("ZVEI1: 54") is None
def test_decoder_pocsag_valid(self):
"""!Test valid POCSAG"""
dec = decoder.getDecoder("POCSAG")
assert not dec.decode("POCSAG512: Address: 1000000 Function: 0") is None
assert not dec.decode("POCSAG512: Address: 1000001 Function: 1") is None
assert not dec.decode("POCSAG1200: Address: 1000002 Function: 2") is None
assert not dec.decode("POCSAG2400: Address: 1000003 Function: 3") is None
def test_decoder_pocsag_text(self):
"""!Test POCSAG with text"""
dec = decoder.getDecoder("POCSAG")
assert not dec.decode("POCSAG512: Address: 1000000 Function: 0 Alpha: test") is None
assert not dec.decode("POCSAG512: Address: 1000001 Function: 1 Alpha: test") is None
assert not dec.decode("POCSAG1200: Address: 1000002 Function: 2 Alpha: test") is None
assert not dec.decode("POCSAG2400: Address: 1000003 Function: 3 Alpha: test") is None
def test_decoder_pocsag_short(self):
"""!Test short POCSAG"""
dec = decoder.getDecoder("POCSAG")
assert not dec.decode("POCSAG512: Address: 3 Function: 0 Alpha: test") is None
assert not dec.decode("POCSAG512: Address: 33 Function: 0 Alpha: test") is None
assert not dec.decode("POCSAG1200: Address: 333 Function: 0 Alpha: test") is None
assert not dec.decode("POCSAG1200: Address: 3333 Function: 0 Alpha: test") is None
assert not dec.decode("POCSAG2400: Address: 33333 Function: 0 Alpha: test") is None
assert not dec.decode("POCSAG2400: Address: 333333 Function: 0 Alpha: test") is None
assert not dec.decode("POCSAG2400: Address: 3333333 Function: 0 Alpha: test") is None
def test_decoder_pocsag_invalid(self):
"""!Test invalid POCSAG"""
dec = decoder.getDecoder("POCSAG")
assert dec.decode("POCSAG512: Address: 333333F Function: 0 Alpha: invalid") is None
assert dec.decode("POCSAG512: Address: 333333F Function: 1 Alpha: invalid") is None
assert dec.decode("POCSAG512: Address: 3333333 Function: 4 Alpha: invalid") is None
def test_decoder_fms_valid(self):
"""!Test valid FMS"""
dec = decoder.getDecoder("FMS")
assert not dec.decode("""FMS: 43f314170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Status 3=Einsatz Ab 0=FZG->LST 2=I (ohneNA,ohneSIGNAL)) CRC correct""") is None
assert not dec.decode("""FMS: 43f314170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Status 3=Einsatz Ab 1=LST->FZG 2=I (ohneNA,ohneSIGNAL)) CRC correct""") is None
assert not dec.decode("""FMS: 43f314170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Status 3=Einsatz Ab 0=FZG->LST 2=II (ohneNA,mit SIGNAL)) CRC correct""") is None
assert not dec.decode("""FMS: 43f314170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Status 3=Einsatz Ab 1=LST->FZG 2=III(mit NA,ohneSIGNAL)) CRC correct""") is None
assert not dec.decode("""FMS: 43f314170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Status 3=Einsatz Ab 0=FZG->LST 2=IV (mit NA,mit SIGNAL)) CRC correct""") is None
def test_decoder_fms_invalid(self):
"""!Test invalid FMS"""
dec = decoder.getDecoder("FMS")
assert dec.decode("""FMS: 14170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Status 3=Einsatz Ab 1=LST->FZG 2=III(mit NA,ohneSIGNAL)) CRC correct""") is None
assert dec.decode("""FMS: 43f314170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Sta 3=Einsatz Ab 0=FZG->LST 2=IV (mit NA,mit SIGNAL)) CRC correct""") is None
assert dec.decode("""FMS: 14170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Status 3=Einsatz Ab 1=LST->FZG 2=III(mit NA,ohneSIGNAL)) CRC incorrect""") is None
assert dec.decode("""FMS: 43f314170000 (9=Rotkreuz 3=Bayern 1 Ort 0x25=037FZG 7141Sta 3=Einsatz Ab 0=FZG->LST 2=IV (mit NA,mit SIGNAL)) CRC incorrect""") is None