add alarmMonitor for Rasbperry with RPiDisplay

This is an alarmMonitor for receive alarm-messages from BOSWatch and show them on a touchscreen
The jsonSocketServer controlls an Watterott RPi-Display in case of received POCSAG-RIC

Implemented functions:
- asynchronous threads for display control
- show ric-description and alarm-message on display
- different colours for no alarm, test alarm and alarm
- auto-turn-off display
- show POCSAG is alive status (coloured clock)
This commit is contained in:
JHCD 2015-07-15 00:59:54 +02:00
parent ce97760b00
commit 4e013ee55d
5 changed files with 580 additions and 0 deletions

View file

@ -0,0 +1,180 @@
#!/usr/bin/python
# -*- coding: UTF-8 -*-
#
"""
alarmMonitor
This is an alarmMonitor for receive alarm-messages from BOSWatch and show them on a touchscreen
The jsonSocketServer controlls an Watterott RPi-Display in case of received POCSAG-RIC
Implemented functions:
- asynchronous threads for display control
- show ric-description and alarm-message on display
- different colours for no alarm, test alarm and alarm
- auto-turn-off display
- show POCSAG is alive status (coloured clock)
@author: Jens Herrmann
BOSWatch: https://github.com/Schrolli91/BOSWatch
RPi-Display: https://github.com/watterott/RPi-Display
"""
import logging
import logging.handlers
import ConfigParser
import os
import time
import socket # for socket
import json # for data
from threading import Thread
import pygame # for building colour-tuple
import globals
try:
#
# Logging
#
myLogger = logging.getLogger()
myLogger.setLevel(logging.DEBUG)
# set log string format
formatter = logging.Formatter('%(asctime)s - %(module)-24s [%(levelname)-8s] %(message)s', '%d.%m.%Y %H:%M:%S')
# create a display logger
ch = logging.StreamHandler()
# log level for display >= info
ch.setLevel(logging.INFO)
#ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
myLogger.addHandler(ch)
#
# Read config.ini
#
try:
logging.debug("reading config file")
globals.config = ConfigParser.SafeConfigParser()
globals.config.read("config.ini")
# if given loglevel is debug:
for key,val in globals.config.items("AlarmMonitor"):
logging.debug(" -- %s = %s", key, val)
except:
# we couldn't work without config -> exit
logging.critical("cannot read config file")
logging.debug("cannot read config file", exc_info=True)
exit(1)
#
# set environment for display and touchscreen
#
os.environ["SDL_FBDEV"] = "/dev/fb1"
os.environ["SDL_MOUSEDEV"] = "/dev/input/touchscreen"
os.environ["SDL_MOUSEDRV"] = "TSLIB"
#
# start threads
#
try:
from displayService import displayPainter, autoTurnOffDisplay, eventHandler
globals.screenBackground = pygame.Color(globals.config.get("AlarmMonitor","colourGreen"))
logging.debug("Start displayPainter-thread")
Thread(target=displayPainter).start()
logging.debug("Start autoTurnOffDisplay-thread")
Thread(target=autoTurnOffDisplay).start()
logging.debug("Start eventHandler-thread")
Thread(target=eventHandler).start()
except:
# we couldn't work without config -> exit
logging.critical("cannot start displayService-Threads")
logging.debug("cannot start displayService-Threads", exc_info=True)
exit(1)
#
# start socket
#
logging.debug("Start socketServer")
sock = socket.socket () # TCP
sock.bind(("",globals.config.getint("AlarmMonitor","socketPort")))
sock.listen(5)
logging.info("socketServer runs")
#
# Build Lists out of config-entries
#
keepAliveRICs = globals.config.get("AlarmMonitor","keepAliveRICs").split()
alarmRICs = globals.config.get("AlarmMonitor","alarmRICs").split()
functionCharTestAlarm = globals.config.get("AlarmMonitor","functionCharTestAlarm").split()
functionCharAlarm = globals.config.get("AlarmMonitor","functionCharAlarm").split()
#
# Main Program
# (Threads will set abort to True if an error occurs)
#
while globals.abort == False:
# accept connections from outside
(clientsocket, address) = sock.accept()
logging.debug("connected client: %s", address)
# recv message as json_string
json_string = clientsocket.recv( 4096 ) # buffer size is 1024 bytes
try:
# parsing jason
parsed_json = json.loads(json_string)
logging.debug("parsed message: %s", parsed_json)
except ValueError:
# we will ignore waste in json_string
logging.warning("No JSON object could be decoded: %s", json_string)
pass
else:
try:
# keep alive calculation with additional RICs
if parsed_json['ric'] in keepAliveRICs:
logging.info("POCSAG is alive")
globals.lastAlarm = int(time.time())
# (test) alarm processing
elif parsed_json['ric'] in alarmRICs:
logging.debug("We have do to something")
if parsed_json['functionChar'] in functionCharTestAlarm:
logging.info("-> Probealarm: %s", parsed_json['ric'])
globals.screenBackground = pygame.Color(globals.config.get("AlarmMonitor","colourYellow"))
elif parsed_json['functionChar'] in functionCharAlarm:
logging.info("-> Alarm: %s", parsed_json['ric'])
globals.screenBackground = pygame.Color(globals.config.get("AlarmMonitor","colourRed"))
# forward data to alarmMonitor
globals.data = parsed_json
# update lastAlarm for keep alive calculation
globals.lastAlarm = int(time.time())
# enable display for n seconds:
globals.enableDisplayUntil = int(time.time()) + globals.config.getint("AlarmMonitor","showAlarmTime")
# tell alarm-thread to turn on the display
globals.showDisplay = True;
except KeyError:
# we will ignore waste in json_string
logging.warning("No RIC found: %s", json_string)
pass
except KeyboardInterrupt:
logging.warning("Keyboard Interrupt")
except:
logging.exception("unknown error")
finally:
try:
logging.info("socketServer shuting down")
globals.running = False
sock.close()
time.sleep(0.5)
logging.debug("socket closed")
logging.debug("exiting socketServer")
except:
logging.warning("failed in clean-up routine")
finally:
# Close Logging
logging.debug("close Logging")
logging.info("socketServer exit()")
logging.shutdown()
ch.close()

