mirror of
https://github.com/dh1tw/pyhamtools.git
synced 2025-12-06 06:52:00 +01:00
1523 lines
59 KiB
Python
1523 lines
59 KiB
Python
import os
|
|
import logging
|
|
import logging.config
|
|
import re
|
|
import random, string
|
|
from datetime import datetime, timezone
|
|
import xml.etree.ElementTree as ET
|
|
import urllib
|
|
import json
|
|
import copy
|
|
|
|
import requests
|
|
from requests.exceptions import ConnectionError, HTTPError, Timeout
|
|
from bs4 import BeautifulSoup
|
|
|
|
from . import version
|
|
from .consts import LookupConventions as const
|
|
from .exceptions import APIKeyMissingError
|
|
|
|
REDIS_LUA_DEL_SCRIPT = "local keys = redis.call('keys', ARGV[1]) \n for i=1,#keys,20000 do \n redis.call('del', unpack(keys, i, math.min(i+19999, #keys))) \n end \n return keys"
|
|
|
|
class LookupLib(object):
|
|
"""
|
|
|
|
This class is a wrapper for the following three Amateur Radio databases:
|
|
|
|
1. Clublog.org (daily updated XML File)
|
|
2. Clublog.org (HTTPS lookup)
|
|
3. Country-files.com (infrequently updated PLIST File)
|
|
4. QRZ.com (HTTP / XML Lookup)
|
|
|
|
    Its aim is to provide a homogeneous interface to different databases.
|
|
|
|
Typically an instance of this class is injected as a dependency in the :py:class:`Callinfo` class, but it can also
|
|
be used directly.
|
|
|
|
    Even though the interface is the same for all lookup sources, the returned data can differ.
|
|
The documentation of the various methods provide more detail.
|
|
|
|
By default, LookupLib requires an Internet connection to download the libraries or perform the
|
|
lookup against the Clublog API or QRZ.com.
|
|
|
|
    The entire lookup data (where database files are downloaded) can also be copied into Redis, which is an extremely
|
|
fast in-memory Key/Value store. A LookupLib object can be instantiated to perform then all lookups in Redis,
|
|
instead processing and loading the data from Internet / File. This saves some time and allows several instances
|
|
of :py:class:`LookupLib` to query the same data concurrently.
|
|
|
|
Args:
|
|
lookuptype (str) : "clublogxml" or "clublogapi" or "countryfile" or "redis" or "qrz"
|
|
apikey (str): Clublog API Key
|
|
username (str): QRZ.com username
|
|
pwd (str): QRZ.com password
|
|
apiv (str, optional): QRZ.com API Version
|
|
filename (str, optional): Filename for Clublog XML or Country-files.com cty.plist file. When a local file is
|
|
    used, no Internet connection nor API Key is necessary.
|
|
logger (logging.getLogger(__name__), optional): Python logger
|
|
redis_instance (redis.Redis(), optional): Instance of Redis
|
|
redis_prefix (str, optional): Prefix to identify the lookup data set in Redis
|
|
|
|
|
|
"""
|
|
    def __init__(self, lookuptype = "countryfile", apikey=None, apiv="1.3.3", filename=None, logger=None, username=None, pwd=None, redis_instance=None, redis_prefix=None):
        # Use the caller-supplied logger, or fall back to a module-level logger
        # with a NullHandler so library users are not forced to configure logging.
        self._logger = None
        if logger:
            self._logger = logger
        else:
            self._logger = logging.getLogger(__name__)
            self._logger.addHandler(logging.NullHandler())

        self._apikey = apikey            # Clublog API key (replaced by a QRZ session key for lookuptype "qrz")
        self._apiv = apiv                # QRZ.com XML API version
        self._download = True            # download data file unless a local file was supplied
        self._lib_filename = filename
        self._redis = redis_instance
        self._redis_prefix = redis_prefix
        self._username = username
        self._pwd = pwd

        # A local data file disables the download step.
        if self._lib_filename:
            self._download = False

        # Index dicts map a callsign/prefix to the entries in the data dicts below.
        self._callsign_exceptions_index = {}
        self._invalid_operations_index = {}
        self._zone_exceptions_index = {}

        # Data dicts, filled by _load_clublogXML() / _load_countryfile().
        self._entities = {}
        self._callsign_exceptions = {}
        self._invalid_operations = {}
        self._zone_exceptions = {}
        self._lookuptype = lookuptype

        if self._lookuptype == "clublogxml":
            self._load_clublogXML(apikey=self._apikey, cty_file=self._lib_filename)
        elif self._lookuptype == "countryfile":
            self._load_countryfile(cty_file=self._lib_filename)
        elif self._lookuptype == "clublogapi":
            # Lookups go straight to the online API; nothing to preload.
            pass
        elif self._lookuptype == "redis":
            # Fail early if the redis package is not installed.
            import redis
        elif self._lookuptype == "qrz":
            # QRZ.com works with session keys; log in once and keep the key.
            self._apikey = self._get_qrz_session_key(self._username, self._pwd)
        else:
            raise AttributeError("Lookup type missing")
