Restructure status log handling

This commit is contained in:
Nonoo 2020-10-28 18:03:35 +01:00
parent 757dc58280
commit bbcde03608
11 changed files with 263 additions and 214 deletions

View file

@ -21,6 +21,7 @@ type audioStream struct {
timeoutTimer *time.Timer timeoutTimer *time.Timer
receivedAudio bool receivedAudio bool
lastReceivedSeq uint16 lastReceivedSeq uint16
serverAudioTime time.Time
rxSeqBuf seqBuf rxSeqBuf seqBuf
rxSeqBufEntryChan chan seqBufEntry rxSeqBufEntryChan chan seqBufEntry
@ -74,28 +75,33 @@ func (s *audioStream) handleRxSeqBufEntry(e seqBufEntry) {
} else { } else {
missingPkts = int(gotSeq) + 65536 - int(expectedSeq) missingPkts = int(gotSeq) + 65536 - int(expectedSeq)
} }
bandwidth.reportLoss(missingPkts) netstat.reportLoss(missingPkts)
log.Error("lost ", missingPkts, " audio packets") log.Error("lost ", missingPkts, " audio packets")
s.serverAudioTime = s.serverAudioTime.Add(time.Duration(10*missingPkts) * time.Millisecond)
} }
s.serverAudioTime = s.serverAudioTime.Add(10 * time.Millisecond)
} else {
s.serverAudioTime = time.Now()
} }
statusLog.reportServerAudioTime(s.serverAudioTime)
s.lastReceivedSeq = gotSeq s.lastReceivedSeq = gotSeq
s.receivedAudio = true s.receivedAudio = true
s.audio.play <- e.data s.audio.play <- e.data
} }
//var drop int // var drop int
func (s *audioStream) handleAudioPacket(r []byte) error { func (s *audioStream) handleAudioPacket(r []byte) error {
gotSeq := binary.LittleEndian.Uint16(r[6:8]) gotSeq := binary.LittleEndian.Uint16(r[6:8])
// if drop == 0 && time.Now().UnixNano()%50 == 0 { // if drop == 0 && time.Now().UnixNano()%10 == 0 {
// log.Print("drop start - ", gotSeq) // log.Print("drop start - ", gotSeq)
// drop = 1 // drop = 1
// return nil // return nil
// } else if drop > 0 { // } else if drop > 0 {
// drop++ // drop++
// if drop >= 9 { // if drop >= int(time.Now().UnixNano()%10) {
// log.Print("drop stop - ", gotSeq) // log.Print("drop stop - ", gotSeq)
// drop = 0 // drop = 0
// } else { // } else {

View file

@ -1,111 +0,0 @@
package main
import (
"fmt"
"sync"
"time"
)
type bandwidthStruct struct {
toRadioBytes int
toRadioPkts int
fromRadioBytes int
fromRadioPkts int
lastGet time.Time
lostPkts int
lastLostReset time.Time
retransmits int
lastRetransmitReset time.Time
}
var bandwidth bandwidthStruct
var bandwidthMutex sync.Mutex
func (b *bandwidthStruct) reset() {
bandwidthMutex.Lock()
defer bandwidthMutex.Unlock()
bandwidth = bandwidthStruct{}
}
// Call this function when a packet is sent or received.
func (b *bandwidthStruct) add(toRadioBytes, fromRadioBytes int) {
bandwidthMutex.Lock()
defer bandwidthMutex.Unlock()
b.toRadioBytes += toRadioBytes
if toRadioBytes > 0 {
b.toRadioPkts++
}
b.fromRadioBytes += fromRadioBytes
if fromRadioBytes > 0 {
b.fromRadioPkts++
}
}
func (b *bandwidthStruct) reportLoss(pkts int) {
bandwidthMutex.Lock()
defer bandwidthMutex.Unlock()
if b.lostPkts == 0 {
b.lastLostReset = time.Now()
}
b.lostPkts += pkts
}
func (b *bandwidthStruct) reportRetransmit(pkts int) {
bandwidthMutex.Lock()
defer bandwidthMutex.Unlock()
if b.retransmits == 0 {
b.lastRetransmitReset = time.Now()
}
b.retransmits += pkts
}
func (b *bandwidthStruct) get() (toRadioBytesPerSec, fromRadioBytesPerSec int, lost int, retransmits int) {
bandwidthMutex.Lock()
defer bandwidthMutex.Unlock()
secs := time.Since(b.lastGet).Seconds()
toRadioBytesPerSec = int(float64(b.toRadioBytes) / secs)
fromRadioBytesPerSec = int(float64(b.fromRadioBytes) / secs)
b.toRadioBytes = 0
b.toRadioPkts = 0
b.fromRadioBytes = 0
b.fromRadioPkts = 0
b.lastGet = time.Now()
secs = time.Since(b.lastLostReset).Seconds()
lost = b.lostPkts
// Only resetting error reports in a longer timeframe.
if secs >= 60 {
b.lostPkts = 0
b.lastLostReset = time.Now()
}
secs = time.Since(b.lastRetransmitReset).Seconds()
retransmits = b.retransmits
// Only resetting error reports in a longer timeframe.
if secs >= 60 {
b.retransmits = 0
b.lastRetransmitReset = time.Now()
}
return
}
func (b *bandwidthStruct) formatByteCount(c int) string {
const unit = 1000
if c < unit {
return fmt.Sprintf("%d B", c)
}
div, exp := int(unit), 0
for n := c / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %cB", float64(c)/float64(div), "kMGTPE"[exp])
}

View file

@ -222,7 +222,7 @@ func (s *controlStream) handleRead(r []byte) error {
} }
func (s *controlStream) loop() { func (s *controlStream) loop() {
bandwidth.reset() netstat.reset()
s.secondAuthTimer = time.NewTimer(200 * time.Millisecond) s.secondAuthTimer = time.NewTimer(200 * time.Millisecond)
s.reauthTimeoutTimer = time.NewTimer(0) s.reauthTimeoutTimer = time.NewTimer(0)

27
log.go
View file

@ -32,64 +32,73 @@ func (l *logger) printLineClear() {
} }
func (l *logger) Printf(a string, b ...interface{}) { func (l *logger) Printf(a string, b ...interface{}) {
if ctrl != nil { if statusLog.isRealtime() {
l.printLineClear() l.printLineClear()
defer statusLog.print()
} }
l.logger.Infof(l.GetCallerFileName(false)+": "+a, b...) l.logger.Infof(l.GetCallerFileName(false)+": "+a, b...)
} }
func (l *logger) Print(a ...interface{}) { func (l *logger) Print(a ...interface{}) {
if ctrl != nil { if statusLog.isRealtime() {
l.printLineClear() l.printLineClear()
defer statusLog.print()
} }
l.logger.Info(append([]interface{}{l.GetCallerFileName(false) + ": "}, a...)...) l.logger.Info(append([]interface{}{l.GetCallerFileName(false) + ": "}, a...)...)
} }
func (l *logger) Debugf(a string, b ...interface{}) { func (l *logger) Debugf(a string, b ...interface{}) {
if ctrl != nil { if statusLog.isRealtime() {
l.printLineClear() l.printLineClear()
defer statusLog.print()
} }
l.logger.Debugf(l.GetCallerFileName(true)+": "+a, b...) l.logger.Debugf(l.GetCallerFileName(true)+": "+a, b...)
} }
func (l *logger) Debug(a ...interface{}) { func (l *logger) Debug(a ...interface{}) {
if ctrl != nil { if statusLog.isRealtime() {
l.printLineClear() l.printLineClear()
defer statusLog.print()
} }
l.logger.Debug(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...) l.logger.Debug(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...)
} }
func (l *logger) Errorf(a string, b ...interface{}) { func (l *logger) Errorf(a string, b ...interface{}) {
if ctrl != nil { if statusLog.isRealtime() {
l.printLineClear() l.printLineClear()
defer statusLog.print()
} }
l.logger.Errorf(l.GetCallerFileName(true)+": "+a, b...) l.logger.Errorf(l.GetCallerFileName(true)+": "+a, b...)
} }
func (l *logger) Error(a ...interface{}) { func (l *logger) Error(a ...interface{}) {
if ctrl != nil { if statusLog.isRealtime() {
l.printLineClear() l.printLineClear()
defer statusLog.print()
} }
l.logger.Error(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...) l.logger.Error(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...)
} }
func (l *logger) ErrorC(a ...interface{}) { func (l *logger) ErrorC(a ...interface{}) {
if ctrl != nil { if statusLog.isRealtime() {
l.printLineClear() l.printLineClear()
defer statusLog.print()
} }
l.logger.Error(a...) l.logger.Error(a...)
} }
func (l *logger) Fatalf(a string, b ...interface{}) { func (l *logger) Fatalf(a string, b ...interface{}) {
if ctrl != nil { if statusLog.isRealtime() {
l.printLineClear() l.printLineClear()
defer statusLog.print()
} }
l.logger.Fatalf(l.GetCallerFileName(true)+": "+a, b...) l.logger.Fatalf(l.GetCallerFileName(true)+": "+a, b...)
} }
func (l *logger) Fatal(a ...interface{}) { func (l *logger) Fatal(a ...interface{}) {
if ctrl != nil { if statusLog.isRealtime() {
l.printLineClear() l.printLineClear()
defer statusLog.print()
} }
l.logger.Fatal(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...) l.logger.Fatal(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...)
} }

View file

@ -10,7 +10,6 @@ import (
) )
var gotErrChan = make(chan bool) var gotErrChan = make(chan bool)
var ctrl *controlStream
func getAboutStr() string { func getAboutStr() string {
var v string var v string
@ -24,10 +23,6 @@ func getAboutStr() string {
} }
func runControlStream(osSignal chan os.Signal) (shouldExit bool, exitCode int) { func runControlStream(osSignal chan os.Signal) (shouldExit bool, exitCode int) {
defer func() {
ctrl = nil
}()
// Depleting gotErrChan. // Depleting gotErrChan.
var finished bool var finished bool
for !finished { for !finished {
@ -38,7 +33,7 @@ func runControlStream(osSignal chan os.Signal) (shouldExit bool, exitCode int) {
} }
} }
ctrl = &controlStream{} ctrl := &controlStream{}
if err := ctrl.init(); err != nil { if err := ctrl.init(); err != nil {
log.Error(err) log.Error(err)

105
netstat.go Normal file
View file

@ -0,0 +1,105 @@
package main
import (
"fmt"
"sync"
"time"
)
type netstatStruct struct {
toRadioBytes int
toRadioPkts int
fromRadioBytes int
fromRadioPkts int
lastGet time.Time
lostPkts int
lastLostReport time.Time
retransmits int
lastRetransmitReport time.Time
}
var netstat netstatStruct
var netstatMutex sync.Mutex
func (b *netstatStruct) reset() {
netstatMutex.Lock()
defer netstatMutex.Unlock()
netstat = netstatStruct{}
}
// Call this function when a packet is sent or received.
func (b *netstatStruct) add(toRadioBytes, fromRadioBytes int) {
netstatMutex.Lock()
defer netstatMutex.Unlock()
b.toRadioBytes += toRadioBytes
if toRadioBytes > 0 {
b.toRadioPkts++
}
b.fromRadioBytes += fromRadioBytes
if fromRadioBytes > 0 {
b.fromRadioPkts++
}
}
func (b *netstatStruct) reportLoss(pkts int) {
netstatMutex.Lock()
defer netstatMutex.Unlock()
b.lastLostReport = time.Now()
b.lostPkts += pkts
}
func (b *netstatStruct) reportRetransmit(pkts int) {
netstatMutex.Lock()
defer netstatMutex.Unlock()
b.lastRetransmitReport = time.Now()
b.retransmits += pkts
}
func (b *netstatStruct) get() (toRadioBytesPerSec, fromRadioBytesPerSec int, lost int, retransmits int) {
netstatMutex.Lock()
defer netstatMutex.Unlock()
secs := time.Since(b.lastGet).Seconds()
toRadioBytesPerSec = int(float64(b.toRadioBytes) / secs)
fromRadioBytesPerSec = int(float64(b.fromRadioBytes) / secs)
b.toRadioBytes = 0
b.toRadioPkts = 0
b.fromRadioBytes = 0
b.fromRadioPkts = 0
b.lastGet = time.Now()
secs = time.Since(b.lastLostReport).Seconds()
lost = b.lostPkts
if secs >= 60 {
b.lostPkts = 0
b.lastLostReport = time.Now()
}
secs = time.Since(b.lastRetransmitReport).Seconds()
retransmits = b.retransmits
if secs >= 60 {
b.retransmits = 0
b.lastRetransmitReport = time.Now()
}
return
}
func (b *netstatStruct) formatByteCount(c int) string {
const unit = 1000
if c < unit {
return fmt.Sprintf("%d B", c)
}
div, exp := int(unit), 0
for n := c / unit; n >= unit; n /= unit {
div *= unit
exp++
}
return fmt.Sprintf("%.1f %cB", float64(c)/float64(div), "kMGTPE"[exp])
}

View file

@ -36,7 +36,7 @@ func (p *pkt0Type) retransmitRange(s *streamCommon, start, end uint16) error {
log.Debug("got retransmit request for #", start, "-", end) log.Debug("got retransmit request for #", start, "-", end)
for { for {
bandwidth.reportRetransmit(1) netstat.reportRetransmit(1)
d := p.txSeqBuf.get(seqNum(start)) d := p.txSeqBuf.get(seqNum(start))
if d != nil { if d != nil {
log.Debug(s.name+"/retransmitting #", start) log.Debug(s.name+"/retransmitting #", start)
@ -70,7 +70,7 @@ func (p *pkt0Type) handle(s *streamCommon, r []byte) error {
d := p.txSeqBuf.get(seqNum(seq)) d := p.txSeqBuf.get(seqNum(seq))
if d != nil { if d != nil {
log.Debug(s.name+"/retransmitting #", seq) log.Debug(s.name+"/retransmitting #", seq)
bandwidth.reportRetransmit(1) netstat.reportRetransmit(1)
if err := s.send(d); err != nil { if err := s.send(d); err != nil {
return err return err
} }

View file

@ -82,7 +82,7 @@ func (s *serialStream) handleRxSeqBufEntry(e seqBufEntry) {
} else { } else {
missingPkts = int(gotSeq) + 65536 - int(expectedSeq) missingPkts = int(gotSeq) + 65536 - int(expectedSeq)
} }
bandwidth.reportLoss(missingPkts) netstat.reportLoss(missingPkts)
log.Error("lost ", missingPkts, " packets") log.Error("lost ", missingPkts, " packets")
} }
} }

View file

@ -1,76 +0,0 @@
package main
import (
"fmt"
"sync"
"time"
)
type statusLogStruct struct {
ticker *time.Ticker
stopChan chan bool
stopFinishedChan chan bool
mutex sync.Mutex
startTime time.Time
rttLatency time.Duration
}
var statusLog statusLogStruct
func (s *statusLogStruct) reportRTTLatency(l time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.rttLatency = l
}
func (s *statusLogStruct) print() {
s.mutex.Lock()
defer s.mutex.Unlock()
up, down, lost, retransmits := bandwidth.get()
l := fmt.Sprint("up ", time.Since(s.startTime).Round(time.Second),
" rtt ", s.rttLatency.Milliseconds(), "ms up ",
bandwidth.formatByteCount(up), "/s down ",
bandwidth.formatByteCount(down), "/s retx ", retransmits, " /1m lost ", lost, " /1m")
if statusLogInterval < time.Second {
log.printLineClear()
fmt.Print(time.Now().Format("2006-01-02T15:04:05.000Z0700"), " ", l, "\r")
} else {
log.Print(l)
}
}
func (s *statusLogStruct) loop() {
for {
select {
case <-s.ticker.C:
s.print()
case <-s.stopChan:
s.stopFinishedChan <- true
return
}
}
}
func (s *statusLogStruct) startPeriodicPrint() {
s.startTime = time.Now()
s.stopChan = make(chan bool)
s.stopFinishedChan = make(chan bool)
s.ticker = time.NewTicker(statusLogInterval)
go s.loop()
}
func (s *statusLogStruct) stopPeriodicPrint() {
if s.ticker == nil { // Already stopped?
return
}
s.ticker.Stop()
s.ticker = nil
s.stopChan <- true
<-s.stopFinishedChan
}

121
statuslog.go Normal file
View file

@ -0,0 +1,121 @@
package main
import (
"fmt"
"sync"
"time"
)
type statusLogStruct struct {
ticker *time.Ticker
stopChan chan bool
stopFinishedChan chan bool
mutex sync.Mutex
line string
startTime time.Time
rttLatency time.Duration
audioTimeDiff time.Duration
}
var statusLog statusLogStruct
func (s *statusLogStruct) reportRTTLatency(l time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.rttLatency = l
}
func (s *statusLogStruct) reportServerAudioTime(t time.Time) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.audioTimeDiff = time.Since(t)
}
func (s *statusLogStruct) print() {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.isRealtimeInternal() {
log.printLineClear()
fmt.Print(s.line)
} else {
log.Print(s.line)
}
}
func (s *statusLogStruct) update() {
s.mutex.Lock()
defer s.mutex.Unlock()
up, down, lost, retransmits := netstat.get()
var debugStr string
if verboseLog {
debugStr = fmt.Sprint(" adiff ", s.audioTimeDiff.Milliseconds(), "ms")
}
s.line = fmt.Sprint("up ", time.Since(s.startTime).Round(time.Second),
" rtt ", s.rttLatency.Milliseconds(), "ms up ",
netstat.formatByteCount(up), "/s down ",
netstat.formatByteCount(down), "/s retx ", retransmits, " /1m lost ", lost, " /1m", debugStr)
if s.isRealtimeInternal() {
s.line = fmt.Sprint(time.Now().Format("2006-01-02T15:04:05.000Z0700"), " ", s.line, "\r")
}
}
func (s *statusLogStruct) loop() {
for {
select {
case <-s.ticker.C:
s.update()
s.print()
case <-s.stopChan:
s.stopFinishedChan <- true
return
}
}
}
func (s *statusLogStruct) isRealtimeInternal() bool {
return statusLogInterval < time.Second
}
func (s *statusLogStruct) isRealtime() bool {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.ticker != nil && s.isRealtimeInternal()
}
func (s *statusLogStruct) isActive() bool {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.ticker != nil
}
func (s *statusLogStruct) startPeriodicPrint() {
s.mutex.Lock()
defer s.mutex.Unlock()
s.startTime = time.Now()
s.stopChan = make(chan bool)
s.stopFinishedChan = make(chan bool)
s.ticker = time.NewTicker(statusLogInterval)
go s.loop()
}
func (s *statusLogStruct) stopPeriodicPrint() {
if !s.isActive() {
return
}
s.ticker.Stop()
s.ticker = nil
s.stopChan <- true
<-s.stopFinishedChan
}

View file

@ -31,7 +31,7 @@ func (s *streamCommon) send(d []byte) error {
if _, err := s.conn.Write(d); err != nil { if _, err := s.conn.Write(d); err != nil {
return err return err
} }
bandwidth.add(len(d), 0) netstat.add(len(d), 0)
return nil return nil
} }
@ -39,7 +39,7 @@ func (s *streamCommon) read() ([]byte, error) {
b := make([]byte, 1500) b := make([]byte, 1500)
n, _, err := s.conn.ReadFromUDP(b) n, _, err := s.conn.ReadFromUDP(b)
if err == nil { if err == nil {
bandwidth.add(0, n) netstat.add(0, n)
} }
return b[:n], err return b[:n], err
} }
@ -192,13 +192,13 @@ func (s *streamCommon) requestRetransmitIfNeeded(gotSeq uint16) error {
} }
if missingPkts == 1 { if missingPkts == 1 {
log.Debug(s.name+"/requesting pkt #", sr[1], " retransmit") log.Debug(s.name+"/requesting pkt #", sr[1], " retransmit")
bandwidth.reportRetransmit(missingPkts) netstat.reportRetransmit(missingPkts)
if err := s.sendRetransmitRequest(sr[1]); err != nil { if err := s.sendRetransmitRequest(sr[1]); err != nil {
return err return err
} }
} else if missingPkts < 50 { } else if missingPkts < 50 {
log.Debug(s.name+"/requesting pkt #", sr[0], "-#", sr[1], " retransmit") log.Debug(s.name+"/requesting pkt #", sr[0], "-#", sr[1], " retransmit")
bandwidth.reportRetransmit(missingPkts) netstat.reportRetransmit(missingPkts)
if err := s.sendRetransmitRequestForRanges([]seqNumRange{sr}); err != nil { if err := s.sendRetransmitRequestForRanges([]seqNumRange{sr}); err != nil {
return err return err
} }