View file

@ -0,0 +1,48 @@
############################
# AlarmMonitor Config File #
############################
[AlarmMonitor]
# listen port for socket server
socketPort = 8112
# process alarms for the following RICs
alarmRICs = 1234567, 12345678
# use the following RICs for keep alive calculation (additonal)
keepAliveRICs = 1000000
# use the following functionChar for test alarms (if empty no test alarms will shown)
functionCharTestAlarm = a
# use the following functionChar for alarms (if empty no alarms will shown)
# if functionChar is used in functionCharTestAlarm too, it will bee ignored for alarms
functionCharAlarm = b, c, d
# Show alarm massage for n seconds
showAlarmTime = 180
# Show display for n seconds by touching
showDisplayTime = 30
# colouring status of RIC-decoding (n seconds after last alarm)
delayForYellow = 240
delayForRed = 360
# colours for alarmMonitor
colourBlack = #000000
colourRed = #B22222
colourGreen = #008B00
colourBlue = #00008B
colourYellow = #8B8B00
colourGrey = #BEBEBE
colourDimGrey = #696969
colourWhite = #FFFFFF
[Display]
# Pin of LCD backlight (script will use only on/off)
GPIOPinForBacklight = 18
# display size
displayWidth = 320
displayHeight = 240

View file

@ -0,0 +1,279 @@
#!/usr/bin/python
# -*- coding: UTF-8 -*-
#
"""
alarmMonitor - displayServices
@author: Jens Herrmann
"""
#
# Only works as an asynchronous thread
# will call "exit(0)" when function is finished
#
def autoTurnOffDisplay():
"""
Asynchronous function to turn of the display backlight
@requires: globals.showDisplay - status of backlight
@requires: globals.enableDisplayUntil - given timestamp to turn off backlight
@requires: globals.running - service runs as long as this is True
In case of an exception the function set globals.abort to True.
This will terminate the main program.
@return: nothing
@exception: SystemExit exception in case of an error
"""
import sys
import time
import logging
import ConfigParser
import globals
logging.debug("autoTurnOffDisplay-thread started")
try:
# Running will be set to False if main program is shutting down
while globals.running == True:
# check if timestamp is in the past
if (globals.showDisplay == True) and (globals.enableDisplayUntil < int(time.time())):
globals.showDisplay = False
logging.info("display turned off")
# we will do this only one time per second
time.sleep(1)
except:
logging.error("unknown error in autoTurnOffDisplay-thread")
logging.debug("unknown error in autoTurnOffDisplay-thread", exc_info=True)
# abort main program
globals.abort = True
sys.exit(1)
finally:
logging.debug("exit autoTurnOffDisplay-thread")
exit(0)
#
# Only works as an asynchronous thread
# will call "exit(0)" when function is finished
#
def eventHandler():
"""
Asynchronous function to handle pygames events
in particular the touchscreen events
@requires: globals.showDisplay - status of backlight
@requires: globals.enableDisplayUntil - timestamp to turn off backlight
@requires: globals.running - service runs as long as this is True
@requires: configuration has to be set in the config.ini
In case of an exception the function set globals.abort to True.
This will terminate the main program.
@return: nothing
@exception: SystemExit exception in case of an error
"""
import sys
import time
import logging
import ConfigParser
import pygame
import globals
logging.debug("eventHandler-thread called")
try:
clock = pygame.time.Clock()
# Running will be set to False if main program is shutting down
while globals.running == True:
# This limits the while loop to a max of 2 times per second.
# Leave this out and we will use all CPU we can.
clock.tick(2)
# current time for this loop:
curtime = int(time.time())
for event in pygame.event.get():
# event-handler for QUIT
if event.type == pygame.QUIT:
globals.running = False
# if touchscreen pressed
if event.type == pygame.MOUSEBUTTONDOWN:
if globals.showDisplay:
logging.info("turn OFF display")
globals.showDisplay = False
else:
logging.info("turn ON display")
globals.enableDisplayUntil = curtime + globals.config.getint("AlarmMonitor","showDisplayTime")
globals.showDisplay = True
except:
logging.error("unknown error in eventHandler-thread")
logging.debug("unknown error in eventHandler-thread", exc_info=True)
# abort main program
globals.abort = True
sys.exit(1)
finally:
logging.debug("exit eventHandler-thread")
exit(0)
#
# Only works as an asynchronous thread
# will call "exit(0)" when function is finished
#
def displayPainter():
"""
Asynchronous function to build the display content
@requires: globals.showDisplay - status of backlight
@requires: globals.enableDisplayUntil - given timestamp when backlight will turned off
@requires: globals.running - service runs as long as this is True
@requires: globals.data - data of the last alarm
@requires: globals.lastAlarm - timestamp of the last processing (see alarmRICs and keepAliveRICs)
@requires: configuration has to be set in the config.ini
In case of an exception the function set globals.abort to True.
This will terminate the main program.
@return: nothing
@exception: SystemExit exception in case of an error
"""
import sys
import time
import logging
import ConfigParser
import RPi.GPIO as GPIO
import pygame
from wrapline import wrapline
import globals
logging.debug("displayPainter-thread called")
try:
# use GPIO pin numbering convention
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
# set up GPIO pin for output
GPIO.setup(globals.config.getint("Display","GPIOPinForBacklight"), GPIO.OUT)
pygame.init()
#screen size
size = (globals.config.getint("Display","displayWidth"), globals.config.getint("Display","displayHeight"))
screen = pygame.display.set_mode(size)
# disable mouse cursor
pygame.mouse.set_visible(False)
#define fonts
fontHeader = pygame.font.Font(None, 30)
fontHeader.set_bold(True)
fontHeader.set_underline(True)
fontRIC = pygame.font.Font(None, 30)
fontRIC.set_bold(True)
fontMsg = pygame.font.Font(None, 20)
fontTime = pygame.font.Font(None, 15)
clock = pygame.time.Clock()
logging.debug("displayPainter-thread started")
# Running will be set to False if main program is shutting down
while globals.running == True:
# This limits the while loop to a max of 2 times per second.
# Leave this out and we will use all CPU we can.
clock.tick(2)
# current time for this loop:
curtime = int(time.time())
if globals.showDisplay:
# Enable LCD display
GPIO.output(globals.config.getint("Display","GPIOPinForBacklight"), GPIO.HIGH)
# Clear the screen and set the screen background
screen.fill(globals.screenBackground)
widthX = globals.config.getint("Display","displayWidth") - 20
widthY = globals.config.getint("Display","displayHeight") - 20
pygame.draw.rect(screen, pygame.Color(globals.config.get("AlarmMonitor","colourBlack")), (10, 10, widthX, widthY))
# header
header = fontHeader.render("Alarm-Monitor", 1, pygame.Color(globals.config.get("AlarmMonitor","colourRed")))
(width, height) = fontHeader.size("Alarm-Monitor")
x = (int(globals.config.getint("Display","displayWidth")) - width)/2
screen.blit(header, (x, 20))
# Alarm - RIC:
try:
y = 50
textLines = wrapline(globals.data['description'], fontRIC, (globals.config.getint("Display","displayWidth") - 40))
for index, item in enumerate(textLines):
textZeile = fontRIC.render(item, 1, pygame.Color(globals.config.get("AlarmMonitor","colourWhite")))
screen.blit(textZeile, (20, y))
y += 25
except KeyError:
pass
# Alarm - Text
try:
y += 10
textLines = wrapline(globals.data['msg'].replace("*", " * "), fontMsg, (globals.config.getint("Display","displayWidth") - 40))
for index, item in enumerate(textLines):
textZeile = fontMsg.render(item, 1, pygame.Color(globals.config.get("AlarmMonitor","colourGrey")))
screen.blit(textZeile, (20, y))
y += 20
except KeyError:
pass
# show time of last alarm
if globals.lastAlarm > 0:
try:
# format last alarm
lastAlarmString = time.strftime("%H:%M:%S", time.localtime(globals.lastAlarm))
# Color time:
# red: lastAlarm more than n (delayForRed) seconds past
if (int(globals.lastAlarm) + globals.config.getint("AlarmMonitor","delayForRed")) < curtime:
timeColour = pygame.Color(globals.config.get("AlarmMonitor","colourRed"))
# yellow: lastAlarm more than n (delayForYellow) seconds past
elif (int(globals.lastAlarm) + globals.config.getint("AlarmMonitor","delayForYellow")) < curtime:
timeColour = pygame.Color(globals.config.get("AlarmMonitor","colourYellow"))
# dgrey: normal
else:
timeColour = pygame.Color(globals.config.get("AlarmMonitor","colourGreen"))
lastAlarm = fontTime.render(lastAlarmString, 1, timeColour)
(width, height) = fontTime.size(lastAlarmString)
x = globals.config.getint("Display","displayWidth") - 20 - width
screen.blit(lastAlarm, (x, 20))
except:
logging.debug("unknown error in lastAlarm", exc_info=True)
pass
# show remaining time before display will be turned off:
restZeit = globals.enableDisplayUntil - curtime +1
zeit = fontTime.render(str(restZeit), 1, pygame.Color(globals.config.get("AlarmMonitor","colourDimGrey")))
screen.blit(zeit, (20, 20))
else:
GPIO.output(globals.config.getint("Display","GPIOPinForBacklight"), GPIO.LOW)
# Update display...
pygame.display.update()
except:
logging.error("unknown error in displayPainter-thread")
logging.debug("unknown error in displayPainter-thread", exc_info=True)
# abort main program
globals.abort = True
sys.exit(1)
finally:
logging.debug("exit displayPainter-thread")
GPIO.output(globals.config.getint("Display","GPIOPinForBacklight"), GPIO.LOW)
GPIO.cleanup()
pygame.quit()
exit(0)