|
|
|
|
def _get_qrz_session_key(self, username, pwd):
|
|
|
|
qrz_api_version = "1.3.3"
|
|
url = "https://xmldata.qrz.com/xml/" + qrz_api_version + "/"
|
|
agent = "PyHamTools"+version.__version__
|
|
|
|
params = {"username" : username,
|
|
"password" : pwd,
|
|
"agent" : agent
|
|
}
|
|
|
|
encodeurl = url + "?" + urllib.parse.urlencode(params)
|
|
response = requests.get(encodeurl, timeout=10)
|
|
doc = BeautifulSoup(response.text, "xml")
|
|
session_key = None
|
|
if doc.QRZDatabase.Session.Key:
|
|
session_key = doc.QRZDatabase.Session.Key.text
|
|
else:
|
|
if doc.QRZDatabase.Session.Error:
|
|
raise ValueError(doc.QRZDatabase.Session.Error.text)
|
|
else:
|
|
raise ValueError("Could not retrieve Session Key from QRZ.com")
|
|
|
|
return session_key
|
|
|
|
|
|
def copy_data_in_redis(self, redis_prefix, redis_instance):
|
|
"""
|
|
Copy the complete lookup data into redis. Old data will be overwritten.
|
|
|
|
Args:
|
|
redis_prefix (str): Prefix to distinguish the data in redis for the different looktypes
|
|
redis_instance (str): an Instance of Redis
|
|
|
|
Returns:
|
|
bool: returns True when the data has been copied successfully into Redis
|
|
|
|
Example:
|
|
Copy the entire lookup data from the Country-files.com PLIST File into Redis. This example requires a running
|
|
instance of Redis, as well the python Redis connector (pip install redis-py).
|
|
|
|
>>> from pyhamtools import LookupLib
|
|
>>> import redis
|
|
>>> r = redis.Redis()
|
|
>>> my_lookuplib = LookupLib(lookuptype="countryfile")
|
|
>>> print my_lookuplib.copy_data_in_redis(redis_prefix="CF", redis_instance=r)
|
|
True
|
|
|
|
Now let's create an instance of LookupLib, using Redis to query the data
|
|
|
|
>>> from pyhamtools import LookupLib
|
|
>>> import redis
|
|
>>> r = redis.Redis()
|
|
>>> my_lookuplib = LookupLib(lookuptype="redis", redis_instance=r, redis_prefix="CF")
|
|
>>> my_lookuplib.lookup_callsign("3D2RI")
|
|
{
|
|
u'adif': 460,
|
|
u'continent': u'OC',
|
|
u'country': u'Rotuma Island',
|
|
u'cqz': 32,
|
|
u'ituz': 56,
|
|
u'latitude': -12.48,
|
|
u'longitude': 177.08
|
|
}
|
|
|
|
|
|
Note:
|
|
This method is available for the following lookup type
|
|
|
|
- clublogxml
|
|
- countryfile
|
|
"""
|
|
|
|
if redis_instance is not None:
|
|
self._redis = redis_instance
|
|
|
|
if self._redis is None:
|
|
raise AttributeError("redis_instance is missing")
|
|
|
|
if redis_prefix is None:
|
|
raise KeyError("redis_prefix is missing")
|
|
|
|
if self._lookuptype == "clublogxml" or self._lookuptype == "countryfile":
|
|
|
|
self._push_dict_to_redis(self._entities, redis_prefix, "_entity_")
|
|
|
|
self._push_dict_index_to_redis(self._callsign_exceptions_index, redis_prefix, "_call_ex_index_")
|
|
self._push_dict_to_redis(self._callsign_exceptions, redis_prefix, "_call_ex_")
|
|
|
|
self._push_dict_index_to_redis(self._prefixes_index, redis_prefix, "_prefix_index_")
|
|
self._push_dict_to_redis(self._prefixes, redis_prefix, "_prefix_")
|
|
|
|
self._push_dict_index_to_redis(self._invalid_operations_index, redis_prefix, "_inv_op_index_")
|
|
self._push_dict_to_redis(self._invalid_operations, redis_prefix, "_inv_op_")
|
|
|
|
self._push_dict_index_to_redis(self._zone_exceptions_index, redis_prefix, "_zone_ex_index_")
|
|
self._push_dict_to_redis(self._zone_exceptions, redis_prefix, "_zone_ex_")
|
|
|
|
return True
|
|
|
|
def _push_dict_to_redis(self, push_dict, redis_prefix, name):
|
|
r = self._redis
|
|
pipe = r.pipeline()
|
|
pipe.eval(REDIS_LUA_DEL_SCRIPT, 0, redis_prefix + name)
|
|
|
|
for i in push_dict:
|
|
json_data = self._serialize_data(push_dict[i])
|
|
pipe.set(redis_prefix + name + str(i), json_data)
|
|
|
|
pipe.execute()
|
|
return True
|
|
|
|
def _push_dict_index_to_redis(self, index_dict, redis_prefix, name):
|
|
r = self._redis
|
|
pipe = r.pipeline()
|
|
pipe.eval(REDIS_LUA_DEL_SCRIPT, 0, redis_prefix + name)
|
|
|
|
for i in index_dict:
|
|
for el in index_dict[i]:
|
|
pipe.sadd(redis_prefix + name + str(i), el)
|
|
|
|
pipe.execute()
|
|
return True
|
|
|
|
|
|
|
|
def lookup_entity(self, entity=None):
|
|
"""Returns lookup data of an ADIF Entity
|
|
|
|
Args:
|
|
entity (int): ADIF identifier of country
|
|
|
|
Returns:
|
|
dict: Dictionary containing the country specific data
|
|
|
|
Raises:
|
|
KeyError: No matching entity found
|
|
|
|
Example:
|
|
The following code queries the the Clublog XML database for the ADIF entity Turkmenistan, which has
|
|
the id 273.
|
|
|
|
>>> from pyhamtools import LookupLib
|
|
>>> my_lookuplib = LookupLib(lookuptype="clublogapi", apikey="myapikey")
|
|
>>> print my_lookuplib.lookup_entity(273)
|
|
{
|
|
'deleted': False,
|
|
'country': u'TURKMENISTAN',
|
|
'longitude': 58.4,
|
|
'cqz': 17,
|
|
'prefix': u'EZ',
|
|
'latitude': 38.0,
|
|
'continent': u'AS'
|
|
}
|
|
|
|
|
|
Note:
|
|
This method is available for the following lookup type
|
|
|
|
- clublogxml
|
|
- redis
|
|
- qrz.com
|
|
|
|
"""
|
|
if self._lookuptype == "clublogxml":
|
|
entity = int(entity)
|
|
if entity in self._entities:
|
|
return self._strip_metadata(self._entities[entity])
|
|
else:
|
|
raise KeyError
|
|
|
|
elif self._lookuptype == "redis":
|
|
if self._redis_prefix is None:
|
|
raise KeyError ("redis_prefix is missing")
|
|
#entity = str(entity)
|
|
json_data = self._redis.get(self._redis_prefix + "_entity_" + str(entity))
|
|
if json_data is not None:
|
|
my_dict = self._deserialize_data(json_data)
|
|
return self._strip_metadata(my_dict)
|
|
|
|
elif self._lookuptype == "qrz":
|
|
result = self._lookup_qrz_dxcc(entity, self._apikey)
|
|
return result
|
|
|
|
# no matching case
|
|
raise KeyError
|
|
|
|
def _strip_metadata(self, my_dict):
|
|
"""
|
|
Create a copy of dict and remove not needed data
|
|
"""
|
|
new_dict = copy.deepcopy(my_dict)
|
|
if const.START in new_dict:
|
|
del new_dict[const.START]
|
|
if const.END in new_dict:
|
|
del new_dict[const.END]
|
|
if const.WHITELIST in new_dict:
|
|
del new_dict[const.WHITELIST]
|
|
if const.WHITELIST_START in new_dict:
|
|
del new_dict[const.WHITELIST_START]
|
|
if const.WHITELIST_END in new_dict:
|
|
del new_dict[const.WHITELIST_END]
|
|
return new_dict
|
|
|
|
|
|
def lookup_callsign(self, callsign=None, timestamp=None):
|
|
"""
|
|
Returns lookup data if an exception exists for a callsign
|
|
|
|
Args:
|
|
callsign (string): Amateur radio callsign
|
|
timestamp (datetime, optional): datetime in UTC (tzinfo=timezone.utc)
|
|
|
|
Returns:
|
|
dict: Dictionary containing the country specific data of the callsign
|
|
|
|
Raises:
|
|
KeyError: No matching callsign found
|
|
APIKeyMissingError: API Key for Clublog missing or incorrect
|
|
|
|
Example:
|
|
The following code queries the the online Clublog API for the callsign "VK9XO" on a specific date.
|
|
|
|
>>> from pyhamtools import LookupLib
|
|
>>> from datetime import datetime, timezone
|
|
>>> my_lookuplib = LookupLib(lookuptype="clublogapi", apikey="myapikey")
|
|
>>> timestamp = datetime(year=1962, month=7, day=7, tzinfo=timezone.utc)
|
|
>>> print my_lookuplib.lookup_callsign("VK9XO", timestamp)
|
|
{
|
|
'country': u'CHRISTMAS ISLAND',
|
|
'longitude': 105.7,
|
|
'cqz': 29,
|
|
'adif': 35,
|
|
'latitude': -10.5,
|
|
'continent': u'OC'
|
|
}
|
|
|
|
Note:
|
|
This method is available for
|
|
|
|
- clublogxml
|
|
- clublogapi
|
|
- countryfile
|
|
- qrz.com
|
|
- redis
|
|
|
|
|
|
"""
|
|
callsign = callsign.strip().upper()
|
|
if timestamp is None:
|
|
timestamp = datetime.now(timezone.utc)
|
|
|
|
if self._lookuptype == "clublogapi":
|
|
callsign_data = self._lookup_clublogAPI(callsign=callsign, timestamp=timestamp, apikey=self._apikey)
|
|
if callsign_data[const.ADIF]==1000:
|
|
raise KeyError
|
|
else:
|
|
return callsign_data
|
|
|
|
elif self._lookuptype == "clublogxml" or self._lookuptype == "countryfile":
|
|
|
|
return self._check_data_for_date(callsign, timestamp, self._callsign_exceptions, self._callsign_exceptions_index)
|
|
|
|
elif self._lookuptype == "redis":
|
|
|
|
data_dict, index = self._get_dicts_from_redis("_call_ex_", "_call_ex_index_", self._redis_prefix, callsign)
|
|
return self._check_data_for_date(callsign, timestamp, data_dict, index)
|
|
|
|
# no matching case
|
|
elif self._lookuptype == "qrz":
|
|
return self._lookup_qrz_callsign(callsign, self._apikey, self._apiv)
|
|
|
|
raise KeyError("unknown Callsign")
|
|
|
|
def _get_dicts_from_redis(self, name, index_name, redis_prefix, item):
|
|
"""
|
|
Retrieve the data of an item from redis and put it in an index and data dictionary to match the
|
|
common query interface.
|
|
"""
|
|
r = self._redis
|
|
data_dict = {}
|
|
data_index_dict = {}
|
|
|
|
if redis_prefix is None:
|
|
raise KeyError ("redis_prefix is missing")
|
|
|
|
if r.scard(redis_prefix + index_name + str(item)) > 0:
|
|
data_index_dict[str(item)] = r.smembers(redis_prefix + index_name + str(item))
|
|
|
|
for i in data_index_dict[item]:
|
|
json_data = r.get(redis_prefix + name + str(int(i)))
|
|
data_dict[i] = self._deserialize_data(json_data)
|
|
|
|
return (data_dict, data_index_dict)
|
|
|
|
raise KeyError ("No Data found in Redis for "+ item)
|
|
|
|
def _check_data_for_date(self, item, timestamp, data_dict, data_index_dict):
|
|
"""
|
|
Checks if the item is found in the index. An entry in the index points to the data
|
|
in the data_dict. This is mainly used retrieve callsigns and prefixes.
|
|
In case data is found for item, a dict containing the data is returned. Otherwise a KeyError is raised.
|
|
"""
|
|
|
|
if item in data_index_dict:
|
|
for item in data_index_dict[item]:
|
|
|
|
# startdate < timestamp
|
|
if const.START in data_dict[item] and not const.END in data_dict[item]:
|
|
if data_dict[item][const.START] < timestamp:
|
|
item_data = copy.deepcopy(data_dict[item])
|
|
del item_data[const.START]
|
|
return item_data
|
|
|
|
# enddate > timestamp
|
|
elif not const.START in data_dict[item] and const.END in data_dict[item]:
|
|
if data_dict[item][const.END] > timestamp:
|
|
item_data = copy.deepcopy(data_dict[item])
|
|
del item_data[const.END]
|
|
return item_data
|
|
|
|
# startdate > timestamp > enddate
|
|
elif const.START in data_dict[item] and const.END in data_dict[item]:
|
|
if data_dict[item][const.START] < timestamp \
|
|
and data_dict[item][const.END] > timestamp:
|
|
item_data = copy.deepcopy(data_dict[item])
|
|
del item_data[const.START]
|
|
del item_data[const.END]
|
|
return item_data
|
|
|
|
# no startdate or enddate available
|
|
elif not const.START in data_dict[item] and not const.END in data_dict[item]:
|
|
return data_dict[item]
|
|
|
|
raise KeyError
|
|
|
|
|
|
def _check_inv_operation_for_date(self, item, timestamp, data_dict, data_index_dict):
|
|
"""
|
|
Checks if the callsign is marked as an invalid operation for a given timestamp.
|
|
In case the operation is invalid, True is returned. Otherwise a KeyError is raised.
|
|
"""
|
|
|
|
if item in data_index_dict:
|
|
for item in data_index_dict[item]:
|
|
|
|
# startdate < timestamp
|
|
if const.START in data_dict[item] and not const.END in data_dict[item]:
|
|
if data_dict[item][const.START] < timestamp:
|
|
return True
|
|
|
|
# enddate > timestamp
|
|
elif not const.START in data_dict[item] and const.END in data_dict[item]:
|
|
if data_dict[item][const.END] > timestamp:
|
|
return True
|
|
|
|
# startdate > timestamp > enddate
|
|
elif const.START in data_dict[item] and const.END in data_dict[item]:
|
|
if data_dict[item][const.START] < timestamp \
|
|
and data_dict[item][const.END] > timestamp:
|
|
return True
|
|
|
|
# no startdate or enddate available
|
|
elif not const.START in data_dict[item] and not const.END in data_dict[item]:
|
|
return True
|
|
|
|
raise KeyError
|
|
|
|
|
|
def lookup_prefix(self, prefix, timestamp=None):
|
|
"""
|
|
Returns lookup data of a Prefix
|
|
|
|
Args:
|
|
prefix (string): Prefix of a Amateur Radio callsign
|
|
timestamp (datetime, optional): datetime in UTC (tzinfo=timezone.utc)
|
|
|
|
Returns:
|
|
dict: Dictionary containing the country specific data of the Prefix
|
|
|
|
Raises:
|
|
KeyError: No matching Prefix found
|
|
APIKeyMissingError: API Key for Clublog missing or incorrect
|
|
|
|
Example:
|
|
The following code shows how to obtain the information for the prefix "DH" from the countryfile.com
|
|
database (default database).
|
|
|
|
>>> from pyhamtools import LookupLib
|
|
>>> myLookupLib = LookupLib()
|
|
>>> print myLookupLib.lookup_prefix("DH")
|
|
{
|
|
'adif': 230,
|
|
'country': u'Fed. Rep. of Germany',
|
|
'longitude': 10.0,
|
|
'cqz': 14,
|
|
'ituz': 28,
|
|
'latitude': 51.0,
|
|
'continent': u'EU'
|
|
}
|
|
|
|
Note:
|
|
This method is available for
|
|
|
|
- clublogxml
|
|
- countryfile
|
|
- redis
|
|
|
|
"""
|
|
|
|
prefix = prefix.strip().upper()
|
|
if timestamp is None:
|
|
timestamp = datetime.now(timezone.utc)
|
|
|
|
if self._lookuptype == "clublogxml" or self._lookuptype == "countryfile":
|
|
|
|
return self._check_data_for_date(prefix, timestamp, self._prefixes, self._prefixes_index)
|
|
|
|
elif self._lookuptype == "redis":
|
|
|
|
data_dict, index = self._get_dicts_from_redis("_prefix_", "_prefix_index_", self._redis_prefix, prefix)
|
|
return self._check_data_for_date(prefix, timestamp, data_dict, index)
|
|
|
|
# no matching case
|
|
raise KeyError
|
|
|
|
def is_invalid_operation(self, callsign, timestamp=None):
|
|
"""
|
|
Returns True if an operations is known as invalid
|
|
|
|
Args:
|
|
callsign (string): Amateur Radio callsign
|
|
timestamp (datetime, optional): datetime in UTC (tzinfo=timezone.utc)
|
|
|
|
Returns:
|
|
bool: True if a record exists for this callsign (at the given time)
|
|
|
|
Raises:
|
|
KeyError: No matching callsign found
|
|
APIKeyMissingError: API Key for Clublog missing or incorrect
|
|
|
|
Example:
|
|
The following code checks the Clublog XML database if the operation is valid for two dates.
|
|
|
|
>>> from pyhamtools import LookupLib
|
|
>>> from datetime import datetime, timezone
|
|
>>> my_lookuplib = LookupLib(lookuptype="clublogxml", apikey="myapikey")
|
|
>>> print my_lookuplib.is_invalid_operation("5W1CFN")
|
|
True
|
|
>>> try:
|
|
>>> timestamp = datetime(year=2012, month=1, day=31, tzinfo=timezone.utc)
|
|
>>> my_lookuplib.is_invalid_operation("5W1CFN", timestamp)
|
|
>>> except KeyError:
|
|
>>> print "Seems to be invalid operation before 31.1.2012"
|
|
Seems to be an invalid operation before 31.1.2012
|
|
|
|
Note:
|
|
This method is available for
|
|
|
|
- clublogxml
|
|
- redis
|
|
|
|
"""
|
|
|
|
callsign = callsign.strip().upper()
|
|
if timestamp is None:
|
|
timestamp = datetime.now(timezone.utc)
|
|
|
|
if self._lookuptype == "clublogxml":
|
|
|
|
return self._check_inv_operation_for_date(callsign, timestamp, self._invalid_operations, self._invalid_operations_index)
|
|
|
|
elif self._lookuptype == "redis":
|
|
|
|
data_dict, index = self._get_dicts_from_redis("_inv_op_", "_inv_op_index_", self._redis_prefix, callsign)
|
|
return self._check_inv_operation_for_date(callsign, timestamp, data_dict, index)
|
|
|
|
#no matching case
|
|
raise KeyError
|
|
|
|
|
|
def _check_zone_exception_for_date(self, item, timestamp, data_dict, data_index_dict):
|
|
"""
|
|
Checks the index and data if a cq-zone exception exists for the callsign
|
|
When a zone exception is found, the zone is returned. If no exception is found
|
|
a KeyError is raised
|
|
|
|
"""
|
|
if item in data_index_dict:
|
|
for item in data_index_dict[item]:
|
|
|
|
# startdate < timestamp
|
|
if const.START in data_dict[item] and not const.END in data_dict[item]:
|
|
if data_dict[item][const.START] < timestamp:
|
|
return data_dict[item][const.CQZ]
|
|
|
|
# enddate > timestamp
|
|
elif not const.START in data_dict[item] and const.END in data_dict[item]:
|
|
if data_dict[item][const.END] > timestamp:
|
|
return data_dict[item][const.CQZ]
|
|
|
|
# startdate > timestamp > enddate
|
|
elif const.START in data_dict[item] and const.END in data_dict[item]:
|
|
if data_dict[item][const.START] < timestamp \
|
|
and data_dict[item][const.END] > timestamp:
|
|
return data_dict[item][const.CQZ]
|
|
|
|
# no startdate or enddate available
|
|
elif not const.START in data_dict[item] and not const.END in data_dict[item]:
|
|
return data_dict[item][const.CQZ]
|
|
|
|
raise KeyError
|
|
|
|
|
|
def lookup_zone_exception(self, callsign, timestamp=None):
|
|
"""
|
|
Returns a CQ Zone if an exception exists for the given callsign
|
|
|
|
Args:
|
|
callsign (string): Amateur radio callsign
|
|
timestamp (datetime, optional): datetime in UTC (tzinfo=timezone.utc)
|
|
|
|
Returns:
|
|
int: Value of the the CQ Zone exception which exists for this callsign (at the given time)
|
|
|
|
Raises:
|
|
KeyError: No matching callsign found
|
|
APIKeyMissingError: API Key for Clublog missing or incorrect
|
|
|
|
Example:
|
|
The following code checks the Clublog XML database if a CQ Zone exception exists for the callsign DP0GVN.
|
|
|
|
>>> from pyhamtools import LookupLib
|
|
>>> my_lookuplib = LookupLib(lookuptype="clublogxml", apikey="myapikey")
|
|
>>> print my_lookuplib.lookup_zone_exception("DP0GVN")
|
|
38
|
|
|
|
The prefix "DP" It is assigned to Germany, but the station is located in Antarctica, and therefore
|
|
in CQ Zone 38
|
|
|
|
Note:
|
|
This method is available for
|
|
|
|
- clublogxml
|
|
- redis
|
|
|
|
"""
|
|
|
|
callsign = callsign.strip().upper()
|
|
if timestamp is None:
|
|
timestamp = datetime.now(timezone.utc)
|
|
|
|
if self._lookuptype == "clublogxml":
|
|
|
|
return self._check_zone_exception_for_date(callsign, timestamp, self._zone_exceptions, self._zone_exceptions_index)
|
|
|
|
elif self._lookuptype == "redis":
|
|
|
|
data_dict, index = self._get_dicts_from_redis("_zone_ex_", "_zone_ex_index_", self._redis_prefix, callsign)
|
|
return self._check_zone_exception_for_date(callsign, timestamp, data_dict, index)
|
|
|
|
#no matching case
|
|
raise KeyError
|
|
|
|
def _lookup_clublogAPI(self, callsign=None, timestamp=None, url="https://cdn.clublog.org/dxcc", apikey=None):
|
|
""" Set up the Lookup object for Clublog Online API
|
|
"""
|
|
|
|
params = {"year" : timestamp.strftime("%Y"),
|
|
"month" : timestamp.strftime("%m"),
|
|
"day" : timestamp.strftime("%d"),
|
|
"hour" : timestamp.strftime("%H"),
|
|
"minute" : timestamp.strftime("%M"),
|
|
"api" : apikey,
|
|
"full" : "1",
|
|
"call" : callsign
|
|
}
|
|
|
|
if timestamp is None:
|
|
timestamp = datetime.now(timezone.utc)
|
|
|
|
encodeurl = url + "?" + urllib.parse.urlencode(params)
|
|
response = requests.get(encodeurl, timeout=5)
|
|
|
|
if not self._check_html_response(response):
|
|
raise LookupError
|
|
|
|
jsonLookup = response.json()
|
|
lookup = {}
|
|
|
|
for item in jsonLookup:
|
|
if item == "Name": lookup[const.COUNTRY] = jsonLookup["Name"]
|
|
elif item == "DXCC": lookup[const.ADIF] = int(jsonLookup["DXCC"])
|
|
elif item == "Lon": lookup[const.LONGITUDE] = float(jsonLookup["Lon"])*(-1)
|
|
elif item == "Lat": lookup[const.LATITUDE] = float(jsonLookup["Lat"])
|
|
elif item == "CQZ": lookup[const.CQZ] = int(jsonLookup["CQZ"])
|
|
elif item == "Continent": lookup[const.CONTINENT] = jsonLookup["Continent"]
|
|
|
|
if lookup[const.ADIF] == 0:
|
|
raise KeyError
|
|
else:
|
|
return lookup
|
|
|
|
def _request_callsign_info_from_qrz(self, callsign, apikey, apiv="1.3.3"):
|
|
qrz_api_version = apiv
|
|
url = "https://xmldata.qrz.com/xml/" + qrz_api_version + "/"
|
|
|
|
params = {
|
|
"s": apikey,
|
|
"callsign" : callsign,
|
|
}
|
|
|
|
encodeurl = url + "?" + urllib.parse.urlencode(params)
|
|
response = requests.get(encodeurl, timeout=5)
|
|
return response
|
|
|
|
def _request_dxcc_info_from_qrz(self, dxcc_or_callsign, apikey, apiv="1.3.3"):
|
|
qrz_api_version = apiv
|
|
url = "https://xmldata.qrz.com/xml/" + qrz_api_version + "/"
|
|
|
|
params = {
|
|
"s": apikey,
|
|
"dxcc" : str(dxcc_or_callsign),
|
|
}
|
|
|
|
encodeurl = url + "?" + urllib.parse.urlencode(params)
|
|
response = requests.get(encodeurl, timeout=5)
|
|
return response
|
|
|
|
def _lookup_qrz_dxcc(self, dxcc_or_callsign, apikey, apiv="1.3.3"):
|
|
""" Performs the dxcc lookup against the QRZ.com XML API:
|
|
"""
|
|
|
|
response = self._request_dxcc_info_from_qrz(dxcc_or_callsign, apikey, apiv=apiv)
|
|
|
|
root = BeautifulSoup(response.text, "xml")
|
|
lookup = {}
|
|
|
|
if root.Error: #try to get a new session key and try to request again
|
|
|
|
if re.search('No DXCC Information for', root.Error.text, re.I): #No data available for callsign
|
|
raise KeyError(root.Error.text)
|
|
elif re.search('Session Timeout', root.Error.text, re.I): # Get new session key
|
|
self._apikey = apikey = self._get_qrz_session_key(self._username, self._pwd)
|
|
response = self._request_dxcc_info_from_qrz(dxcc_or_callsign, apikey)
|
|
root = BeautifulSoup(response.text, "xml")
|
|
else:
|
|
raise AttributeError("Session Key Missing") #most likely session key missing or invalid
|
|
|
|
if root.DXCC is None:
|
|
raise ValueError
|
|
|
|
if root.DXCC.dxcc:
|
|
lookup[const.ADIF] = int(root.DXCC.dxcc.text)
|
|
if root.DXCC.cc:
|
|
lookup['cc'] = root.DXCC.cc.text
|
|
if root.DXCC.ccc:
|
|
lookup['ccc'] = root.DXCC.ccc.text
|
|
if root.find('name'):
|
|
lookup[const.COUNTRY] = root.find('name').get_text()
|
|
if root.DXCC.continent:
|
|
lookup[const.CONTINENT] = root.DXCC.continent.text
|
|
if root.DXCC.ituzone:
|
|
lookup[const.ITUZ] = int(root.DXCC.ituzone.text)
|
|
if root.DXCC.cqzone:
|
|
lookup[const.CQZ] = int(root.DXCC.cqzone.text)
|
|
if root.DXCC.timezone:
|
|
lookup['timezone'] = float(root.DXCC.timezone.text)
|
|
if root.DXCC.lat:
|
|
lookup[const.LATITUDE] = float(root.DXCC.lat.text)
|
|
if root.DXCC.lon:
|
|
lookup[const.LONGITUDE] = float(root.DXCC.lon.text)
|
|
|
|
return lookup
|
|
|
|
|
|
def _lookup_qrz_callsign(self, callsign=None, apikey=None, apiv="1.3.3"):
|
|
""" Performs the callsign lookup against the QRZ.com XML API:
|
|
"""
|
|
|
|
if apikey is None:
|
|
raise AttributeError("Session Key Missing")
|
|
|
|
callsign = callsign.upper()
|
|
|
|
response = self._request_callsign_info_from_qrz(callsign, apikey, apiv)
|
|
|
|
root = BeautifulSoup(response.text, "xml")
|
|
lookup = {}
|
|
|
|
if root.Error:
|
|
|
|
if re.search('Not found', root.Error.text, re.I): #No data available for callsign
|
|
raise KeyError(root.Error.text)
|
|
|
|
#try to get a new session key and try to request again
|
|
elif re.search('Session Timeout', root.Error.text, re.I) or re.search('Invalid session key', root.Error.text, re.I):
|
|
apikey = self._get_qrz_session_key(self._username, self._pwd)
|
|
response = self._request_callsign_info_from_qrz(callsign, apikey, apiv)
|
|
root = BeautifulSoup(response.text, "xml")
|
|
|
|
#if this fails again, raise error
|
|
if root.Error:
|
|
|
|
if re.search('Not found', root.Error.text, re.I): #No data available for callsign
|
|
raise KeyError(root.Error.text)
|
|
else:
|
|
raise AttributeError(root.Error.text) #most likely session key invalid
|
|
else:
|
|
#update API Key ob Lookup object
|
|
self._apikey = apikey
|
|
|
|
else:
|
|
raise AttributeError(root.Error.text) #most likely session key missing
|
|
|
|
if root.Callsign is None:
|
|
raise ValueError
|
|
|
|
if root.Callsign.call:
|
|
lookup[const.CALLSIGN] = root.Callsign.call.text
|
|
if root.Callsign.xref:
|
|
lookup[const.XREF] = root.Callsign.xref.text
|
|
if root.Callsign.aliases:
|
|
lookup[const.ALIASES] = root.Callsign.aliases.text.split(',')
|
|
if root.Callsign.dxcc:
|
|
lookup[const.ADIF] = int(root.Callsign.dxcc.text)
|
|
if root.Callsign.fname:
|
|
lookup[const.FNAME] = root.Callsign.fname.text
|
|
if root.Callsign.find("name"):
|
|
lookup[const.NAME] = root.Callsign.find('name').get_text()
|
|
if root.Callsign.addr1:
|
|
lookup[const.ADDR1] = root.Callsign.addr1.text
|
|
if root.Callsign.addr2:
|
|
lookup[const.ADDR2] = root.Callsign.addr2.text
|
|
if root.Callsign.state:
|
|
lookup[const.STATE] = root.Callsign.state.text
|
|
if root.Callsign.zip:
|
|
lookup[const.ZIPCODE] = root.Callsign.zip.text
|
|
if root.Callsign.country:
|
|
lookup[const.COUNTRY] = root.Callsign.country.text
|
|
if root.Callsign.ccode:
|
|
lookup[const.CCODE] = int(root.Callsign.ccode.text)
|
|
if root.Callsign.lat:
|
|
lookup[const.LATITUDE] = float(root.Callsign.lat.text)
|
|
if root.Callsign.lon:
|
|
lookup[const.LONGITUDE] = float(root.Callsign.lon.text)
|
|
if root.Callsign.grid:
|
|
lookup[const.LOCATOR] = root.Callsign.grid.text
|
|
if root.Callsign.county:
|
|
lookup[const.COUNTY] = root.Callsign.county.text
|
|
if root.Callsign.fips:
|
|
lookup[const.FIPS] = int(root.Callsign.fips.text) # check type
|
|
if root.Callsign.land:
|
|
lookup[const.LAND] = root.Callsign.land.text
|
|
if root.Callsign.efdate:
|
|
try:
|
|
lookup[const.EFDATE] = datetime.strptime(root.Callsign.efdate.text, '%Y-%m-%d').replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
self._logger.debug("[QRZ.com] efdate: Invalid DateTime; " + callsign + " " + root.Callsign.efdate.text)
|
|
if root.Callsign.expdate:
|
|
try:
|
|
lookup[const.EXPDATE] = datetime.strptime(root.Callsign.expdate.text, '%Y-%m-%d').replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
self._logger.debug("[QRZ.com] expdate: Invalid DateTime; " + callsign + " " + root.Callsign.expdate.text)
|
|
if root.Callsign.p_call:
|
|
lookup[const.P_CALL] = root.Callsign.p_call.text
|
|
if root.Callsign.find('class'):
|
|
lookup[const.LICENSE_CLASS] = root.Callsign.find('class').get_text()
|
|
if root.Callsign.codes:
|
|
lookup[const.CODES] = root.Callsign.codes.text
|
|
if root.Callsign.qslmgr:
|
|
lookup[const.QSLMGR] = root.Callsign.qslmgr.text
|
|
if root.Callsign.email:
|
|
lookup[const.EMAIL] = root.Callsign.email.text
|
|
if root.Callsign.url:
|
|
lookup[const.URL] = root.Callsign.url.text
|
|
if root.Callsign.u_views:
|
|
lookup[const.U_VIEWS] = int(root.Callsign.u_views.text)
|
|
if root.Callsign.bio:
|
|
lookup[const.BIO] = root.Callsign.bio.text
|
|
if root.Callsign.biodate:
|
|
try:
|
|
lookup[const.BIODATE] = datetime.strptime(root.Callsign.biodate.text, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
self._logger.warning("[QRZ.com] biodate: Invalid DateTime; " + callsign)
|
|
if root.Callsign.image:
|
|
lookup[const.IMAGE] = root.Callsign.image.text
|
|
if root.Callsign.imageinfo:
|
|
lookup[const.IMAGE_INFO] = root.Callsign.imageinfo.text
|
|
if root.Callsign.serial:
|
|
lookup[const.SERIAL] = long(root.Callsign.serial.text)
|
|
if root.Callsign.moddate:
|
|
try:
|
|
lookup[const.MODDATE] = datetime.strptime(root.Callsign.moddate.text, '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
|
|
except ValueError:
|
|
self._logger.warning("[QRZ.com] moddate: Invalid DateTime; " + callsign)
|
|
if root.Callsign.MSA:
|
|
lookup[const.MSA] = int(root.Callsign.MSA.text)
|
|
if root.Callsign.AreaCode:
|
|
lookup[const.AREACODE] = int(root.Callsign.AreaCode.text)
|
|
if root.Callsign.TimeZone:
|
|
lookup[const.TIMEZONE] = root.Callsign.TimeZone.text
|
|
if root.Callsign.GMTOffset:
|
|
lookup[const.GMTOFFSET] = float(root.Callsign.GMTOffset.text)
|
|
if root.Callsign.DST:
|
|
if root.Callsign.DST.text == "Y":
|
|
lookup[const.DST] = True
|
|
else:
|
|
lookup[const.DST] = False
|
|
if root.Callsign.eqsl:
|
|
if root.Callsign.eqsl.text == "1":
|
|
lookup[const.EQSL] = True
|
|
else:
|
|
lookup[const.EQSL] = False
|
|
if root.Callsign.mqsl:
|
|
if root.Callsign.mqsl.text == "1":
|
|
lookup[const.MQSL] = True
|
|
else:
|
|
lookup[const.MQSL] = False
|
|
if root.Callsign.cqzone:
|
|
lookup[const.CQZ] = int(root.Callsign.cqzone.text)
|
|
if root.Callsign.ituzone:
|
|
lookup[const.ITUZ] = int(root.Callsign.ituzone.text)
|
|
if root.Callsign.born:
|
|
lookup[const.BORN] = int(root.Callsign.born.text)
|
|
if root.Callsign.user:
|
|
lookup[const.USER_MGR] = root.Callsign.user.text
|
|
if root.Callsign.lotw:
|
|
if root.Callsign.lotw.text == "1":
|
|
lookup[const.LOTW] = True
|
|
else:
|
|
lookup[const.LOTW] = False
|
|
if root.Callsign.iota:
|
|
lookup[const.IOTA] = root.Callsign.iota.text
|
|
if root.Callsign.geoloc:
|
|
lookup[const.GEOLOC] = root.Callsign.geoloc.text
|
|
|
|
return lookup
|
|
|
|
def _load_clublogXML(self,
|
|
url="https://cdn.clublog.org/cty.php",
|
|
apikey=None,
|
|
cty_file=None):
|
|
""" Load and process the ClublogXML file either as a download or from file
|
|
"""
|
|
|
|
if self._download:
|
|
cty_file = self._download_file(
|
|
url = url,
|
|
apikey = apikey)
|
|
else:
|
|
cty_file = self._lib_filename
|
|
|
|
header = self._extract_clublog_header(cty_file)
|
|
cty_file = self._remove_clublog_xml_header(cty_file)
|
|
cty_dict = self._parse_clublog_xml(cty_file)
|
|
|
|
self._entities = cty_dict["entities"]
|
|
self._callsign_exceptions = cty_dict["call_exceptions"]
|
|
self._prefixes = cty_dict["prefixes"]
|
|
self._invalid_operations = cty_dict["invalid_operations"]
|
|
self._zone_exceptions = cty_dict["zone_exceptions"]
|
|
|
|
self._callsign_exceptions_index = cty_dict["call_exceptions_index"]
|
|
self._prefixes_index = cty_dict["prefixes_index"]
|
|
self._invalid_operations_index = cty_dict["invalid_operations_index"]
|
|
self._zone_exceptions_index = cty_dict["zone_exceptions_index"]
|
|
|
|
if self._download:
|
|
self._cleanup_download_artifact(cty_file)
|
|
|
|
return True
|
|
|
|
    def _load_countryfile(self,
                          url="https://www.country-files.com/cty/cty.plist",
                          country_mapping_filename="countryfilemapping.json",
                          cty_file=None):
        """ Load and process the country-files.com PLIST file either as a
        download or from file.

        Args:
            url (str): download URL of the cty.plist file
            country_mapping_filename (str): JSON file mapping country names
                to ADIF identifiers; looked up first in the current working
                directory, then inside the package
            cty_file (str): path to a local cty.plist file; used only when
                downloading is disabled

        Returns:
            bool: True once the lookup dictionaries have been populated
        """

        cwdFile = os.path.abspath(os.path.join(os.getcwd(), country_mapping_filename))
        pkgFile = os.path.abspath(os.path.join(os.path.dirname(__file__), country_mapping_filename))

        # from cwd
        if os.path.exists(cwdFile):
            # country mapping file contains the ADIF identifiers of a particular
            # country since the country-files do not provide this information (only DXCC id)
            country_mapping_filename = cwdFile
        # from package
        elif os.path.exists(pkgFile):
            country_mapping_filename = pkgFile
        else:
            # no mapping available; _parse_country_file will skip ADIF ids
            country_mapping_filename = None

        if self._download:
            cty_file = self._download_file(url=url)
        else:
            # NOTE(review): crashes with a TypeError if downloading is
            # disabled and cty_file was left at its default None — confirm
            # callers always pass a path in that case
            cty_file = os.path.abspath(cty_file)

        cty_dict = self._parse_country_file(cty_file, country_mapping_filename)
        self._callsign_exceptions = cty_dict["exceptions"]
        self._prefixes = cty_dict["prefixes"]
        self._callsign_exceptions_index = cty_dict["exceptions_index"]
        self._prefixes_index = cty_dict["prefixes_index"]

        # the downloaded temporary file is no longer needed once parsed
        if self._download:
            self._cleanup_download_artifact(cty_file)

        return True
|
|
|
|
def _download_file(self, url, apikey=None):
|
|
""" Download lookup files either from Clublog or Country-files.com
|
|
"""
|
|
import gzip
|
|
import tempfile
|
|
|
|
cty = {}
|
|
cty_date = ""
|
|
cty_file_path = None
|
|
|
|
filename = None
|
|
|
|
# download file
|
|
if apikey: # clublog
|
|
response = requests.get(url+"?api="+apikey, timeout=10)
|
|
else: # country-files.com
|
|
response = requests.get(url, timeout=10)
|
|
|
|
if not self._check_html_response(response):
|
|
raise LookupError
|
|
|
|
#Clublog Webserver Header
|
|
if "Content-Disposition" in response.headers:
|
|
f = re.search('filename=".+"', response.headers["Content-Disposition"])
|
|
if f:
|
|
f = f.group(0)
|
|
filename = re.search('".+"', f).group(0).replace('"', '')
|
|
|
|
#Country-files.org webserver header
|
|
else:
|
|
f = re.search('/.{4}plist$', url)
|
|
if f:
|
|
f = f.group(0)
|
|
filename = f[1:]
|
|
|
|
if not filename:
|
|
filename = "cty_" + self._generate_random_word(5)
|
|
|
|
download_file_path = os.path.join(tempfile.gettempdir(), filename)
|
|
with open(download_file_path, "wb") as download_file:
|
|
download_file.write(response.content)
|
|
self._logger.debug(str(download_file_path) + " successfully downloaded")
|
|
|
|
# unzip file, if gz
|
|
if os.path.splitext(download_file_path)[1][1:] == "gz":
|
|
|
|
download_file = gzip.open(download_file_path, "r")
|
|
try:
|
|
cty_file_path = os.path.join(os.path.splitext(download_file_path)[0])
|
|
with open(cty_file_path, "wb") as cty_file:
|
|
cty_file.write(download_file.read())
|
|
self._logger.debug(str(cty_file_path) + " successfully extracted")
|
|
finally:
|
|
download_file.close()
|
|
else:
|
|
cty_file_path = download_file_path
|
|
|
|
return cty_file_path
|
|
|
|
def _cleanup_download_artifact(self, filename):
|
|
"""
|
|
Delete the downloaded files which are not necessary anymore
|
|
|
|
Args:
|
|
filename (string): absolute path to the download artifact
|
|
"""
|
|
|
|
try:
|
|
os.remove(filename)
|
|
except:
|
|
self._logger.warning("unable delete the download artifact: %s", _download_file)
|
|
|
|
def _extract_clublog_header(self, cty_xml_filename):
|
|
"""
|
|
Extract the header of the Clublog XML File
|
|
"""
|
|
|
|
cty_header = {}
|
|
|
|
try:
|
|
with open(cty_xml_filename, "r") as cty:
|
|
raw_header = cty.readline()
|
|
|
|
cty_date = re.search("date='.+'", raw_header)
|
|
if cty_date:
|
|
cty_date = cty_date.group(0).replace("date=", "").replace("'", "")
|
|
cty_date = datetime.strptime(cty_date[:19], '%Y-%m-%dT%H:%M:%S')
|
|
cty_date.replace(tzinfo=timezone.utc)
|
|
cty_header["Date"] = cty_date
|
|
|
|
cty_ns = re.search("xmlns='.+[']", raw_header)
|
|
if cty_ns:
|
|
cty_ns = cty_ns.group(0).replace("xmlns=", "").replace("'", "")
|
|
cty_header['NameSpace'] = cty_ns
|
|
|
|
if len(cty_header) == 2:
|
|
self._logger.debug("Header successfully retrieved from CTY File")
|
|
elif len(cty_header) < 2:
|
|
self._logger.warning("Header could only be partially retrieved from CTY File")
|
|
self._logger.warning("Content of Header: ")
|
|
for key in cty_header:
|
|
self._logger.warning(str(key)+": "+str(cty_header[key]))
|
|
return cty_header
|
|
|
|
except Exception as e:
|
|
self._logger.error("Clublog CTY File could not be opened / modified")
|
|
self._logger.error("Error Message: " + str(e))
|
|
return
|
|
|
|
|
|
def _remove_clublog_xml_header(self, cty_xml_filename):
|
|
"""
|
|
remove the header of the Clublog XML File to make it
|
|
properly parseable for the python ElementTree XML parser
|
|
"""
|
|
import tempfile
|
|
|
|
try:
|
|
with open(cty_xml_filename, "r") as f:
|
|
content = f.readlines()
|
|
|
|
cty_dir = tempfile.gettempdir()
|
|
cty_name = os.path.split(cty_xml_filename)[1]
|
|
cty_xml_filename_no_header = os.path.join(cty_dir, "NoHeader_"+cty_name)
|
|
|
|
with open(cty_xml_filename_no_header, "w") as f:
|
|
f.writelines("<clublog>\n\r")
|
|
f.writelines(content[1:])
|
|
|
|
self._logger.debug("Header successfully modified for XML Parsing")
|
|
return cty_xml_filename_no_header
|
|
|
|
except Exception as e:
|
|
self._logger.error("Clublog CTY could not be opened / modified")
|
|
self._logger.error("Error Message: " + str(e))
|
|
return
|
|
|
|
    def _parse_clublog_xml(self, cty_xml_filename):
        """Parse the content of a Clublog XML file and return the parsed
        values in dictionaries.

        Args:
            cty_xml_filename (str): path to a Clublog XML file whose
                original header line has already been replaced by a plain
                <clublog> root element (see _remove_clublog_xml_header)

        Returns:
            dict: five data dictionaries keyed by record number
            ("entities", "call_exceptions", "prefixes",
            "invalid_operations", "zone_exceptions") plus the corresponding
            "*_index" dictionaries mapping a callsign / prefix string to a
            list of record numbers.

        Raises:
            Exception: when any of the five sections is missing or
                (nearly) empty in the XML file.
        """

        # data dictionaries: record number -> record
        entities = {}
        call_exceptions = {}
        prefixes = {}
        invalid_operations = {}
        zone_exceptions = {}

        # index dictionaries: callsign / prefix -> list of record numbers
        call_exceptions_index = {}
        prefixes_index = {}
        invalid_operations_index = {}
        zone_exceptions_index = {}

        cty_tree = ET.parse(cty_xml_filename)
        root = cty_tree.getroot()

        #retrieve ADIF Country Entities
        cty_entities = cty_tree.find("entities")
        self._logger.debug("total entities: " + str(len(cty_entities)))
        if len(cty_entities) > 1:
            for cty_entity in cty_entities:
                try:
                    entity = {}
                    for item in cty_entity:
                        if item.tag == "name":
                            entity[const.COUNTRY] = str(item.text)
                            self._logger.debug(str(item.text))
                        elif item.tag == "prefix":
                            entity[const.PREFIX] = str(item.text)
                        elif item.tag == "deleted":
                            # Clublog encodes booleans as "TRUE"/"FALSE"
                            if item.text == "TRUE":
                                entity[const.DELETED] = True
                            else:
                                entity[const.DELETED] = False
                        elif item.tag == "cqz":
                            entity[const.CQZ] = int(item.text)
                        elif item.tag == "cont":
                            entity[const.CONTINENT] = str(item.text)
                        elif item.tag == "long":
                            entity[const.LONGITUDE] = float(item.text)
                        elif item.tag == "lat":
                            entity[const.LATITUDE] = float(item.text)
                        elif item.tag == "start":
                            # timestamps are truncated to seconds and made UTC-aware
                            dt = datetime.strptime(item.text[:19], '%Y-%m-%dT%H:%M:%S')
                            entity[const.START] = dt.replace(tzinfo=timezone.utc)
                        elif item.tag == "end":
                            dt = datetime.strptime(item.text[:19], '%Y-%m-%dT%H:%M:%S')
                            entity[const.END] = dt.replace(tzinfo=timezone.utc)
                        elif item.tag == "whitelist":
                            if item.text == "TRUE":
                                entity[const.WHITELIST] = True
                            else:
                                entity[const.WHITELIST] = False
                        elif item.tag == "whitelist_start":
                            dt = datetime.strptime(item.text[:19], '%Y-%m-%dT%H:%M:%S')
                            entity[const.WHITELIST_START] = dt.replace(tzinfo=timezone.utc)
                        elif item.tag == "whitelist_end":
                            dt = datetime.strptime(item.text[:19], '%Y-%m-%dT%H:%M:%S')
                            entity[const.WHITELIST_END] = dt.replace(tzinfo=timezone.utc)
                except AttributeError:
                    # NOTE(review): a partially filled entity is still stored
                    # below, and the log line does not say which record failed
                    self._logger.error("Error while processing: ")
                # the entity's ADIF id is expected in the first child element
                entities[int(cty_entity[0].text)] = entity
            self._logger.debug(str(len(entities))+" Entities added")
        else:
            raise Exception("No Country Entities detected in XML File")


        # retrieve callsign exceptions
        cty_exceptions = cty_tree.find("exceptions")
        if len(cty_exceptions) > 1:
            for cty_exception in cty_exceptions:
                call_exception = {}
                for item in cty_exception:
                    if item.tag == "call":
                        # index this callsign -> record number; the callsign
                        # itself is not stored in the record
                        call = str(item.text)
                        if call in call_exceptions_index.keys():
                            call_exceptions_index[call].append(int(cty_exception.attrib["record"]))
                        else:
                            call_exceptions_index[call] = [int(cty_exception.attrib["record"])]
                    elif item.tag == "entity":
                        call_exception[const.COUNTRY] = str(item.text)
                    elif item.tag == "adif":
                        call_exception[const.ADIF] = int(item.text)
                    elif item.tag == "cqz":
                        call_exception[const.CQZ] = int(item.text)
                    elif item.tag == "cont":
                        call_exception[const.CONTINENT] = str(item.text)
                    elif item.tag == "long":
                        call_exception[const.LONGITUDE] = float(item.text)
                    elif item.tag == "lat":
                        call_exception[const.LATITUDE] = float(item.text)
                    elif item.tag == "start":
                        dt = datetime.strptime(item.text[:19], '%Y-%m-%dT%H:%M:%S')
                        call_exception[const.START] = dt.replace(tzinfo=timezone.utc)
                    elif item.tag == "end":
                        dt = datetime.strptime(item.text[:19], '%Y-%m-%dT%H:%M:%S')
                        call_exception[const.END] = dt.replace(tzinfo=timezone.utc)
                call_exceptions[int(cty_exception.attrib["record"])] = call_exception

            self._logger.debug(str(len(call_exceptions))+" Exceptions added")
            self._logger.debug(str(len(call_exceptions_index))+" unique Calls in Index ")

        else:
            raise Exception("No Exceptions detected in XML File")


        # retrieve prefixes
        cty_prefixes = cty_tree.find("prefixes")
        if len(cty_prefixes) > 1:
            for cty_prefix in cty_prefixes:
                prefix = {}
                for item in cty_prefix:
                    # NOTE(review): `pref` is assigned but never used
                    pref = None
                    if item.tag == "call":

                        #create index for this prefix
                        call = str(item.text)
                        if call in prefixes_index.keys():
                            prefixes_index[call].append(int(cty_prefix.attrib["record"]))
                        else:
                            prefixes_index[call] = [int(cty_prefix.attrib["record"])]
                    # NOTE(review): plain `if` instead of `elif` here —
                    # harmless because the tags are mutually exclusive
                    if item.tag == "entity":
                        prefix[const.COUNTRY] = str(item.text)
                    elif item.tag == "adif":
                        prefix[const.ADIF] = int(item.text)
                    elif item.tag == "cqz":
                        prefix[const.CQZ] = int(item.text)
                    elif item.tag == "cont":
                        prefix[const.CONTINENT] = str(item.text)
                    elif item.tag == "long":
                        prefix[const.LONGITUDE] = float(item.text)
                    elif item.tag == "lat":
                        prefix[const.LATITUDE] = float(item.text)
                    elif item.tag == "start":
                        dt = datetime.strptime(item.text[:19], '%Y-%m-%dT%H:%M:%S')
                        prefix[const.START] = dt.replace(tzinfo=timezone.utc)
                    elif item.tag == "end":
                        dt = datetime.strptime(item.text[:19], '%Y-%m-%dT%H:%M:%S')
                        prefix[const.END] = dt.replace(tzinfo=timezone.utc)
                prefixes[int(cty_prefix.attrib["record"])] = prefix

            self._logger.debug(str(len(prefixes))+" Prefixes added")
            self._logger.debug(str(len(prefixes_index))+" unique Prefixes in Index")
        else:
            raise Exception("No Prefixes detected in XML File")

        # retrieve invalid operations
        cty_inv_operations = cty_tree.find("invalid_operations")
        if len(cty_inv_operations) > 1:
            for cty_inv_operation in cty_inv_operations:
                invalid_operation = {}
                for item in cty_inv_operation:
                    call = None
                    if item.tag == "call":
                        # index this callsign -> record number
                        call = str(item.text)
                        if call in invalid_operations_index.keys():
                            invalid_operations_index[call].append(int(cty_inv_operation.attrib["record"]))
                        else:
                            invalid_operations_index[call] = [int(cty_inv_operation.attrib["record"])]

                    elif item.tag == "start":
                        dt = datetime.strptime(item.text[:19], '%Y-%m-%dT%H:%M:%S')
                        invalid_operation[const.START] = dt.replace(tzinfo=timezone.utc)
                    elif item.tag == "end":
                        dt = datetime.strptime(item.text[:19], '%Y-%m-%dT%H:%M:%S')
                        invalid_operation[const.END] = dt.replace(tzinfo=timezone.utc)
                invalid_operations[int(cty_inv_operation.attrib["record"])] = invalid_operation

            self._logger.debug(str(len(invalid_operations))+" Invalid Operations added")
            self._logger.debug(str(len(invalid_operations_index))+" unique Calls in Index")
        else:
            raise Exception("No records for invalid operations detected in XML File")


        # retrieve zone exceptions (callsigns with a CQ zone differing from
        # their entity's default)
        cty_zone_exceptions = cty_tree.find("zone_exceptions")
        if len(cty_zone_exceptions) > 1:
            for cty_zone_exception in cty_zone_exceptions:
                zoneException = {}
                for item in cty_zone_exception:
                    call = None
                    if item.tag == "call":
                        # index this callsign -> record number
                        call = str(item.text)
                        if call in zone_exceptions_index.keys():
                            zone_exceptions_index[call].append(int(cty_zone_exception.attrib["record"]))
                        else:
                            zone_exceptions_index[call] = [int(cty_zone_exception.attrib["record"])]

                    elif item.tag == "zone":
                        zoneException[const.CQZ] = int(item.text)
                    elif item.tag == "start":
                        dt = datetime.strptime(item.text[:19], '%Y-%m-%dT%H:%M:%S')
                        zoneException[const.START] = dt.replace(tzinfo=timezone.utc)
                    elif item.tag == "end":
                        dt = datetime.strptime(item.text[:19], '%Y-%m-%dT%H:%M:%S')
                        zoneException[const.END] = dt.replace(tzinfo=timezone.utc)
                zone_exceptions[int(cty_zone_exception.attrib["record"])] = zoneException

            self._logger.debug(str(len(zone_exceptions))+" Zone Exceptions added")
            self._logger.debug(str(len(zone_exceptions_index))+" unique Calls in Index")
        else:
            raise Exception("No records for zone exceptions detected in XML File")

        result = {
            "entities" : entities,
            "call_exceptions" : call_exceptions,
            "prefixes" : prefixes,
            "invalid_operations" : invalid_operations,
            "zone_exceptions" : zone_exceptions,
            "prefixes_index" : prefixes_index,
            "call_exceptions_index" : call_exceptions_index,
            "invalid_operations_index" : invalid_operations_index,
            "zone_exceptions_index" : zone_exceptions_index,
        }
        return result
|
|
|
|
def _parse_country_file(self, cty_file, country_mapping_filename=None):
|
|
"""
|
|
Parse the content of a PLIST file from country-files.com return the
|
|
parsed values in dictionaries.
|
|
Country-files.com provides Prefixes and Exceptions
|
|
|
|
"""
|
|
|
|
import plistlib
|
|
|
|
cty_list = None
|
|
entities = {}
|
|
exceptions = {}
|
|
prefixes = {}
|
|
|
|
exceptions_index = {}
|
|
prefixes_index = {}
|
|
|
|
exceptions_counter = 0
|
|
prefixes_counter = 0
|
|
|
|
mapping = None
|
|
|
|
with open(country_mapping_filename, "r") as f:
|
|
mapping = json.loads(f.read())
|
|
|
|
with open(cty_file, 'rb') as f:
|
|
try:
|
|
cty_list = plistlib.load(f) #New API (Python >=3.4)
|
|
except AttributeError:
|
|
cty_list = plistlib.readPlist(cty_file) #Old API (Python >=2.7 && <=3.8)
|
|
|
|
for item in cty_list:
|
|
entry = {}
|
|
call = str(item)
|
|
entry[const.COUNTRY] = str(cty_list[item]["Country"])
|
|
if mapping:
|
|
entry[const.ADIF] = int(mapping[cty_list[item]["Country"]])
|
|
entry[const.CQZ] = int(cty_list[item]["CQZone"])
|
|
entry[const.ITUZ] = int(cty_list[item]["ITUZone"])
|
|
entry[const.CONTINENT] = str(cty_list[item]["Continent"])
|
|
entry[const.LATITUDE] = float(cty_list[item]["Latitude"])
|
|
entry[const.LONGITUDE] = float(cty_list[item]["Longitude"])*(-1)
|
|
|
|
if cty_list[item]["ExactCallsign"]:
|
|
if call in exceptions_index.keys():
|
|
exceptions_index[call].append(exceptions_counter)
|
|
else:
|
|
exceptions_index[call] = [exceptions_counter]
|
|
exceptions[exceptions_counter] = entry
|
|
exceptions_counter += 1
|
|
else:
|
|
if call in prefixes_index.keys():
|
|
prefixes_index[call].append(prefixes_counter)
|
|
else:
|
|
prefixes_index[call] = [prefixes_counter]
|
|
prefixes[prefixes_counter] = entry
|
|
prefixes_counter += 1
|
|
|
|
self._logger.debug(str(len(prefixes))+" Prefixes added")
|
|
self._logger.debug(str(len(prefixes_index))+" Prefixes in Index")
|
|
self._logger.debug(str(len(exceptions))+" Exceptions added")
|
|
self._logger.debug(str(len(exceptions_index))+" Exceptions in Index")
|
|
|
|
result = {
|
|
"prefixes" : prefixes,
|
|
"exceptions" : exceptions,
|
|
"prefixes_index" : prefixes_index,
|
|
"exceptions_index" : exceptions_index,
|
|
}
|
|
|
|
return result
|
|
|
|
def _generate_random_word(self, length):
|
|
"""
|
|
Generates a random word
|
|
"""
|
|
return ''.join(random.choice(string.ascii_lowercase) for _ in range(length))
|
|
|
|
def _check_html_response(self, response):
|
|
"""
|
|
Checks if the API Key is valid and if the request returned a 200 status (ok)
|
|
"""
|
|
|
|
error1 = "Access to this form requires a valid API key. For more info see: http://www.clublog.org/need_api.php"
|
|
error2 = "Invalid or missing API Key"
|
|
|
|
if response.status_code == requests.codes.ok:
|
|
return True
|
|
else:
|
|
err_str = "HTTP Status Code: " + str(response.status_code) + " HTTP Response: " + str(response.text)
|
|
self._logger.error(err_str)
|
|
if response.status_code == 403:
|
|
raise APIKeyMissingError
|
|
else:
|
|
raise LookupError(err_str)
|
|
|
|
|
|
def _serialize_data(self, my_dict):
|
|
"""
|
|
Serialize a Dictionary into JSON
|
|
"""
|
|
new_dict = {}
|
|
for item in my_dict:
|
|
if isinstance(my_dict[item], datetime):
|
|
new_dict[item] = my_dict[item].strftime('%Y-%m-%d%H:%M:%S')
|
|
else:
|
|
new_dict[item] = str(my_dict[item])
|
|
|
|
return json.dumps(new_dict)
|
|
|
|
def _deserialize_data(self, json_data):
|
|
"""
|
|
Deserialize a JSON into a dictionary
|
|
"""
|
|
|
|
my_dict = json.loads(json_data.decode('utf8'))
|
|
|
|
for item in my_dict:
|
|
if item == const.ADIF:
|
|
my_dict[item] = int(my_dict[item])
|
|
elif item == const.DELETED:
|
|
my_dict[item] = self._str_to_bool(my_dict[item])
|
|
elif item == const.CQZ:
|
|
my_dict[item] = int(my_dict[item])
|
|
elif item == const.ITUZ:
|
|
my_dict[item] = int(my_dict[item])
|
|
elif item == const.LATITUDE:
|
|
my_dict[item] = float(my_dict[item])
|
|
elif item == const.LONGITUDE:
|
|
my_dict[item] = float(my_dict[item])
|
|
elif item == const.START:
|
|
my_dict[item] = datetime.strptime(my_dict[item], '%Y-%m-%d%H:%M:%S').replace(tzinfo=timezone.utc)
|
|
elif item == const.END:
|
|
my_dict[item] = datetime.strptime(my_dict[item], '%Y-%m-%d%H:%M:%S').replace(tzinfo=timezone.utc)
|
|
elif item == const.WHITELIST_START:
|
|
my_dict[item] = datetime.strptime(my_dict[item], '%Y-%m-%d%H:%M:%S').replace(tzinfo=timezone.utc)
|
|
elif item == const.WHITELIST_END:
|
|
my_dict[item] = datetime.strptime(my_dict[item], '%Y-%m-%d%H:%M:%S').replace(tzinfo=timezone.utc)
|
|
elif item == const.WHITELIST:
|
|
my_dict[item] = self._str_to_bool(my_dict[item])
|
|
else:
|
|
my_dict[item] = str(my_dict[item])
|
|
|
|
return my_dict
|
|
|
|
def _str_to_bool(self, input):
|
|
if input.lower() == "true":
|
|
return True
|
|
elif input.lower() == "false":
|
|
return False
|
|
else:
|
|
raise KeyError
|