Add realtime status line

This commit is contained in:
Nonoo 2020-10-28 10:15:13 +01:00
parent 826576ba75
commit d9d1c9522b
10 changed files with 170 additions and 27 deletions

2
.vscode/launch.json vendored
View file

@ -10,7 +10,7 @@
"request": "launch", "request": "launch",
"mode": "auto", "mode": "auto",
"program": "${workspaceFolder}", "program": "${workspaceFolder}",
"args": ["-v"] "args": ["-v", "-i", "1000"]
} }
] ]
} }

View file

@ -3,6 +3,7 @@ package main
import ( import (
"fmt" "fmt"
"os" "os"
"time"
"github.com/pborman/getopt" "github.com/pborman/getopt"
) )
@ -11,6 +12,7 @@ var verboseLog bool
var connectAddress string var connectAddress string
var serialTCPPort uint16 var serialTCPPort uint16
var enableSerialDevice bool var enableSerialDevice bool
var statusLogInterval time.Duration
func parseArgs() { func parseArgs() {
h := getopt.BoolLong("help", 'h', "display help") h := getopt.BoolLong("help", 'h', "display help")
@ -18,6 +20,7 @@ func parseArgs() {
a := getopt.StringLong("address", 'a', "IC-705", "Connect to address") a := getopt.StringLong("address", 'a', "IC-705", "Connect to address")
t := getopt.Uint16Long("serial-tcp-port", 'p', 4533, "Expose radio's serial port on this TCP port") t := getopt.Uint16Long("serial-tcp-port", 'p', 4533, "Expose radio's serial port on this TCP port")
s := getopt.BoolLong("enable-serial-device", 's', "Expose radio's serial port as a virtual serial port") s := getopt.BoolLong("enable-serial-device", 's', "Expose radio's serial port as a virtual serial port")
i := getopt.Uint16Long("log-interval", 'i', 100, "Status log interval in milliseconds")
getopt.Parse() getopt.Parse()
@ -31,4 +34,5 @@ func parseArgs() {
connectAddress = *a connectAddress = *a
serialTCPPort = *t serialTCPPort = *t
enableSerialDevice = *s enableSerialDevice = *s
statusLogInterval = time.Duration(*i) * time.Millisecond
} }

View file

@ -11,8 +11,12 @@ type bandwidthStruct struct {
toRadioPkts int toRadioPkts int
fromRadioBytes int fromRadioBytes int
fromRadioPkts int fromRadioPkts int
lostPkts int
lastGet time.Time lastGet time.Time
lostPkts int
lastLostReset time.Time
retransmits int
lastRetransmitReset time.Time
} }
var bandwidth bandwidthStruct var bandwidth bandwidthStruct
@ -40,29 +44,56 @@ func (b *bandwidthStruct) add(toRadioBytes, fromRadioBytes int) {
} }
} }
// Call this function when a packet is sent or received.
func (b *bandwidthStruct) reportLoss(pkts int) { func (b *bandwidthStruct) reportLoss(pkts int) {
bandwidthMutex.Lock() bandwidthMutex.Lock()
defer bandwidthMutex.Unlock() defer bandwidthMutex.Unlock()
if b.lostPkts == 0 {
b.lastLostReset = time.Now()
}
b.lostPkts += pkts b.lostPkts += pkts
} }
func (b *bandwidthStruct) get() (toRadioBytesPerSec, fromRadioBytesPerSec int, lossPercent float64) { func (b *bandwidthStruct) reportRetransmit(pkts int) {
bandwidthMutex.Lock()
defer bandwidthMutex.Unlock()
if b.retransmits == 0 {
b.lastRetransmitReset = time.Now()
}
b.retransmits += pkts
}
func (b *bandwidthStruct) get() (toRadioBytesPerSec, fromRadioBytesPerSec int, lost int, retransmits int) {
bandwidthMutex.Lock() bandwidthMutex.Lock()
defer bandwidthMutex.Unlock() defer bandwidthMutex.Unlock()
secs := time.Since(b.lastGet).Seconds() secs := time.Since(b.lastGet).Seconds()
toRadioBytesPerSec = int(float64(b.toRadioBytes) / secs) toRadioBytesPerSec = int(float64(b.toRadioBytes) / secs)
fromRadioBytesPerSec = int(float64(b.fromRadioBytes) / secs) fromRadioBytesPerSec = int(float64(b.fromRadioBytes) / secs)
lossPercent = (float64(b.lostPkts) / float64(b.lostPkts+b.fromRadioPkts)) * 100
b.toRadioBytes = 0 b.toRadioBytes = 0
b.toRadioPkts = 0 b.toRadioPkts = 0
b.fromRadioBytes = 0 b.fromRadioBytes = 0
b.fromRadioPkts = 0 b.fromRadioPkts = 0
b.lostPkts = 0
b.lastGet = time.Now() b.lastGet = time.Now()
secs = time.Since(b.lastLostReset).Seconds()
lost = b.lostPkts
// Only resetting error reports in a longer timeframe.
if secs >= 60 {
b.lostPkts = 0
b.lastLostReset = time.Now()
}
secs = time.Since(b.lastRetransmitReset).Seconds()
retransmits = b.retransmits
// Only resetting error reports in a longer timeframe.
if secs >= 60 {
b.retransmits = 0
b.lastRetransmitReset = time.Now()
}
return return
} }

