diff --git a/.vscode/launch.json b/.vscode/launch.json index ec0a956..74ab9ee 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -10,7 +10,7 @@ "request": "launch", "mode": "auto", "program": "${workspaceFolder}", - "args": ["-v"] + "args": ["-v", "-i", "1000"] } ] } \ No newline at end of file diff --git a/args.go b/args.go index 4535901..cb0fa19 100644 --- a/args.go +++ b/args.go @@ -3,6 +3,7 @@ package main import ( "fmt" "os" + "time" "github.com/pborman/getopt" ) @@ -11,6 +12,7 @@ var verboseLog bool var connectAddress string var serialTCPPort uint16 var enableSerialDevice bool +var statusLogInterval time.Duration func parseArgs() { h := getopt.BoolLong("help", 'h', "display help") @@ -18,6 +20,7 @@ func parseArgs() { a := getopt.StringLong("address", 'a', "IC-705", "Connect to address") t := getopt.Uint16Long("serial-tcp-port", 'p', 4533, "Expose radio's serial port on this TCP port") s := getopt.BoolLong("enable-serial-device", 's', "Expose radio's serial port as a virtual serial port") + i := getopt.Uint16Long("log-interval", 'i', 100, "Status log interval in milliseconds") getopt.Parse() @@ -31,4 +34,5 @@ func parseArgs() { connectAddress = *a serialTCPPort = *t enableSerialDevice = *s + statusLogInterval = time.Duration(*i) * time.Millisecond } diff --git a/bandwidth.go b/bandwidth.go index 5f7efed..2d42f7a 100644 --- a/bandwidth.go +++ b/bandwidth.go @@ -11,8 +11,12 @@ type bandwidthStruct struct { toRadioPkts int fromRadioBytes int fromRadioPkts int - lostPkts int lastGet time.Time + + lostPkts int + lastLostReset time.Time + retransmits int + lastRetransmitReset time.Time } var bandwidth bandwidthStruct @@ -40,29 +44,56 @@ func (b *bandwidthStruct) add(toRadioBytes, fromRadioBytes int) { } } -// Call this function when a packet is sent or received. func (b *bandwidthStruct) reportLoss(pkts int) { bandwidthMutex.Lock() defer bandwidthMutex.Unlock() + if b.lostPkts == 0 { + b.lastLostReset = time.Now() + } b.lostPkts += pkts } -func (b *bandwidthStruct) get() (toRadioBytesPerSec, fromRadioBytesPerSec int, lossPercent float64) { +func (b *bandwidthStruct) reportRetransmit(pkts int) { + bandwidthMutex.Lock() + defer bandwidthMutex.Unlock() + + if b.retransmits == 0 { + b.lastRetransmitReset = time.Now() + } + b.retransmits += pkts +} + +func (b *bandwidthStruct) get() (toRadioBytesPerSec, fromRadioBytesPerSec int, lost int, retransmits int) { bandwidthMutex.Lock() defer bandwidthMutex.Unlock() secs := time.Since(b.lastGet).Seconds() toRadioBytesPerSec = int(float64(b.toRadioBytes) / secs) fromRadioBytesPerSec = int(float64(b.fromRadioBytes) / secs) - lossPercent = (float64(b.lostPkts) / float64(b.lostPkts+b.fromRadioPkts)) * 100 b.toRadioBytes = 0 b.toRadioPkts = 0 b.fromRadioBytes = 0 b.fromRadioPkts = 0 - b.lostPkts = 0 b.lastGet = time.Now() + + secs = time.Since(b.lastLostReset).Seconds() + lost = b.lostPkts + // Only resetting error reports in a longer timeframe. + if secs >= 60 { + b.lostPkts = 0 + b.lastLostReset = time.Now() + } + + secs = time.Since(b.lastRetransmitReset).Seconds() + retransmits = b.retransmits + // Only resetting error reports in a longer timeframe. + if secs >= 60 { + b.retransmits = 0 + b.lastRetransmitReset = time.Now() + } + return } diff --git a/controlstream.go b/controlstream.go index 0028354..c147649 100644 --- a/controlstream.go +++ b/controlstream.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/binary" "errors" - "fmt" "time" ) @@ -12,8 +11,6 @@ const controlStreamPort = 50001 const serialStreamPort = 50002 const audioStreamPort = 50003 -const statusLogInterval = 3 * time.Second - type controlStream struct { common streamCommon serial serialStream @@ -218,13 +215,13 @@ func (s *controlStream) handleRead(r []byte) error { } s.serialAndAudioStreamOpened = true + statusLog.startPeriodicPrint() } } return nil } func (s *controlStream) loop() { - startTime := time.Now() bandwidth.reset() s.secondAuthTimer = time.NewTimer(200 * time.Millisecond) @@ -232,7 +229,6 @@ func (s *controlStream) loop() { <-s.reauthTimeoutTimer.C reauthTicker := time.NewTicker(25 * time.Second) - statusLogTicker := time.NewTicker(statusLogInterval) for { select { @@ -255,14 +251,6 @@ func (s *controlStream) loop() { } case <-s.reauthTimeoutTimer.C: log.Error("auth timeout, audio/serial stream may stop") - case <-statusLogTicker.C: - if s.serialAndAudioStreamOpened { - up, down, loss := bandwidth.get() - log.Print("running for ", time.Since(startTime).Round(time.Second), - " rtt ", s.common.pkt7.latency.Milliseconds(), "ms up ", - bandwidth.formatByteCount(up), "/s down ", - bandwidth.formatByteCount(down), "/s loss ", fmt.Sprintf("%.2f", loss), "%") - } case <-s.deinitNeededChan: s.deinitFinishedChan <- true return @@ -330,6 +318,7 @@ func (s *controlStream) init() error { func (s *controlStream) deinit() { s.deinitializing = true s.serialAndAudioStreamOpened = false + statusLog.stopPeriodicPrint() if s.deinitNeededChan != nil { s.deinitNeededChan <- true diff --git a/log.go b/log.go index 8dd19b9..26f9072 100644 --- a/log.go +++ b/log.go @@ -27,39 +27,70 @@ func (l *logger) GetCallerFileName(withLine bool) string { } } +func (l *logger) printLineClear() { + fmt.Printf("%c[2K", 27) +} + func (l *logger) Printf(a string, b ...interface{}) { + if ctrl != nil { + l.printLineClear() + } l.logger.Infof(l.GetCallerFileName(false)+": "+a, b...) } func (l *logger) Print(a ...interface{}) { + if ctrl != nil { + l.printLineClear() + } l.logger.Info(append([]interface{}{l.GetCallerFileName(false) + ": "}, a...)...) } func (l *logger) Debugf(a string, b ...interface{}) { + if ctrl != nil { + l.printLineClear() + } l.logger.Debugf(l.GetCallerFileName(true)+": "+a, b...) } func (l *logger) Debug(a ...interface{}) { + if ctrl != nil { + l.printLineClear() + } l.logger.Debug(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...) } func (l *logger) Errorf(a string, b ...interface{}) { + if ctrl != nil { + l.printLineClear() + } l.logger.Errorf(l.GetCallerFileName(true)+": "+a, b...) } func (l *logger) Error(a ...interface{}) { + if ctrl != nil { + l.printLineClear() + } l.logger.Error(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...) } func (l *logger) ErrorC(a ...interface{}) { + if ctrl != nil { + l.printLineClear() + } l.logger.Error(a...) } func (l *logger) Fatalf(a string, b ...interface{}) { + if ctrl != nil { + l.printLineClear() + } l.logger.Fatalf(l.GetCallerFileName(true)+": "+a, b...) } func (l *logger) Fatal(a ...interface{}) { + if ctrl != nil { + l.printLineClear() + } l.logger.Fatal(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...) } diff --git a/main.go b/main.go index a402d2d..0ab7b78 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,7 @@ import ( ) var gotErrChan = make(chan bool) +var ctrl *controlStream func getAboutStr() string { var v string @@ -23,6 +24,10 @@ func getAboutStr() string { } func runControlStream(osSignal chan os.Signal) (shouldExit bool, exitCode int) { + defer func() { + ctrl = nil + }() + // Depleting gotErrChan. var finished bool for !finished { @@ -33,17 +38,17 @@ func runControlStream(osSignal chan os.Signal) (shouldExit bool, exitCode int) { } } - c := controlStream{} + ctrl = &controlStream{} - if err := c.init(); err != nil { + if err := ctrl.init(); err != nil { log.Error(err) - c.deinit() + ctrl.deinit() return false, 0 } select { case <-gotErrChan: - c.deinit() + ctrl.deinit() // Need to wait before reinit because the IC-705 will disconnect our audio stream eventually if we relogin // in a too short interval without a deauth... @@ -59,7 +64,7 @@ func runControlStream(osSignal chan os.Signal) (shouldExit bool, exitCode int) { return case <-osSignal: log.Print("sigterm received") - c.deinit() + ctrl.deinit() return true, 0 } } diff --git a/pkt0.go b/pkt0.go index f1a163f..4f7321c 100644 --- a/pkt0.go +++ b/pkt0.go @@ -36,6 +36,7 @@ func (p *pkt0Type) retransmitRange(s *streamCommon, start, end uint16) error { log.Debug("got retransmit request for #", start, "-", end) for { + bandwidth.reportRetransmit(1) d := p.txSeqBuf.get(seqNum(start)) if d != nil { log.Debug(s.name+"/retransmitting #", start) @@ -69,6 +70,7 @@ func (p *pkt0Type) handle(s *streamCommon, r []byte) error { d := p.txSeqBuf.get(seqNum(seq)) if d != nil { log.Debug(s.name+"/retransmitting #", seq) + bandwidth.reportRetransmit(1) if err := s.send(d); err != nil { return err } diff --git a/pkt7.go b/pkt7.go index 11ccfae..7699b8a 100644 --- a/pkt7.go +++ b/pkt7.go @@ -46,9 +46,12 @@ func (p *pkt7Type) handle(s *streamCommon, r []byte) error { p.timeoutTimer.Reset(pkt7TimeoutDuration) } - // Only measure latency after the timeout has been initialized, so the auth is already done. - p.latency += time.Since(p.lastSendAt) - p.latency /= 2 + if s.name == "control" { // Only measure latency on the control stream. + // Only measure latency after the timeout has been initialized, so the auth is already done. + p.latency += time.Since(p.lastSendAt) + p.latency /= 2 + statusLog.reportRTTLatency(p.latency) + } } // expectedSeq := p.lastConfirmedSeq + 1 diff --git a/status.go b/status.go new file mode 100644 index 0000000..36209a2 --- /dev/null +++ b/status.go @@ -0,0 +1,76 @@ +package main + +import ( + "fmt" + "sync" + "time" +) + +type statusLogStruct struct { + ticker *time.Ticker + stopChan chan bool + stopFinishedChan chan bool + mutex sync.Mutex + + startTime time.Time + rttLatency time.Duration +} + +var statusLog statusLogStruct + +func (s *statusLogStruct) reportRTTLatency(l time.Duration) { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.rttLatency = l +} + +func (s *statusLogStruct) print() { + s.mutex.Lock() + defer s.mutex.Unlock() + + up, down, lost, retransmits := bandwidth.get() + + l := fmt.Sprint("up ", time.Since(s.startTime).Round(time.Second), + " rtt ", s.rttLatency.Milliseconds(), "ms up ", + bandwidth.formatByteCount(up), "/s down ", + bandwidth.formatByteCount(down), "/s retx ", retransmits, " /1m lost ", lost, " /1m") + + if statusLogInterval < time.Second { + log.printLineClear() + fmt.Print(time.Now().Format("2006-01-02T15:04:05.000Z0700"), " ", l, "\r") + } else { + log.Print(l) + } +} + +func (s *statusLogStruct) loop() { + for { + select { + case <-s.ticker.C: + s.print() + case <-s.stopChan: + s.stopFinishedChan <- true + return + } + } +} + +func (s *statusLogStruct) startPeriodicPrint() { + s.startTime = time.Now() + s.stopChan = make(chan bool) + s.stopFinishedChan = make(chan bool) + s.ticker = time.NewTicker(statusLogInterval) + go s.loop() +} + +func (s *statusLogStruct) stopPeriodicPrint() { + if s.ticker == nil { // Already stopped? + return + } + s.ticker.Stop() + s.ticker = nil + + s.stopChan <- true + <-s.stopFinishedChan +} diff --git a/streamcommon.go b/streamcommon.go index 0c2cc21..7eed9c9 100644 --- a/streamcommon.go +++ b/streamcommon.go @@ -192,11 +192,13 @@ func (s *streamCommon) requestRetransmitIfNeeded(gotSeq uint16) error { } if missingPkts == 1 { log.Debug(s.name+"/requesting pkt #", sr[1], " retransmit") + bandwidth.reportRetransmit(missingPkts) if err := s.sendRetransmitRequest(sr[1]); err != nil { return err } } else if missingPkts < 50 { log.Debug(s.name+"/requesting pkt #", sr[0], "-#", sr[1], " retransmit") + bandwidth.reportRetransmit(missingPkts) if err := s.sendRetransmitRequestForRanges([]seqNumRange{sr}); err != nil { return err }