View file

@ -0,0 +1,23 @@
#!/usr/bin/python
# -*- coding: UTF-8 -*-
#
# control-params (Boolean)
running = True
showDisplay = False
abort = False
# color of display-boarder
screenBackground = ""
# data-structure (Dict)
data = {}
# last alarm shown (Timestamp)
lastAlarm = 0
# enable display until (Timestamp)
enableDisplayUntil = 0
# configparser (Configparser)
config = 0

View file

@ -0,0 +1,50 @@
#!/usr/bin/python
# -*- coding: UTF-8 -*-
#
"""
alarmMonitor - wrapline
This snippet of code will convert a string of text into a list containing the lines it would break down into for a certain font and width
@author: pygame.org
http://www.pygame.org/wiki/TextWrapping
"""
def truncline(text, font, maxwidth):
real=len(text)
stext=text
l=font.size(text)[0]
cut=0
a=0
done=1
old = None
while l > maxwidth:
a=a+1
n=text.rsplit(None, a)[0]
if stext == n:
cut += 1
stext= n[:-cut]
else:
stext = n
l=font.size(stext)[0]
real=len(stext)
done=0
return real, done, stext
def wrapline(text, font, maxwidth):
done=0
wrapped=[]
while not done:
nl, done, stext=truncline(text, font, maxwidth)
wrapped.append(stext.strip())
text=text[nl:]
return wrapped
def wrap_multi_line(text, font, maxwidth):
""" returns text taking new lines into account.
"""
lines = chain(*(wrapline(line, font, maxwidth) for line in text.splitlines()))
return list(lines)