mirror of
https://github.com/dh1tw/pyhamtools.git
synced 2026-01-20 07:20:26 +01:00
Added Push to Redis and Redis as a LookupLib
This commit is contained in:
parent
d6d90d3e2a
commit
8216f488ef
|
|
@ -91,6 +91,27 @@ class Callinfo(object):
|
|||
continue
|
||||
raise KeyError
|
||||
|
||||
def check_if_mm(self, callsign):
|
||||
if re.search("/MM$", callsign.upper()):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def check_if_am(self, callsign):
|
||||
if re.search("/AM$", callsign.upper()):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def check_if_beacon(self, callsign):
|
||||
if re.search("/B$", callsign.upper()):
|
||||
return True
|
||||
elif re.search("/BCN$", callsign.upper()):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def _dismantle_callsign(self, callsign, timestamp=timestamp_now):
|
||||
""" try to identify the callsign's identity by analyzing it in the following order:
|
||||
|
||||
|
|
@ -121,10 +142,23 @@ class Callinfo(object):
|
|||
|
||||
if appendix == 'MM': # special case Martime Mobile
|
||||
#self._mm = True
|
||||
raise KeyError
|
||||
return {
|
||||
'adif': 999,
|
||||
'continent': '',
|
||||
'country': 'MARITIME MOBILE',
|
||||
'cqz': 0,
|
||||
'latitude': 0.0,
|
||||
'longitude': 0.0
|
||||
}
|
||||
elif appendix == 'AM': # special case Aeronautic Mobile
|
||||
#self._am = True
|
||||
raise KeyError
|
||||
return {
|
||||
'adif': 998,
|
||||
'continent': '',
|
||||
'country': 'AIRCAFT MOBILE',
|
||||
'cqz': 0,
|
||||
'latitude': 0.0,
|
||||
'longitude': 0.0
|
||||
}
|
||||
elif appendix == 'QRP': # special case QRP
|
||||
callsign = re.sub('/QRP', '', callsign)
|
||||
return self._iterate_prefix(callsign, timestamp)
|
||||
|
|
@ -133,8 +167,9 @@ class Callinfo(object):
|
|||
return self._iterate_prefix(callsign, timestamp)
|
||||
elif appendix == 'BCN': #filter all beacons
|
||||
callsign = re.sub('/BCN', '', callsign)
|
||||
# self.beacon = True
|
||||
return self._iterate_prefix(callsign, timestamp)
|
||||
data = self._iterate_prefix(callsign, timestamp).copy()
|
||||
data[const.BEACON] = True
|
||||
return data
|
||||
elif appendix == "LH": #Filter all Lighthouses
|
||||
callsign = re.sub('/LH', '', callsign)
|
||||
return self._iterate_prefix(callsign, timestamp)
|
||||
|
|
@ -149,8 +184,9 @@ class Callinfo(object):
|
|||
|
||||
if appendix == 'B': #special case Beacon
|
||||
callsign = re.sub('/B', '', callsign)
|
||||
return self._iterate_prefix(callsign, timestamp)
|
||||
# self.beacon = True
|
||||
data = self._iterate_prefix(callsign, timestamp).copy()
|
||||
data[const.BEACON] = True
|
||||
return data
|
||||
|
||||
elif re.search('\d$', appendix):
|
||||
area_nr = re.search('\d$', appendix).group(0)
|
||||
|
|
@ -186,9 +222,31 @@ class Callinfo(object):
|
|||
if invalid:
|
||||
raise
|
||||
|
||||
if self.check_if_mm(callsign):
|
||||
return {
|
||||
'adif': 999,
|
||||
'continent': '',
|
||||
'country': 'MARITIME MOBILE',
|
||||
'cqz': 0,
|
||||
'latitude': 0.0,
|
||||
'longitude': 0.0
|
||||
}
|
||||
elif self.check_if_am(callsign):
|
||||
return {
|
||||
'adif': 998,
|
||||
'continent': '',
|
||||
'country': 'AIRCAFT MOBILE',
|
||||
'cqz': 0,
|
||||
'latitude': 0.0,
|
||||
'longitude': 0.0
|
||||
}
|
||||
|
||||
# Check if a dedicated entry exists for the callsign
|
||||
try:
|
||||
return self._lookuplib.lookup_callsign(callsign, timestamp)
|
||||
data = self._lookuplib.lookup_callsign(callsign, timestamp).copy()
|
||||
if self.check_if_beacon(callsign):
|
||||
data[const.BEACON] = True
|
||||
return data
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
|
|
|
|||
|
|
@ -3,6 +3,7 @@
|
|||
class LookupConventions:
|
||||
""" This class defines the constants used within the pyhamtools package """
|
||||
|
||||
# Mostly specific to Clublog XML File
|
||||
CALLSIGN = "callsign"
|
||||
COUNTRY = "country"
|
||||
PREFIX = "prefix"
|
||||
|
|
@ -18,23 +19,45 @@ class LookupConventions:
|
|||
WHITELIST_START = "whitelist_start"
|
||||
WHITELIST_END = "whitelist_end"
|
||||
DELETED = "deleted"
|
||||
MARITIME_MOBILE = "mm"
|
||||
AIRCRAFT_MOBILE = "am"
|
||||
BEACON = "beacon"
|
||||
SKIMMER = "skimmer"
|
||||
|
||||
class Modes:
|
||||
""" Constants for Operating modes """
|
||||
|
||||
# Modes
|
||||
CW = "CW"
|
||||
USB = "USB"
|
||||
LSB = "LSB"
|
||||
DIGITAL = "DIGITAL"
|
||||
FM = "FM"
|
||||
|
||||
class DXSpot:
|
||||
""" Constants used for DX Spots """
|
||||
|
||||
#DX Spot
|
||||
SPOTTER = "spotter"
|
||||
DX = "dx"
|
||||
FREQUENCY = "frequency"
|
||||
COMMENT = "comment"
|
||||
TIME = "time"
|
||||
BAND = "band"
|
||||
MODE = "mode"
|
||||
MODE = "mode"
|
||||
|
||||
#DX Spider specific
|
||||
ORIGIN_NODE = "node"
|
||||
HOPS = "hops"
|
||||
RAW_SPOT = "raw"
|
||||
IP = "ip"
|
||||
ROUTE = "route"
|
||||
TEXT = "text"
|
||||
SYSOP_FLAG = "sysop_flag"
|
||||
WX_FLAG = "wx_flag"
|
||||
|
||||
#WWV & WCY
|
||||
STATION = "station"
|
||||
R = "r"
|
||||
K = "k"
|
||||
EXPK = "expk"
|
||||
SFI = "sfi"
|
||||
A = "a"
|
||||
AURORA = "aurora"
|
||||
SA = "sa"
|
||||
GMF = "gmf"
|
||||
FORECAST = "forecast"
|
||||
|
|
|
|||
|
|
@ -1,60 +1,84 @@
|
|||
__author__ = 'dh1tw'
|
||||
|
||||
from datetime import datetime
|
||||
from time import strptime, mktime
|
||||
import re
|
||||
|
||||
|
||||
import pytz
|
||||
|
||||
from pyhamtools.consts import LookupConventions as const
|
||||
|
||||
|
||||
UTC = pytz.UTC
|
||||
|
||||
from pyhamtools.utils import freq_to_band
|
||||
from pyhamtools.consts import Modes as mode
|
||||
from pyhamtools.consts import DXSpot as dxspot
|
||||
|
||||
|
||||
def decode_spot(raw_string):
|
||||
def decode_char_spot(raw_string):
|
||||
"""Chop Line from DX-Cluster into pieces and return a dict with the spot data"""
|
||||
|
||||
spotter_call = None
|
||||
dx_call = None
|
||||
frequency = None
|
||||
comment = None
|
||||
spot_time = None
|
||||
band = None
|
||||
mode = None
|
||||
bandmode = None
|
||||
data = {}
|
||||
|
||||
# Spotter callsign
|
||||
if re.match('[A-Za-z0-9\/]+[:$]', raw_string[6:15]):
|
||||
spotter_call = re.sub(':', '', re.match('[A-Za-z0-9\/]+[:$]', raw_string[6:15]).group(0))
|
||||
data[const.SPOTTER] = re.sub(':', '', re.match('[A-Za-z0-9\/]+[:$]', raw_string[6:15]).group(0))
|
||||
else:
|
||||
raise ValueError
|
||||
|
||||
if re.search('[0-9\.]{5,12}', raw_string[10:25]):
|
||||
frequency = float(re.search('[0-9\.]{5,12}', raw_string[10:25]).group(0))
|
||||
data[const.FREQUENCY] = float(re.search('[0-9\.]{5,12}', raw_string[10:25]).group(0))
|
||||
else:
|
||||
raise ValueError
|
||||
|
||||
dx_call = re.sub('[^A-Za-z0-9\/]+', '', raw_string[26:38])
|
||||
comment = re.sub('[^\sA-Za-z0-9\.,;\#\+\-!\?\$\(\)@\/]+', ' ', raw_string[39:69])
|
||||
spot_time_ = re.sub('[^0-9]+', '', raw_string[70:74])
|
||||
spot_time = datetime(hour=int(spot_time_[0:2]), minute=int(spot_time_[2:4]), second=0, microsecond = 0, tzinfo=UTC)
|
||||
|
||||
try:
|
||||
bandmode = freq_to_band(frequency)
|
||||
band = bandmode["band"]
|
||||
mode = bandmode["mode"]
|
||||
except KeyError:
|
||||
raise ValueError
|
||||
|
||||
data = {
|
||||
dxspot.SPOTTER: spotter_call,
|
||||
dxspot.DX: dx_call,
|
||||
dxspot.BAND: band,
|
||||
dxspot.MODE: mode,
|
||||
dxspot.COMMENT: comment,
|
||||
dxspot.TIME: spot_time
|
||||
}
|
||||
data[const.DX] = re.sub('[^A-Za-z0-9\/]+', '', raw_string[26:38])
|
||||
data[const.COMMENT] = re.sub('[^\sA-Za-z0-9\.,;\#\+\-!\?\$\(\)@\/]+', ' ', raw_string[39:69]).strip()
|
||||
data[const.TIME] = datetime.now().replace(tzinfo=UTC)
|
||||
|
||||
return data
|
||||
|
||||
def decode_pc11_message(raw_string):
|
||||
"""Decode PC11 message, which usually contains DX Spots"""
|
||||
|
||||
data = {}
|
||||
spot = raw_string.split("^")
|
||||
data[const.FREQUENCY] = float(spot[1])
|
||||
data[const.DX] = spot[2]
|
||||
data[const.TIME] = datetime.fromtimestamp(mktime(strptime(spot[3]+" "+spot[4][:-1], "%d-%b-%Y %H%M")))
|
||||
data[const.COMMENT] = spot[5]
|
||||
data[const.SPOTTER] = spot[6]
|
||||
data["node"] = spot[7]
|
||||
data["raw_spot"] = raw_string
|
||||
return data
|
||||
|
||||
|
||||
def decode_pc61_message(raw_string):
|
||||
"""Decode PC61 message, which usually contains DX Spots"""
|
||||
|
||||
data = {}
|
||||
spot = raw_string.split("^")
|
||||
data[const.FREQUENCY] = float(spot[1])
|
||||
data[const.DX] = spot[2]
|
||||
data[const.TIME] = datetime.fromtimestamp(mktime(strptime(spot[3]+" "+spot[4][:-1], "%d-%b-%Y %H%M")))
|
||||
data[const.COMMENT] = spot[5]
|
||||
data[const.SPOTTER] = spot[6]
|
||||
data["node"] = spot[7]
|
||||
data["ip"] = spot[8]
|
||||
data["raw_spot"] = raw_string
|
||||
return data
|
||||
|
||||
def decode_pc23_message(raw_string):
|
||||
""" Decode PC23 Message which usually contains WCY """
|
||||
|
||||
data = {}
|
||||
wcy = raw_string.split("^")
|
||||
data[const.R] = int(wcy[1])
|
||||
data[const.expk] = int(wcy[2])
|
||||
data[const.CALLSIGN] = wcy[3]
|
||||
data[const.A] = wcy[4]
|
||||
data[const.SFI] = wcy[5]
|
||||
data[const.K] = wcy[6]
|
||||
data[const.AURORA] = wcy[7]
|
||||
data["node"] = wcy[7]
|
||||
data["ip"] = wcy[8]
|
||||
data["raw_data"] = raw_string
|
||||
return data
|
||||
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ from datetime import datetime
|
|||
import xml.etree.ElementTree as ET
|
||||
import urllib
|
||||
import json
|
||||
import pickle
|
||||
import copy
|
||||
import sys
|
||||
|
||||
|
|
@ -37,7 +38,8 @@ class LookupLib(object):
|
|||
|
||||
It's aim is to provide a homogeneous interface to different databases.
|
||||
|
||||
Typically an instance of this class is injected as a dependency in the :py:class:`Callinfo` class, but it can also be used directly.
|
||||
Typically an instance of this class is injected as a dependency in the :py:class:`Callinfo` class, but it can also
|
||||
be used directly.
|
||||
|
||||
Even the interface is the same for all lookup sources, the returning data can be different.
|
||||
The documentation of the various methods provide more detail.
|
||||
|
|
@ -45,14 +47,23 @@ class LookupLib(object):
|
|||
By default, LookupLib requires an Internet connection to download the libraries or perform the
|
||||
lookup against the Clublog API.
|
||||
|
||||
The entire lookup data can also be copied into Redis, which an extremely fast in-memory Key/Value store.
|
||||
A LookupLib object can be instanciated to perform then all lookups in Redis, instead processing and loading
|
||||
the data from Internet / File. This saves some time and allows several instances of :py:call:`LookupLib`
|
||||
to query the same data concurrently.
|
||||
|
||||
Args:
|
||||
lookuptype (str) : "clublogxml" or "clublogapi" or "countryfile"
|
||||
lookuptype (str) : "clublogxml" or "clublogapi" or "countryfile" or "redis"
|
||||
apikey (str): Clublog API Key
|
||||
filename (str, optional): Filename for Clublog XML or Country-files.com cty.plist file. When a local file is used, no Internet connection not API Key is necessary.
|
||||
filename (str, optional): Filename for Clublog XML or Country-files.com cty.plist file. When a local file is
|
||||
used, no Internet connection not API Key is necessary.
|
||||
logger (logging.getLogger(__name__), optional): Python logger
|
||||
redis_instance (redis.Redis(), optional): Instance of Redis
|
||||
redis_prefix (str, optional): Prefix to identify the lookup data set in Redis
|
||||
|
||||
|
||||
"""
|
||||
def __init__(self, lookuptype = "countryfile", apikey=None, filename=None, logger=None):
|
||||
def __init__(self, lookuptype = "countryfile", apikey=None, filename=None, logger=None, redis_instance=None, redis_prefix=None):
|
||||
|
||||
self._logger = None
|
||||
if logger:
|
||||
|
|
@ -67,6 +78,8 @@ class LookupLib(object):
|
|||
self._apikey = apikey
|
||||
self._download = True
|
||||
self._lib_filename = filename
|
||||
self._redis = redis_instance
|
||||
self._redis_prefix = redis_prefix
|
||||
|
||||
if self._lib_filename:
|
||||
self._download = False
|
||||
|
|
@ -87,9 +100,102 @@ class LookupLib(object):
|
|||
self._load_countryfile(cty_file=self._lib_filename)
|
||||
elif self._lookuptype == "clublogapi":
|
||||
pass
|
||||
elif self._lookuptype == "redis":
|
||||
import redis
|
||||
else:
|
||||
raise AttributeError("Lookup type missing")
|
||||
|
||||
def copy_data_in_redis(self, redis_prefix, redis_instance):
|
||||
"""
|
||||
Copy the complete lookup data and indexes of the object into redis. Old data will be overwritten.
|
||||
|
||||
Args:
|
||||
redis_prefix (str): Prefix to distinguish the data in redis for the different looktypes
|
||||
redis_instance (str): an Instance of Redis
|
||||
|
||||
Returns:
|
||||
bool: returns True when the data has been copied successfully into Redis
|
||||
|
||||
Example:
|
||||
Copy the entire lookup data from the Country-files.com PLIST File into Redis. A Redis Instance
|
||||
needs to be installed, as well as copy of the python Redis connector (pip install redis-py)
|
||||
|
||||
>>> from pyhamtools import LookupLib
|
||||
>>> import redis
|
||||
>>> r = redis.Redis()
|
||||
>>> my_lookuplib = LookupLib(lookuptype="countryfile")
|
||||
>>> print my_lookuplib.copy_data_in_redis(redis_prefix="CF", redis_instance=r)
|
||||
True
|
||||
|
||||
Now let's create an instance of LookupLib, using Redis to query the data
|
||||
>>> from pyhamtools import LookupLib
|
||||
>>> import redis
|
||||
>>> r = redis.Redis()
|
||||
>>> my_lookuplib = LookupLib(lookuptype="countryfile", redis_instance=r, redis_prefix="CF")
|
||||
>>> my_lookuplib.lookup_callsign("3D2RI")
|
||||
{
|
||||
u'adif': 460,
|
||||
u'continent': 'OC',
|
||||
u'country': 'Rotuma Island',
|
||||
u'cqz': 32,
|
||||
u'ituz': 56,
|
||||
u'latitude': -12.48,
|
||||
u'longitude': -177.08
|
||||
}
|
||||
|
||||
|
||||
Note:
|
||||
This method is available for the following lookup type
|
||||
|
||||
- clublogxml
|
||||
- countryfile
|
||||
"""
|
||||
|
||||
if redis_instance is not None:
|
||||
self._redis = redis_instance
|
||||
|
||||
if self._redis is None:
|
||||
raise AttributeError("redis_instance is missing")
|
||||
|
||||
if redis_prefix is None:
|
||||
raise KeyError("redis_prefix is missing")
|
||||
|
||||
if self._lookuptype == "clublogxml" or self._lookuptype == "countryfile":
|
||||
|
||||
self._push_dict_to_redis(self._entities, redis_prefix, "_entity_")
|
||||
|
||||
self._push_dict_index_to_redis(self._callsign_exceptions_index, redis_prefix, "_call_ex_index_")
|
||||
self._push_dict_to_redis(self._callsign_exceptions, redis_prefix, "_call_ex_")
|
||||
|
||||
self._push_dict_index_to_redis(self._prefixes_index, redis_prefix, "_prefix_index_")
|
||||
self._push_dict_to_redis(self._prefixes, redis_prefix, "_prefix_")
|
||||
|
||||
self._push_dict_index_to_redis(self._invalid_operations_index, redis_prefix, "_inv_op_index_")
|
||||
self._push_dict_to_redis(self._invalid_operations, redis_prefix, "_inv_op_")
|
||||
|
||||
self._push_dict_index_to_redis(self._zone_exceptions_index, redis_prefix, "_zone_ex_index_")
|
||||
self._push_dict_to_redis(self._zone_exceptions, redis_prefix, "_zone_ex_")
|
||||
|
||||
return True
|
||||
|
||||
def _push_dict_to_redis(self, push_dict, redis_prefix, name):
|
||||
r = self._redis
|
||||
for i in push_dict:
|
||||
json_data = self._serialize_data(push_dict[i])
|
||||
r.delete(redis_prefix + name + str(i))
|
||||
r.set(redis_prefix + name + str(i), json_data)
|
||||
return True
|
||||
|
||||
def _push_dict_index_to_redis(self, index_dict, redis_prefix, name):
|
||||
r = self._redis
|
||||
for i in index_dict:
|
||||
r.delete(redis_prefix + name + str(i))
|
||||
for el in index_dict[i]:
|
||||
r.sadd(redis_prefix + name + str(i), el)
|
||||
return True
|
||||
|
||||
|
||||
|
||||
def lookup_entity(self, entity=None):
|
||||
"""Returns lookup data of an ADIF Entity
|
||||
|
||||
|
|
@ -124,29 +230,47 @@ class LookupLib(object):
|
|||
This method is available for the following lookup type
|
||||
|
||||
- clublogxml
|
||||
- redis
|
||||
|
||||
"""
|
||||
|
||||
try:
|
||||
if self._lookuptype == "clublogxml":
|
||||
entity = int(entity)
|
||||
if entity in self._entities:
|
||||
entity_data = copy.deepcopy(self._entities[entity])
|
||||
if const.START in entity_data:
|
||||
del entity_data[const.START]
|
||||
if const.END in entity_data:
|
||||
del entity_data[const.END]
|
||||
if const.WHITELIST in entity_data:
|
||||
del entity_data[const.WHITELIST]
|
||||
if const.WHITELIST_START in entity_data:
|
||||
del entity_data[const.WHITELIST_START]
|
||||
if const.WHITELIST_END in entity_data:
|
||||
del entity_data[const.WHITELIST_END]
|
||||
|
||||
return entity_data
|
||||
return self._strip_metadata(self._entities[entity])
|
||||
else:
|
||||
raise KeyError
|
||||
except:
|
||||
raise KeyError
|
||||
|
||||
elif self._lookuptype == "redis":
|
||||
|
||||
if self._redis_prefix is None:
|
||||
raise KeyError ("redis_prefix is missing")
|
||||
|
||||
#entity = str(entity)
|
||||
json_data = self._redis.get(self._redis_prefix + "_entity_" + str(entity))
|
||||
if json_data is not None:
|
||||
my_dict = self._deserialize_data(json_data)
|
||||
return self._strip_metadata(my_dict)
|
||||
|
||||
# no matching case
|
||||
raise KeyError
|
||||
|
||||
def _strip_metadata(self, my_dict):
|
||||
"""
|
||||
Create a copy of dict and remove not needed data
|
||||
"""
|
||||
new_dict = copy.deepcopy(my_dict)
|
||||
if const.START in new_dict:
|
||||
del new_dict[const.START]
|
||||
if const.END in new_dict:
|
||||
del new_dict[const.END]
|
||||
if const.WHITELIST in new_dict:
|
||||
del new_dict[const.WHITELIST]
|
||||
if const.WHITELIST_START in new_dict:
|
||||
del new_dict[const.WHITELIST_START]
|
||||
if const.WHITELIST_END in new_dict:
|
||||
del new_dict[const.WHITELIST_END]
|
||||
return new_dict
|
||||
|
||||
|
||||
def lookup_callsign(self, callsign=None, timestamp=timestamp_now):
|
||||
"""
|
||||
|
|
@ -187,6 +311,7 @@ class LookupLib(object):
|
|||
- clublogxml
|
||||
- clublogapi
|
||||
- countryfile
|
||||
- redis
|
||||
|
||||
"""
|
||||
callsign = callsign.strip().upper()
|
||||
|
|
@ -198,41 +323,113 @@ class LookupLib(object):
|
|||
else:
|
||||
return callsign_data
|
||||
|
||||
if self._lookuptype == "clublogxml" or self._lookuptype == "countryfile":
|
||||
elif self._lookuptype == "clublogxml" or self._lookuptype == "countryfile":
|
||||
|
||||
if callsign in self._callsign_exceptions_index:
|
||||
for item in self._callsign_exceptions_index[callsign]:
|
||||
return self._check_data_for_date(callsign, timestamp, self._callsign_exceptions, self._callsign_exceptions_index)
|
||||
|
||||
# startdate < timestamp
|
||||
if const.START in self._callsign_exceptions[item] and not const.END in self._callsign_exceptions[item]:
|
||||
if self._callsign_exceptions[item][const.START] < timestamp:
|
||||
callsign_data = copy.deepcopy(self._callsign_exceptions[item])
|
||||
del callsign_data[const.START]
|
||||
return callsign_data
|
||||
elif self._lookuptype == "redis":
|
||||
|
||||
# enddate > timestamp
|
||||
elif not const.START in self._callsign_exceptions[item] and const.END in self._callsign_exceptions[item]:
|
||||
if self._callsign_exceptions[item][const.END] > timestamp:
|
||||
callsign_data = copy.deepcopy(self._callsign_exceptions[item])
|
||||
del callsign_data[const.END]
|
||||
return callsign_data
|
||||
|
||||
# startdate > timestamp > enddate
|
||||
elif const.START in self._callsign_exceptions[item] and const.END in self._callsign_exceptions[item]:
|
||||
if self._callsign_exceptions[item][const.START] < timestamp \
|
||||
and self._callsign_exceptions[item][const.END] > timestamp:
|
||||
callsign_data = copy.deepcopy(self._callsign_exceptions[item])
|
||||
del callsign_data[const.START]
|
||||
del callsign_data[const.END]
|
||||
return callsign_data
|
||||
|
||||
# no startdate or enddate available
|
||||
elif not const.START in self._callsign_exceptions[item] and not const.END in self._callsign_exceptions[item]:
|
||||
return self._callsign_exceptions[item]
|
||||
data_dict, index = self._get_dicts_from_redis("_call_ex_", "_call_ex_index_", self._redis_prefix, callsign)
|
||||
return self._check_data_for_date(callsign, timestamp, data_dict, index)
|
||||
|
||||
# no matching case
|
||||
raise KeyError
|
||||
|
||||
def _get_dicts_from_redis(self, name, index_name, redis_prefix, item):
|
||||
"""
|
||||
Retrieve the data of an item from redis and put it in an index and data dictionary to match the
|
||||
common query interface.
|
||||
"""
|
||||
r = self._redis
|
||||
data_dict = {}
|
||||
data_index_dict = {}
|
||||
|
||||
if redis_prefix is None:
|
||||
raise KeyError ("redis_prefix is missing")
|
||||
|
||||
if r.scard(redis_prefix + index_name + str(item)) > 0:
|
||||
data_index_dict[item] = r.smembers(redis_prefix + index_name + str(item))
|
||||
|
||||
for i in data_index_dict[item]:
|
||||
json_data = r.get(redis_prefix + name + i)
|
||||
data_dict[i] = self._deserialize_data(json_data)
|
||||
|
||||
return (data_dict, data_index_dict)
|
||||
|
||||
raise KeyError ("No Data found in Redis for "+ item)
|
||||
|
||||
def _check_data_for_date(self, item, timestamp, data_dict, data_index_dict):
|
||||
"""
|
||||
Checks if the item is found in the index. An entry in the index points to the data
|
||||
in the data_dict. This is mainly used retrieve callsigns and prefixes.
|
||||
In case data is found for item, a dict containing the data is returned. Otherwise a KeyError is raised.
|
||||
"""
|
||||
|
||||
if item in data_index_dict:
|
||||
for item in data_index_dict[item]:
|
||||
|
||||
# startdate < timestamp
|
||||
if const.START in data_dict[item] and not const.END in data_dict[item]:
|
||||
if data_dict[item][const.START] < timestamp:
|
||||
item_data = copy.deepcopy(data_dict[item])
|
||||
del item_data[const.START]
|
||||
return item_data
|
||||
|
||||
# enddate > timestamp
|
||||
elif not const.START in data_dict[item] and const.END in data_dict[item]:
|
||||
if data_dict[item][const.END] > timestamp:
|
||||
item_data = copy.deepcopy(data_dict[item])
|
||||
del item_data[const.END]
|
||||
return item_data
|
||||
|
||||
# startdate > timestamp > enddate
|
||||
elif const.START in data_dict[item] and const.END in data_dict[item]:
|
||||
if data_dict[item][const.START] < timestamp \
|
||||
and data_dict[item][const.END] > timestamp:
|
||||
item_data = copy.deepcopy(data_dict[item])
|
||||
del item_data[const.START]
|
||||
del item_data[const.END]
|
||||
return item_data
|
||||
|
||||
# no startdate or enddate available
|
||||
elif not const.START in data_dict[item] and not const.END in data_dict[item]:
|
||||
return data_dict[item]
|
||||
|
||||
raise KeyError
|
||||
|
||||
|
||||
def _check_inv_operation_for_date(self, item, timestamp, data_dict, data_index_dict):
|
||||
"""
|
||||
Checks if the callsign is marked as an invalid operation for a given timestamp.
|
||||
In case the operation is invalid, True is returned. Otherwise a KeyError is raised.
|
||||
"""
|
||||
|
||||
if item in data_index_dict:
|
||||
for item in data_index_dict[item]:
|
||||
|
||||
# startdate < timestamp
|
||||
if const.START in data_dict[item] and not const.END in data_dict[item]:
|
||||
if data_dict[item][const.START] < timestamp:
|
||||
return True
|
||||
|
||||
# enddate > timestamp
|
||||
elif not const.START in data_dict[item] and const.END in data_dict[item]:
|
||||
if data_dict[item][const.END] > timestamp:
|
||||
return True
|
||||
|
||||
# startdate > timestamp > enddate
|
||||
elif const.START in data_dict[item] and const.END in data_dict[item]:
|
||||
if data_dict[item][const.START] < timestamp \
|
||||
and data_dict[item][const.END] > timestamp:
|
||||
return True
|
||||
|
||||
# no startdate or enddate available
|
||||
elif not const.START in data_dict[item] and not const.END in data_dict[item]:
|
||||
return True
|
||||
|
||||
raise KeyError
|
||||
|
||||
|
||||
def lookup_prefix(self, prefix, timestamp=timestamp_now):
|
||||
"""
|
||||
Returns lookup data of a Prefix
|
||||
|
|
@ -270,6 +467,7 @@ class LookupLib(object):
|
|||
|
||||
- clublogxml
|
||||
- countryfile
|
||||
- redis
|
||||
|
||||
"""
|
||||
|
||||
|
|
@ -277,39 +475,14 @@ class LookupLib(object):
|
|||
|
||||
if self._lookuptype == "clublogxml" or self._lookuptype == "countryfile":
|
||||
|
||||
if prefix in self._prefixes_index:
|
||||
for item in self._prefixes_index[prefix]:
|
||||
return self._check_data_for_date(prefix, timestamp, self._prefixes, self._prefixes_index)
|
||||
|
||||
# startdate < timestamp
|
||||
if const.START in self._prefixes[item] and not const.END in self._prefixes[item]:
|
||||
if self._prefixes[item][const.START] < timestamp:
|
||||
prefix_data = copy.deepcopy(self._prefixes[item])
|
||||
if const.START in prefix_data:
|
||||
del prefix_data[const.START]
|
||||
return prefix_data
|
||||
elif self._lookuptype == "redis":
|
||||
|
||||
# enddate > timestamp
|
||||
elif not const.START in self._prefixes[item] and const.END in self._prefixes[item]:
|
||||
if self._prefixes[item][const.END] > timestamp:
|
||||
prefix_data = copy.deepcopy(self._prefixes[item])
|
||||
if const.END in prefix_data:
|
||||
del prefix_data[const.END]
|
||||
return prefix_data
|
||||
|
||||
# startdate > timestamp > enddate
|
||||
elif const.START in self._prefixes[item] and const.END in self._prefixes[item]:
|
||||
if self._prefixes[item][const.START] < timestamp and self._prefixes[item][const.END] > timestamp:
|
||||
prefix_data = copy.deepcopy(self._prefixes[item])
|
||||
if const.START in prefix_data:
|
||||
del prefix_data[const.START]
|
||||
if const.END in prefix_data:
|
||||
del prefix_data[const.END]
|
||||
return prefix_data
|
||||
|
||||
# no startdate or enddate available
|
||||
else:
|
||||
return self._prefixes[item]
|
||||
data_dict, index = self._get_dicts_from_redis("_prefix_", "_prefix_index_", self._redis_prefix, prefix)
|
||||
return self._check_data_for_date(prefix, timestamp, data_dict, index)
|
||||
|
||||
# no matching case
|
||||
raise KeyError
|
||||
|
||||
def is_invalid_operation(self, callsign, timestamp=datetime.utcnow().replace(tzinfo=UTC)):
|
||||
|
|
@ -347,6 +520,7 @@ class LookupLib(object):
|
|||
This method is available for
|
||||
|
||||
- clublogxml
|
||||
- redis
|
||||
|
||||
"""
|
||||
|
||||
|
|
@ -354,35 +528,50 @@ class LookupLib(object):
|
|||
|
||||
if self._lookuptype == "clublogxml":
|
||||
|
||||
if callsign in self._invalid_operations_index:
|
||||
for item in self._invalid_operations_index[callsign]:
|
||||
return self._check_inv_operation_for_date(callsign, timestamp, self._invalid_operations, self._invalid_operations_index)
|
||||
|
||||
# startdate < timestamp
|
||||
if const.START in self._invalid_operations[item] \
|
||||
and not const.END in self._invalid_operations[item]:
|
||||
if self._invalid_operations[item][const.START] < timestamp:
|
||||
return True
|
||||
elif self._lookuptype == "redis":
|
||||
|
||||
# enddate > timestamp
|
||||
elif not const.START in self._invalid_operations[item] \
|
||||
and const.END in self._invalid_operations[item]:
|
||||
if self._invalid_operations[item][const.END] > timestamp:
|
||||
return True
|
||||
|
||||
# startdate > timestamp > enddate
|
||||
elif const.START in self._invalid_operations[item] and const.END in self._invalid_operations[item]:
|
||||
if self._invalid_operations[item][const.START] < timestamp \
|
||||
and self._invalid_operations[item][const.END] > timestamp:
|
||||
return True
|
||||
|
||||
# no startdate or enddate available
|
||||
else:
|
||||
return True
|
||||
data_dict, index = self._get_dicts_from_redis("_inv_op_", "_inv_op_index_", self._redis_prefix, callsign)
|
||||
return self._check_inv_operation_for_date(callsign, timestamp, data_dict, index)
|
||||
|
||||
#no matching case
|
||||
raise KeyError
|
||||
|
||||
|
||||
def _check_zone_exception_for_date(self, item, timestamp, data_dict, data_index_dict):
|
||||
"""
|
||||
Checks the index and data if a cq-zone exception exists for the callsign
|
||||
When a zone exception is found, the zone is returned. If no exception is found
|
||||
a KeyError is raised
|
||||
|
||||
"""
|
||||
if item in data_index_dict:
|
||||
for item in data_index_dict[item]:
|
||||
|
||||
# startdate < timestamp
|
||||
if const.START in data_dict[item] and not const.END in data_dict[item]:
|
||||
if data_dict[item][const.START] < timestamp:
|
||||
return data_dict[item][const.CQZ]
|
||||
|
||||
# enddate > timestamp
|
||||
elif not const.START in data_dict[item] and const.END in data_dict[item]:
|
||||
if data_dict[item][const.END] > timestamp:
|
||||
return data_dict[item][const.CQZ]
|
||||
|
||||
# startdate > timestamp > enddate
|
||||
elif const.START in data_dict[item] and const.END in data_dict[item]:
|
||||
if data_dict[item][const.START] < timestamp \
|
||||
and data_dict[item][const.END] > timestamp:
|
||||
return data_dict[item][const.CQZ]
|
||||
|
||||
# no startdate or enddate available
|
||||
elif not const.START in data_dict[item] and not const.END in data_dict[item]:
|
||||
return data_dict[item][const.CQZ]
|
||||
|
||||
raise KeyError
|
||||
|
||||
|
||||
def lookup_zone_exception(self, callsign, timestamp=datetime.utcnow().replace(tzinfo=UTC)):
|
||||
"""
|
||||
Returns a CQ Zone if an exception exists for the given callsign
|
||||
|
|
@ -413,6 +602,7 @@ class LookupLib(object):
|
|||
This method is available for
|
||||
|
||||
- clublogxml
|
||||
- redis
|
||||
|
||||
"""
|
||||
|
||||
|
|
@ -420,28 +610,12 @@ class LookupLib(object):
|
|||
|
||||
if self._lookuptype == "clublogxml":
|
||||
|
||||
if callsign in self._zone_exceptions_index:
|
||||
for item in self._zone_exceptions_index[callsign]:
|
||||
return self._check_zone_exception_for_date(callsign, timestamp, self._zone_exceptions, self._zone_exceptions_index)
|
||||
|
||||
# startdate < timestamp
|
||||
if const.START in self._zone_exceptions[item] and not const.END in self._zone_exceptions[item]:
|
||||
if self._zone_exceptions[item][const.START] < timestamp:
|
||||
return self._zone_exceptions[item][const.CQZ]
|
||||
elif self._lookuptype == "redis":
|
||||
|
||||
# enddate > timestamp
|
||||
elif not const.START in self._zone_exceptions[item] and const.END in self._zone_exceptions[item]:
|
||||
if self._zone_exceptions[item][const.END] > timestamp:
|
||||
return self._zone_exceptions[item][const.CQZ]
|
||||
|
||||
# startdate > timestamp > enddate
|
||||
elif const.START in self._zone_exceptions[item] and const.END in self._zone_exceptions[item]:
|
||||
if self._zone_exceptions[item][const.START] < timestamp \
|
||||
and self._zone_exceptions[item][const.END] > timestamp:
|
||||
return self._zone_exceptions[item][const.CQZ]
|
||||
|
||||
# no startdate or enddate available
|
||||
else:
|
||||
return self._zone_exceptions[item][const.CQZ]
|
||||
data_dict, index = self._get_dicts_from_redis("_zone_ex_", "_zone_ex_index_", self._redis_prefix, callsign)
|
||||
return self._check_zone_exception_for_date(callsign, timestamp, data_dict, index)
|
||||
|
||||
#no matching case
|
||||
raise KeyError
|
||||
|
|
@ -972,4 +1146,62 @@ class LookupLib(object):
|
|||
if response.text.strip() == error1 or response.text.strip() == error2:
|
||||
raise APIKeyMissingError
|
||||
else:
|
||||
raise LookupError(err_str)
|
||||
raise LookupError(err_str)
|
||||
|
||||
|
||||
def _serialize_data(self, my_dict):
|
||||
"""
|
||||
Serialize a Dictionary into JSON
|
||||
"""
|
||||
new_dict = {}
|
||||
for item in my_dict:
|
||||
if isinstance(my_dict[item], datetime):
|
||||
new_dict[item] = my_dict[item].strftime('%Y-%m-%d%H:%M:%S')
|
||||
else:
|
||||
new_dict[item] = str(my_dict[item])
|
||||
|
||||
return json.dumps(new_dict)
|
||||
|
||||
def _deserialize_data(self, json_data):
|
||||
"""
|
||||
Deserialize a JSON into a dictionary
|
||||
"""
|
||||
|
||||
my_dict = json.loads(json_data)
|
||||
|
||||
for item in my_dict:
|
||||
if item == const.ADIF:
|
||||
my_dict[item] = int(my_dict[item])
|
||||
elif item == const.DELETED:
|
||||
my_dict[item] = self._str_to_bool(my_dict[item])
|
||||
elif item == const.CQZ:
|
||||
my_dict[item] = int(my_dict[item])
|
||||
elif item == const.ITUZ:
|
||||
my_dict[item] = int(my_dict[item])
|
||||
elif item == const.LATITUDE:
|
||||
my_dict[item] = float(my_dict[item])
|
||||
elif item == const.LONGITUDE:
|
||||
my_dict[item] = float(my_dict[item])
|
||||
elif item == const.START:
|
||||
my_dict[item] = datetime.strptime(my_dict[item], '%Y-%m-%d%H:%M:%S').replace(tzinfo=UTC)
|
||||
elif item == const.END:
|
||||
my_dict[item] = datetime.strptime(my_dict[item], '%Y-%m-%d%H:%M:%S').replace(tzinfo=UTC)
|
||||
elif item == const.WHITELIST_START:
|
||||
my_dict[item] = datetime.strptime(my_dict[item], '%Y-%m-%d%H:%M:%S').replace(tzinfo=UTC)
|
||||
elif item == const.WHITELIST_END:
|
||||
my_dict[item] = datetime.strptime(my_dict[item], '%Y-%m-%d%H:%M:%S').replace(tzinfo=UTC)
|
||||
elif item == const.WHITELIST:
|
||||
my_dict[item] = self._str_to_bool(my_dict[item])
|
||||
else:
|
||||
my_dict[item] = str(my_dict[item])
|
||||
|
||||
return my_dict
|
||||
|
||||
def _str_to_bool(self, input):
|
||||
if input.lower() == "true":
|
||||
return True
|
||||
elif input.lower() == "false":
|
||||
return False
|
||||
else:
|
||||
raise KeyError
|
||||
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
from pyhamtools.consts import Modes as const
|
||||
from pyhamtools.consts import LookupConventions as const
|
||||
|
||||
|
||||
def freq_to_band(freq):
|
||||
|
|
|
|||
3
setup.py
3
setup.py
|
|
@ -8,11 +8,12 @@ if sys.version_info >= (3,):
|
|||
kw['use_2to3'] = True
|
||||
|
||||
setup(name='pyhamtools',
|
||||
version='0.1.1',
|
||||
version='0.3.1',
|
||||
description='Collection of Tools for Amateur Radio developers',
|
||||
author='Tobias Wellnitz, DH1TW',
|
||||
author_email='Tobias@dh1tw.de',
|
||||
url='http://github.com/dh1tw/pyhamtools',
|
||||
package_data={'': ['countryfilemapping.json']},
|
||||
packages=['pyhamtools'],
|
||||
install_requires=[
|
||||
"pytz",
|
||||
|
|
|
|||
|
|
@ -72,4 +72,9 @@ def fixCountryFile(request):
|
|||
def fix_callinfo(request, fixApiKey):
|
||||
lib = LookupLib(request.param, fixApiKey)
|
||||
callinfo = Callinfo(lib)
|
||||
return(callinfo)
|
||||
return(callinfo)
|
||||
|
||||
@pytest.fixture(scope="module")
|
||||
def fix_redis():
|
||||
import redis
|
||||
return LookupLib(lookuptype="redis", redis_instance=redis.Redis(), redis_prefix="clx")
|
||||
|
|
@ -48,7 +48,6 @@ response_prefix_C6A_countryfile = {
|
|||
}
|
||||
|
||||
|
||||
|
||||
response_Exception_VK9XO_with_start_date = {
|
||||
'adif': 35,
|
||||
'country': 'CHRISTMAS ISLAND',
|
||||
|
|
@ -72,6 +71,24 @@ response_lat_long_dh1tw = {
|
|||
const.LONGITUDE: -10.0
|
||||
}
|
||||
|
||||
response_maritime_mobile = {
|
||||
'adif': 999,
|
||||
'continent': '',
|
||||
'country': 'MARITIME MOBILE',
|
||||
'cqz': 0,
|
||||
'latitude': 0.0,
|
||||
'longitude': 0.0
|
||||
}
|
||||
|
||||
response_aircraft_mobile = {
|
||||
'adif': 998,
|
||||
'continent': '',
|
||||
'country': 'AIRCAFT MOBILE',
|
||||
'cqz': 0,
|
||||
'latitude': 0.0,
|
||||
'longitude': 0.0
|
||||
}
|
||||
|
||||
class Test_callinfo_methods:
|
||||
|
||||
def test_callinfo_iterate_prefix(self, fix_callinfo):
|
||||
|
|
@ -87,31 +104,38 @@ class Test_callinfo_methods:
|
|||
with pytest.raises(KeyError):
|
||||
fix_callinfo._iterate_prefix("QRM")
|
||||
|
||||
def test_is_maritime_mobile(self, fix_callinfo):
|
||||
assert fix_callinfo.check_if_mm("DH1TW/MM")
|
||||
assert not fix_callinfo.check_if_mm("DH1TW")
|
||||
|
||||
def test_is_aircraft_mobile(self, fix_callinfo):
|
||||
assert fix_callinfo.check_if_am("DH1TW/AM")
|
||||
assert not fix_callinfo.check_if_am("DH1TW")
|
||||
|
||||
def test_if_beacon(self, fix_callinfo):
|
||||
assert fix_callinfo.check_if_beacon("DH1TW/B")
|
||||
assert fix_callinfo.check_if_beacon("DH1TW/BCN")
|
||||
assert not fix_callinfo.check_if_beacon("DH1TW")
|
||||
|
||||
def test_get_homecall(self, fix_callinfo):
|
||||
assert fix_callinfo.get_homecall("HB9/DH1TW") == "DH1TW"
|
||||
assert fix_callinfo.get_homecall("SM3/DH1TW/P") == "DH1TW"
|
||||
assert fix_callinfo.get_homecall("QRM") is None
|
||||
with pytest.raises(ValueError):
|
||||
fix_callinfo.get_homecall("QRM")
|
||||
|
||||
def test_dismantle_callsign(self, fix_callinfo):
|
||||
if fix_callinfo._lookuplib._lookuptype == "clublogxml" or fix_callinfo._lookuplib._lookuptype == "countryfile":
|
||||
with pytest.raises(KeyError):
|
||||
fix_callinfo._dismantle_callsign("DH1TW/MM")
|
||||
|
||||
with pytest.raises(KeyError):
|
||||
fix_callinfo._dismantle_callsign("DH1TW/AM")
|
||||
|
||||
if fix_callinfo._lookuplib._lookuptype == "clublogxml":
|
||||
#assert fix_callinfo._dismantle_callsign("DH1TW/BCN")[const.BEACON]
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/QRP") == response_prefix_DH_clublog
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/QRPP") == response_prefix_DH_clublog
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/BCN") == response_prefix_DH_clublog
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/LH") == response_prefix_DH_clublog
|
||||
assert fix_callinfo._dismantle_callsign("HC2AO/DL") == response_prefix_DH_clublog
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/P") == response_prefix_DH_clublog
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/5") == response_prefix_DH_clublog
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/M") == response_prefix_DH_clublog
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/B") == response_prefix_DH_clublog
|
||||
#assert fix_callinfo._dismantle_callsign("DH1TW/B")[const.BEACON]
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW") == response_prefix_DH_clublog
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/B") == response_prefix_DH_clublog
|
||||
assert fix_callinfo._dismantle_callsign("DL/HC2AO") == response_prefix_DH_clublog
|
||||
assert fix_callinfo._dismantle_callsign("9H5A/C6A") == response_prefix_C6A_clublog
|
||||
# assert fix_callinfo._dismantle_callsign("C6A/9H5A") == response_Prefix_C6A
|
||||
|
|
@ -119,15 +143,14 @@ class Test_callinfo_methods:
|
|||
if fix_callinfo._lookuplib._lookuptype == "countryfile":
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/QRP") == response_prefix_DH_countryfile
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/QRPP") == response_prefix_DH_countryfile
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/BCN") == response_prefix_DH_countryfile
|
||||
#assert fix_callinfo._dismantle_callsign("DH1TW/BCN")[const.BEACON]
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/LH") == response_prefix_DH_countryfile
|
||||
assert fix_callinfo._dismantle_callsign("HC2AO/DL") == response_prefix_DH_countryfile
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/P") == response_prefix_DH_countryfile
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/5") == response_prefix_DH_countryfile
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/M") == response_prefix_DH_countryfile
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/B") == response_prefix_DH_countryfile
|
||||
#assert fix_callinfo._dismantle_callsign("DH1TW/B")[const.BEACON]
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW") == response_prefix_DH_countryfile
|
||||
assert fix_callinfo._dismantle_callsign("DH1TW/B") == response_prefix_DH_countryfile
|
||||
assert fix_callinfo._dismantle_callsign("DL/HC2AO") == response_prefix_DH_countryfile
|
||||
assert fix_callinfo._dismantle_callsign("9H5A/C6A") == response_prefix_C6A_countryfile
|
||||
# assert fix_callinfo._dismantle_callsign("C6A/9H5A") == response_Prefix_C6A
|
||||
|
|
@ -135,12 +158,18 @@ class Test_callinfo_methods:
|
|||
|
||||
def test_lookup_callsign(selfself, fix_callinfo):
|
||||
|
||||
assert fix_callinfo._lookup_callsign("DH1TW/MM") == response_maritime_mobile
|
||||
assert fix_callinfo._lookup_callsign("DH1TW/AM") == response_aircraft_mobile
|
||||
|
||||
if fix_callinfo._lookuplib._lookuptype == "clublogxml" or fix_callinfo._lookuplib._lookuptype == "clublogapi":
|
||||
with pytest.raises(KeyError):
|
||||
fix_callinfo._lookup_callsign("5W1CFN")
|
||||
|
||||
assert fix_callinfo._lookup_callsign("DH1TW/BCN")[const.BEACON]
|
||||
assert fix_callinfo._lookup_callsign("VK9XO") == response_Exception_VK9XO_with_start_date
|
||||
assert fix_callinfo._lookup_callsign("DH1TW") == response_prefix_DH_clublog
|
||||
|
||||
|
||||
elif fix_callinfo._lookuplib._lookuptype == "countryfile":
|
||||
assert fix_callinfo._lookup_callsign("DH1TW") == response_prefix_DH_countryfile
|
||||
with pytest.raises(KeyError):
|
||||
|
|
@ -158,6 +187,8 @@ class Test_callinfo_methods:
|
|||
elif fix_callinfo._lookuplib._lookuptype == "countryfile":
|
||||
assert fix_callinfo.get_all("DH1TW") == response_prefix_DH_countryfile
|
||||
|
||||
assert fix_callinfo.get_all("DH1TW/MM") == response_maritime_mobile
|
||||
|
||||
def test_is_valid_callsign(self, fix_callinfo):
|
||||
assert fix_callinfo.is_valid_callsign("DH1TW")
|
||||
assert not fix_callinfo.is_valid_callsign("QRM")
|
||||
|
|
|
|||
|
|
@ -142,7 +142,7 @@ class TestclublogXML_Getters:
|
|||
assert fixClublogXML.lookup_entity(230) == response_Entity_230
|
||||
assert fixClublogXML.lookup_entity("230") == response_Entity_230
|
||||
|
||||
with pytest.raises(KeyError):
|
||||
with pytest.raises(ValueError):
|
||||
fixClublogXML.lookup_entity("foo")
|
||||
|
||||
with pytest.raises(KeyError):
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ def fixSetExceptions(request):
|
|||
class Test_Getter_Setter_Api_Types_for_all_sources:
|
||||
|
||||
def test_lookup_entity_without_entity_nr(self, fixGeneralApi):
|
||||
with pytest.raises(LookupError):
|
||||
with pytest.raises(Exception):
|
||||
fixGeneralApi.lookup_entity()
|
||||
|
||||
def test_lookup_entity(self, fixGeneralApi, fixEntities):
|
||||
|
|
@ -91,6 +91,10 @@ class Test_Getter_Setter_Api_Types_for_all_sources:
|
|||
assert len(entity) == count
|
||||
except KeyError:
|
||||
pass
|
||||
except TypeError:
|
||||
pass
|
||||
except ValueError:
|
||||
pass
|
||||
|
||||
def test_lookup_callsign(self, fixGeneralApi, fixExceptions):
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -1,58 +1,58 @@
|
|||
import pytest
|
||||
from pyhamtools.utils import freq_to_band
|
||||
from pyhamtools.consts import Modes as mode
|
||||
from pyhamtools.consts import LookupConventions as const
|
||||
|
||||
class Test_utils_freq_to_band():
|
||||
|
||||
def test_hf_frequencies(self):
|
||||
assert freq_to_band(137) == {"band" : 2190, "mode":mode.CW}
|
||||
assert freq_to_band(137) == {"band" : 2190, "mode":const.CW}
|
||||
|
||||
assert freq_to_band(1805) == {"band" : 160, "mode":mode.CW}
|
||||
assert freq_to_band(1838) == {"band" : 160, "mode":mode.DIGITAL}
|
||||
assert freq_to_band(1870) == {"band" : 160, "mode":mode.LSB}
|
||||
assert freq_to_band(1805) == {"band" : 160, "mode":const.CW}
|
||||
assert freq_to_band(1838) == {"band" : 160, "mode":const.DIGITAL}
|
||||
assert freq_to_band(1870) == {"band" : 160, "mode":const.LSB}
|
||||
|
||||
assert freq_to_band(3500) == {"band" : 80, "mode":mode.CW}
|
||||
assert freq_to_band(3580) == {"band" : 80, "mode":mode.DIGITAL}
|
||||
assert freq_to_band(3799) == {"band" : 80, "mode":mode.LSB}
|
||||
assert freq_to_band(3500) == {"band" : 80, "mode":const.CW}
|
||||
assert freq_to_band(3580) == {"band" : 80, "mode":const.DIGITAL}
|
||||
assert freq_to_band(3799) == {"band" : 80, "mode":const.LSB}
|
||||
|
||||
assert freq_to_band(5200) == {"band" : 60, "mode":None}
|
||||
|
||||
assert freq_to_band(7000) == {"band" : 40, "mode":mode.CW}
|
||||
assert freq_to_band(7044) == {"band" : 40, "mode":mode.DIGITAL}
|
||||
assert freq_to_band(7139) == {"band" : 40, "mode":mode.LSB}
|
||||
assert freq_to_band(7000) == {"band" : 40, "mode":const.CW}
|
||||
assert freq_to_band(7044) == {"band" : 40, "mode":const.DIGITAL}
|
||||
assert freq_to_band(7139) == {"band" : 40, "mode":const.LSB}
|
||||
|
||||
assert freq_to_band(10100) == {"band" : 30, "mode":mode.CW}
|
||||
assert freq_to_band(10141) == {"band" : 30, "mode":mode.DIGITAL}
|
||||
assert freq_to_band(10100) == {"band" : 30, "mode":const.CW}
|
||||
assert freq_to_band(10141) == {"band" : 30, "mode":const.DIGITAL}
|
||||
|
||||
assert freq_to_band(14000) == {"band" : 20, "mode":mode.CW}
|
||||
assert freq_to_band(14070) == {"band" : 20, "mode":mode.DIGITAL}
|
||||
assert freq_to_band(14349) == {"band" : 20, "mode":mode.USB}
|
||||
assert freq_to_band(14000) == {"band" : 20, "mode":const.CW}
|
||||
assert freq_to_band(14070) == {"band" : 20, "mode":const.DIGITAL}
|
||||
assert freq_to_band(14349) == {"band" : 20, "mode":const.USB}
|
||||
|
||||
assert freq_to_band(18068) == {"band" : 17, "mode":mode.CW}
|
||||
assert freq_to_band(18096) == {"band" : 17, "mode":mode.DIGITAL}
|
||||
assert freq_to_band(18250) == {"band" : 17, "mode":mode.USB}
|
||||
assert freq_to_band(18068) == {"band" : 17, "mode":const.CW}
|
||||
assert freq_to_band(18096) == {"band" : 17, "mode":const.DIGITAL}
|
||||
assert freq_to_band(18250) == {"band" : 17, "mode":const.USB}
|
||||
|
||||
assert freq_to_band(21000) == {"band" : 15, "mode":mode.CW}
|
||||
assert freq_to_band(21070) == {"band" : 15, "mode":mode.DIGITAL}
|
||||
assert freq_to_band(21449) == {"band" : 15, "mode":mode.USB}
|
||||
assert freq_to_band(21000) == {"band" : 15, "mode":const.CW}
|
||||
assert freq_to_band(21070) == {"band" : 15, "mode":const.DIGITAL}
|
||||
assert freq_to_band(21449) == {"band" : 15, "mode":const.USB}
|
||||
|
||||
assert freq_to_band(24890) == {"band" : 12, "mode":mode.CW}
|
||||
assert freq_to_band(24916) == {"band" : 12, "mode":mode.DIGITAL}
|
||||
assert freq_to_band(24965) == {"band" : 12, "mode":mode.USB}
|
||||
assert freq_to_band(24890) == {"band" : 12, "mode":const.CW}
|
||||
assert freq_to_band(24916) == {"band" : 12, "mode":const.DIGITAL}
|
||||
assert freq_to_band(24965) == {"band" : 12, "mode":const.USB}
|
||||
|
||||
assert freq_to_band(28000) == {"band" : 10, "mode":mode.CW}
|
||||
assert freq_to_band(28070) == {"band" : 10, "mode":mode.DIGITAL}
|
||||
assert freq_to_band(28500) == {"band" : 10, "mode":mode.USB}
|
||||
assert freq_to_band(28000) == {"band" : 10, "mode":const.CW}
|
||||
assert freq_to_band(28070) == {"band" : 10, "mode":const.DIGITAL}
|
||||
assert freq_to_band(28500) == {"band" : 10, "mode":const.USB}
|
||||
|
||||
assert freq_to_band(50000) == {"band" : 6, "mode":mode.CW}
|
||||
assert freq_to_band(50100) == {"band" : 6, "mode":mode.USB}
|
||||
assert freq_to_band(50500) == {"band" : 6, "mode":mode.DIGITAL}
|
||||
assert freq_to_band(50000) == {"band" : 6, "mode":const.CW}
|
||||
assert freq_to_band(50100) == {"band" : 6, "mode":const.USB}
|
||||
assert freq_to_band(50500) == {"band" : 6, "mode":const.DIGITAL}
|
||||
|
||||
def test_vhf_frequencies(self):
|
||||
assert freq_to_band(70001) == {"band" : 4, "mode":None}
|
||||
|
||||
assert freq_to_band(144000) == {"band" : 2, "mode":mode.CW}
|
||||
assert freq_to_band(144150) == {"band" : 2, "mode":mode.USB}
|
||||
assert freq_to_band(144000) == {"band" : 2, "mode":const.CW}
|
||||
assert freq_to_band(144150) == {"band" : 2, "mode":const.USB}
|
||||
assert freq_to_band(144400) == {"band" : 2, "mode":None}
|
||||
|
||||
assert freq_to_band(220000) == {"band" : 1.25, "mode":None}
|
||||
|
|
|
|||
Loading…
Reference in a new issue