View file

@ -4,7 +4,6 @@ import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt"
"time" "time"
) )
@ -12,8 +11,6 @@ const controlStreamPort = 50001
const serialStreamPort = 50002 const serialStreamPort = 50002
const audioStreamPort = 50003 const audioStreamPort = 50003
const statusLogInterval = 3 * time.Second
type controlStream struct { type controlStream struct {
common streamCommon common streamCommon
serial serialStream serial serialStream
@ -218,13 +215,13 @@ func (s *controlStream) handleRead(r []byte) error {
} }
s.serialAndAudioStreamOpened = true s.serialAndAudioStreamOpened = true
statusLog.startPeriodicPrint()
} }
} }
return nil return nil
} }
func (s *controlStream) loop() { func (s *controlStream) loop() {
startTime := time.Now()
bandwidth.reset() bandwidth.reset()
s.secondAuthTimer = time.NewTimer(200 * time.Millisecond) s.secondAuthTimer = time.NewTimer(200 * time.Millisecond)
@ -232,7 +229,6 @@ func (s *controlStream) loop() {
<-s.reauthTimeoutTimer.C <-s.reauthTimeoutTimer.C
reauthTicker := time.NewTicker(25 * time.Second) reauthTicker := time.NewTicker(25 * time.Second)
statusLogTicker := time.NewTicker(statusLogInterval)
for { for {
select { select {
@ -255,14 +251,6 @@ func (s *controlStream) loop() {
} }
case <-s.reauthTimeoutTimer.C: case <-s.reauthTimeoutTimer.C:
log.Error("auth timeout, audio/serial stream may stop") log.Error("auth timeout, audio/serial stream may stop")
case <-statusLogTicker.C:
if s.serialAndAudioStreamOpened {
up, down, loss := bandwidth.get()
log.Print("running for ", time.Since(startTime).Round(time.Second),
" rtt ", s.common.pkt7.latency.Milliseconds(), "ms up ",
bandwidth.formatByteCount(up), "/s down ",
bandwidth.formatByteCount(down), "/s loss ", fmt.Sprintf("%.2f", loss), "%")
}
case <-s.deinitNeededChan: case <-s.deinitNeededChan:
s.deinitFinishedChan <- true s.deinitFinishedChan <- true
return return
@ -330,6 +318,7 @@ func (s *controlStream) init() error {
func (s *controlStream) deinit() { func (s *controlStream) deinit() {
s.deinitializing = true s.deinitializing = true
s.serialAndAudioStreamOpened = false s.serialAndAudioStreamOpened = false
statusLog.stopPeriodicPrint()
if s.deinitNeededChan != nil { if s.deinitNeededChan != nil {
s.deinitNeededChan <- true s.deinitNeededChan <- true

31
log.go
View file

@ -27,39 +27,70 @@ func (l *logger) GetCallerFileName(withLine bool) string {
} }
} }
func (l *logger) printLineClear() {
fmt.Printf("%c[2K", 27)
}
func (l *logger) Printf(a string, b ...interface{}) { func (l *logger) Printf(a string, b ...interface{}) {
if ctrl != nil {
l.printLineClear()
}
l.logger.Infof(l.GetCallerFileName(false)+": "+a, b...) l.logger.Infof(l.GetCallerFileName(false)+": "+a, b...)
} }
func (l *logger) Print(a ...interface{}) { func (l *logger) Print(a ...interface{}) {
if ctrl != nil {
l.printLineClear()
}
l.logger.Info(append([]interface{}{l.GetCallerFileName(false) + ": "}, a...)...) l.logger.Info(append([]interface{}{l.GetCallerFileName(false) + ": "}, a...)...)
} }
func (l *logger) Debugf(a string, b ...interface{}) { func (l *logger) Debugf(a string, b ...interface{}) {
if ctrl != nil {
l.printLineClear()
}
l.logger.Debugf(l.GetCallerFileName(true)+": "+a, b...) l.logger.Debugf(l.GetCallerFileName(true)+": "+a, b...)
} }
func (l *logger) Debug(a ...interface{}) { func (l *logger) Debug(a ...interface{}) {
if ctrl != nil {
l.printLineClear()
}
l.logger.Debug(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...) l.logger.Debug(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...)
} }
func (l *logger) Errorf(a string, b ...interface{}) { func (l *logger) Errorf(a string, b ...interface{}) {
if ctrl != nil {
l.printLineClear()
}
l.logger.Errorf(l.GetCallerFileName(true)+": "+a, b...) l.logger.Errorf(l.GetCallerFileName(true)+": "+a, b...)
} }
func (l *logger) Error(a ...interface{}) { func (l *logger) Error(a ...interface{}) {
if ctrl != nil {
l.printLineClear()
}
l.logger.Error(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...) l.logger.Error(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...)
} }
func (l *logger) ErrorC(a ...interface{}) { func (l *logger) ErrorC(a ...interface{}) {
if ctrl != nil {
l.printLineClear()
}
l.logger.Error(a...) l.logger.Error(a...)
} }
func (l *logger) Fatalf(a string, b ...interface{}) { func (l *logger) Fatalf(a string, b ...interface{}) {
if ctrl != nil {
l.printLineClear()
}
l.logger.Fatalf(l.GetCallerFileName(true)+": "+a, b...) l.logger.Fatalf(l.GetCallerFileName(true)+": "+a, b...)
} }
func (l *logger) Fatal(a ...interface{}) { func (l *logger) Fatal(a ...interface{}) {
if ctrl != nil {
l.printLineClear()
}
l.logger.Fatal(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...) l.logger.Fatal(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...)
} }

15
main.go
View file

@ -10,6 +10,7 @@ import (
) )
var gotErrChan = make(chan bool) var gotErrChan = make(chan bool)
var ctrl *controlStream
func getAboutStr() string { func getAboutStr() string {
var v string var v string
@ -23,6 +24,10 @@ func getAboutStr() string {
} }
func runControlStream(osSignal chan os.Signal) (shouldExit bool, exitCode int) { func runControlStream(osSignal chan os.Signal) (shouldExit bool, exitCode int) {
defer func() {
ctrl = nil
}()
// Depleting gotErrChan. // Depleting gotErrChan.
var finished bool var finished bool
for !finished { for !finished {
@ -33,17 +38,17 @@ func runControlStream(osSignal chan os.Signal) (shouldExit bool, exitCode int) {
} }
} }
c := controlStream{} ctrl = &controlStream{}
if err := c.init(); err != nil { if err := ctrl.init(); err != nil {
log.Error(err) log.Error(err)
c.deinit() ctrl.deinit()
return false, 0 return false, 0
} }
select { select {
case <-gotErrChan: case <-gotErrChan:
c.deinit() ctrl.deinit()
// Need to wait before reinit because the IC-705 will disconnect our audio stream eventually if we relogin // Need to wait before reinit because the IC-705 will disconnect our audio stream eventually if we relogin
// in a too short interval without a deauth... // in a too short interval without a deauth...
@ -59,7 +64,7 @@ func runControlStream(osSignal chan os.Signal) (shouldExit bool, exitCode int) {
return return
case <-osSignal: case <-osSignal:
log.Print("sigterm received") log.Print("sigterm received")
c.deinit() ctrl.deinit()
return true, 0 return true, 0
} }
} }

View file

@ -36,6 +36,7 @@ func (p *pkt0Type) retransmitRange(s *streamCommon, start, end uint16) error {
log.Debug("got retransmit request for #", start, "-", end) log.Debug("got retransmit request for #", start, "-", end)
for { for {
bandwidth.reportRetransmit(1)
d := p.txSeqBuf.get(seqNum(start)) d := p.txSeqBuf.get(seqNum(start))
if d != nil { if d != nil {
log.Debug(s.name+"/retransmitting #", start) log.Debug(s.name+"/retransmitting #", start)
@ -69,6 +70,7 @@ func (p *pkt0Type) handle(s *streamCommon, r []byte) error {
d := p.txSeqBuf.get(seqNum(seq)) d := p.txSeqBuf.get(seqNum(seq))
if d != nil { if d != nil {
log.Debug(s.name+"/retransmitting #", seq) log.Debug(s.name+"/retransmitting #", seq)
bandwidth.reportRetransmit(1)
if err := s.send(d); err != nil { if err := s.send(d); err != nil {
return err return err
} }

View file

@ -46,9 +46,12 @@ func (p *pkt7Type) handle(s *streamCommon, r []byte) error {
p.timeoutTimer.Reset(pkt7TimeoutDuration) p.timeoutTimer.Reset(pkt7TimeoutDuration)
} }
// Only measure latency after the timeout has been initialized, so the auth is already done. if s.name == "control" { // Only measure latency on the control stream.
p.latency += time.Since(p.lastSendAt) // Only measure latency after the timeout has been initialized, so the auth is already done.
p.latency /= 2 p.latency += time.Since(p.lastSendAt)
p.latency /= 2
statusLog.reportRTTLatency(p.latency)
}
} }
// expectedSeq := p.lastConfirmedSeq + 1 // expectedSeq := p.lastConfirmedSeq + 1

76
status.go Normal file
View file

@ -0,0 +1,76 @@
package main
import (
"fmt"
"sync"
"time"
)
type statusLogStruct struct {
ticker *time.Ticker
stopChan chan bool
stopFinishedChan chan bool
mutex sync.Mutex
startTime time.Time
rttLatency time.Duration
}
var statusLog statusLogStruct
func (s *statusLogStruct) reportRTTLatency(l time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.rttLatency = l
}
func (s *statusLogStruct) print() {
s.mutex.Lock()
defer s.mutex.Unlock()
up, down, lost, retransmits := bandwidth.get()
l := fmt.Sprint("up ", time.Since(s.startTime).Round(time.Second),
" rtt ", s.rttLatency.Milliseconds(), "ms up ",
bandwidth.formatByteCount(up), "/s down ",
bandwidth.formatByteCount(down), "/s retx ", retransmits, " /1m lost ", lost, " /1m")
if statusLogInterval < time.Second {
log.printLineClear()
fmt.Print(time.Now().Format("2006-01-02T15:04:05.000Z0700"), " ", l, "\r")
} else {
log.Print(l)
}
}
func (s *statusLogStruct) loop() {
for {
select {
case <-s.ticker.C:
s.print()
case <-s.stopChan:
s.stopFinishedChan <- true
return
}
}
}
func (s *statusLogStruct) startPeriodicPrint() {
s.startTime = time.Now()
s.stopChan = make(chan bool)
s.stopFinishedChan = make(chan bool)
s.ticker = time.NewTicker(statusLogInterval)
go s.loop()
}
func (s *statusLogStruct) stopPeriodicPrint() {
if s.ticker == nil { // Already stopped?
return
}
s.ticker.Stop()
s.ticker = nil
s.stopChan <- true
<-s.stopFinishedChan
}

View file

@ -192,11 +192,13 @@ func (s *streamCommon) requestRetransmitIfNeeded(gotSeq uint16) error {
} }
if missingPkts == 1 { if missingPkts == 1 {
log.Debug(s.name+"/requesting pkt #", sr[1], " retransmit") log.Debug(s.name+"/requesting pkt #", sr[1], " retransmit")
bandwidth.reportRetransmit(missingPkts)
if err := s.sendRetransmitRequest(sr[1]); err != nil { if err := s.sendRetransmitRequest(sr[1]); err != nil {
return err return err
} }
} else if missingPkts < 50 { } else if missingPkts < 50 {
log.Debug(s.name+"/requesting pkt #", sr[0], "-#", sr[1], " retransmit") log.Debug(s.name+"/requesting pkt #", sr[0], "-#", sr[1], " retransmit")
bandwidth.reportRetransmit(missingPkts)
if err := s.sendRetransmitRequestForRanges([]seqNumRange{sr}); err != nil { if err := s.sendRetransmitRequestForRanges([]seqNumRange{sr}); err != nil {
return err return err
} }