diff --git a/audiostream.go b/audiostream.go index b81c5ef..33fe597 100644 --- a/audiostream.go +++ b/audiostream.go @@ -21,6 +21,7 @@ type audioStream struct { timeoutTimer *time.Timer receivedAudio bool lastReceivedSeq uint16 + serverAudioTime time.Time rxSeqBuf seqBuf rxSeqBufEntryChan chan seqBufEntry @@ -74,28 +75,33 @@ func (s *audioStream) handleRxSeqBufEntry(e seqBufEntry) { } else { missingPkts = int(gotSeq) + 65536 - int(expectedSeq) } - bandwidth.reportLoss(missingPkts) + netstat.reportLoss(missingPkts) log.Error("lost ", missingPkts, " audio packets") + s.serverAudioTime = s.serverAudioTime.Add(time.Duration(10*missingPkts) * time.Millisecond) } + s.serverAudioTime = s.serverAudioTime.Add(10 * time.Millisecond) + } else { + s.serverAudioTime = time.Now() } + statusLog.reportServerAudioTime(s.serverAudioTime) s.lastReceivedSeq = gotSeq s.receivedAudio = true s.audio.play <- e.data } -//var drop int +// var drop int func (s *audioStream) handleAudioPacket(r []byte) error { gotSeq := binary.LittleEndian.Uint16(r[6:8]) - // if drop == 0 && time.Now().UnixNano()%50 == 0 { + // if drop == 0 && time.Now().UnixNano()%10 == 0 { // log.Print("drop start - ", gotSeq) // drop = 1 // return nil // } else if drop > 0 { // drop++ - // if drop >= 9 { + // if drop >= int(time.Now().UnixNano()%10) { // log.Print("drop stop - ", gotSeq) // drop = 0 // } else { diff --git a/bandwidth.go b/bandwidth.go deleted file mode 100644 index 2d42f7a..0000000 --- a/bandwidth.go +++ /dev/null @@ -1,111 +0,0 @@ -package main - -import ( - "fmt" - "sync" - "time" -) - -type bandwidthStruct struct { - toRadioBytes int - toRadioPkts int - fromRadioBytes int - fromRadioPkts int - lastGet time.Time - - lostPkts int - lastLostReset time.Time - retransmits int - lastRetransmitReset time.Time -} - -var bandwidth bandwidthStruct -var bandwidthMutex sync.Mutex - -func (b *bandwidthStruct) reset() { - bandwidthMutex.Lock() - defer bandwidthMutex.Unlock() - - bandwidth = bandwidthStruct{} -} - -// Call this function when a packet is sent or received. -func (b *bandwidthStruct) add(toRadioBytes, fromRadioBytes int) { - bandwidthMutex.Lock() - defer bandwidthMutex.Unlock() - - b.toRadioBytes += toRadioBytes - if toRadioBytes > 0 { - b.toRadioPkts++ - } - b.fromRadioBytes += fromRadioBytes - if fromRadioBytes > 0 { - b.fromRadioPkts++ - } -} - -func (b *bandwidthStruct) reportLoss(pkts int) { - bandwidthMutex.Lock() - defer bandwidthMutex.Unlock() - - if b.lostPkts == 0 { - b.lastLostReset = time.Now() - } - b.lostPkts += pkts -} - -func (b *bandwidthStruct) reportRetransmit(pkts int) { - bandwidthMutex.Lock() - defer bandwidthMutex.Unlock() - - if b.retransmits == 0 { - b.lastRetransmitReset = time.Now() - } - b.retransmits += pkts -} - -func (b *bandwidthStruct) get() (toRadioBytesPerSec, fromRadioBytesPerSec int, lost int, retransmits int) { - bandwidthMutex.Lock() - defer bandwidthMutex.Unlock() - - secs := time.Since(b.lastGet).Seconds() - toRadioBytesPerSec = int(float64(b.toRadioBytes) / secs) - fromRadioBytesPerSec = int(float64(b.fromRadioBytes) / secs) - - b.toRadioBytes = 0 - b.toRadioPkts = 0 - b.fromRadioBytes = 0 - b.fromRadioPkts = 0 - b.lastGet = time.Now() - - secs = time.Since(b.lastLostReset).Seconds() - lost = b.lostPkts - // Only resetting error reports in a longer timeframe. - if secs >= 60 { - b.lostPkts = 0 - b.lastLostReset = time.Now() - } - - secs = time.Since(b.lastRetransmitReset).Seconds() - retransmits = b.retransmits - // Only resetting error reports in a longer timeframe. - if secs >= 60 { - b.retransmits = 0 - b.lastRetransmitReset = time.Now() - } - - return -} - -func (b *bandwidthStruct) formatByteCount(c int) string { - const unit = 1000 - if c < unit { - return fmt.Sprintf("%d B", c) - } - div, exp := int(unit), 0 - for n := c / unit; n >= unit; n /= unit { - div *= unit - exp++ - } - return fmt.Sprintf("%.1f %cB", float64(c)/float64(div), "kMGTPE"[exp]) -} diff --git a/controlstream.go b/controlstream.go index f244aef..a02c9fe 100644 --- a/controlstream.go +++ b/controlstream.go @@ -222,7 +222,7 @@ func (s *controlStream) handleRead(r []byte) error { } func (s *controlStream) loop() { - bandwidth.reset() + netstat.reset() s.secondAuthTimer = time.NewTimer(200 * time.Millisecond) s.reauthTimeoutTimer = time.NewTimer(0) diff --git a/log.go b/log.go index 26f9072..4b75ebe 100644 --- a/log.go +++ b/log.go @@ -32,64 +32,73 @@ func (l *logger) printLineClear() { } func (l *logger) Printf(a string, b ...interface{}) { - if ctrl != nil { + if statusLog.isRealtime() { l.printLineClear() + defer statusLog.print() } l.logger.Infof(l.GetCallerFileName(false)+": "+a, b...) } func (l *logger) Print(a ...interface{}) { - if ctrl != nil { + if statusLog.isRealtime() { l.printLineClear() + defer statusLog.print() } l.logger.Info(append([]interface{}{l.GetCallerFileName(false) + ": "}, a...)...) } func (l *logger) Debugf(a string, b ...interface{}) { - if ctrl != nil { + if statusLog.isRealtime() { l.printLineClear() + defer statusLog.print() } l.logger.Debugf(l.GetCallerFileName(true)+": "+a, b...) } func (l *logger) Debug(a ...interface{}) { - if ctrl != nil { + if statusLog.isRealtime() { l.printLineClear() + defer statusLog.print() } l.logger.Debug(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...) } func (l *logger) Errorf(a string, b ...interface{}) { - if ctrl != nil { + if statusLog.isRealtime() { l.printLineClear() + defer statusLog.print() } l.logger.Errorf(l.GetCallerFileName(true)+": "+a, b...) } func (l *logger) Error(a ...interface{}) { - if ctrl != nil { + if statusLog.isRealtime() { l.printLineClear() + defer statusLog.print() } l.logger.Error(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...) } func (l *logger) ErrorC(a ...interface{}) { - if ctrl != nil { + if statusLog.isRealtime() { l.printLineClear() + defer statusLog.print() } l.logger.Error(a...) } func (l *logger) Fatalf(a string, b ...interface{}) { - if ctrl != nil { + if statusLog.isRealtime() { l.printLineClear() + defer statusLog.print() } l.logger.Fatalf(l.GetCallerFileName(true)+": "+a, b...) } func (l *logger) Fatal(a ...interface{}) { - if ctrl != nil { + if statusLog.isRealtime() { l.printLineClear() + defer statusLog.print() } l.logger.Fatal(append([]interface{}{l.GetCallerFileName(true) + ": "}, a...)...) } diff --git a/main.go b/main.go index 0ab7b78..ada5acf 100644 --- a/main.go +++ b/main.go @@ -10,7 +10,6 @@ import ( ) var gotErrChan = make(chan bool) -var ctrl *controlStream func getAboutStr() string { var v string @@ -24,10 +23,6 @@ func getAboutStr() string { } func runControlStream(osSignal chan os.Signal) (shouldExit bool, exitCode int) { - defer func() { - ctrl = nil - }() - // Depleting gotErrChan. var finished bool for !finished { @@ -38,7 +33,7 @@ func runControlStream(osSignal chan os.Signal) (shouldExit bool, exitCode int) { } } - ctrl = &controlStream{} + ctrl := &controlStream{} if err := ctrl.init(); err != nil { log.Error(err) diff --git a/netstat.go b/netstat.go new file mode 100644 index 0000000..2d5ce15 --- /dev/null +++ b/netstat.go @@ -0,0 +1,105 @@ +package main + +import ( + "fmt" + "sync" + "time" +) + +type netstatStruct struct { + toRadioBytes int + toRadioPkts int + fromRadioBytes int + fromRadioPkts int + lastGet time.Time + + lostPkts int + lastLostReport time.Time + retransmits int + lastRetransmitReport time.Time +} + +var netstat netstatStruct +var netstatMutex sync.Mutex + +func (b *netstatStruct) reset() { + netstatMutex.Lock() + defer netstatMutex.Unlock() + + netstat = netstatStruct{} +} + +// Call this function when a packet is sent or received. +func (b *netstatStruct) add(toRadioBytes, fromRadioBytes int) { + netstatMutex.Lock() + defer netstatMutex.Unlock() + + b.toRadioBytes += toRadioBytes + if toRadioBytes > 0 { + b.toRadioPkts++ + } + b.fromRadioBytes += fromRadioBytes + if fromRadioBytes > 0 { + b.fromRadioPkts++ + } +} + +func (b *netstatStruct) reportLoss(pkts int) { + netstatMutex.Lock() + defer netstatMutex.Unlock() + + b.lastLostReport = time.Now() + b.lostPkts += pkts +} + +func (b *netstatStruct) reportRetransmit(pkts int) { + netstatMutex.Lock() + defer netstatMutex.Unlock() + + b.lastRetransmitReport = time.Now() + b.retransmits += pkts +} + +func (b *netstatStruct) get() (toRadioBytesPerSec, fromRadioBytesPerSec int, lost int, retransmits int) { + netstatMutex.Lock() + defer netstatMutex.Unlock() + + secs := time.Since(b.lastGet).Seconds() + toRadioBytesPerSec = int(float64(b.toRadioBytes) / secs) + fromRadioBytesPerSec = int(float64(b.fromRadioBytes) / secs) + + b.toRadioBytes = 0 + b.toRadioPkts = 0 + b.fromRadioBytes = 0 + b.fromRadioPkts = 0 + b.lastGet = time.Now() + + secs = time.Since(b.lastLostReport).Seconds() + lost = b.lostPkts + if secs >= 60 { + b.lostPkts = 0 + b.lastLostReport = time.Now() + } + + secs = time.Since(b.lastRetransmitReport).Seconds() + retransmits = b.retransmits + if secs >= 60 { + b.retransmits = 0 + b.lastRetransmitReport = time.Now() + } + + return +} + +func (b *netstatStruct) formatByteCount(c int) string { + const unit = 1000 + if c < unit { + return fmt.Sprintf("%d B", c) + } + div, exp := int(unit), 0 + for n := c / unit; n >= unit; n /= unit { + div *= unit + exp++ + } + return fmt.Sprintf("%.1f %cB", float64(c)/float64(div), "kMGTPE"[exp]) +} diff --git a/pkt0.go b/pkt0.go index 4f7321c..0958613 100644 --- a/pkt0.go +++ b/pkt0.go @@ -36,7 +36,7 @@ func (p *pkt0Type) retransmitRange(s *streamCommon, start, end uint16) error { log.Debug("got retransmit request for #", start, "-", end) for { - bandwidth.reportRetransmit(1) + netstat.reportRetransmit(1) d := p.txSeqBuf.get(seqNum(start)) if d != nil { log.Debug(s.name+"/retransmitting #", start) @@ -70,7 +70,7 @@ func (p *pkt0Type) handle(s *streamCommon, r []byte) error { d := p.txSeqBuf.get(seqNum(seq)) if d != nil { log.Debug(s.name+"/retransmitting #", seq) - bandwidth.reportRetransmit(1) + netstat.reportRetransmit(1) if err := s.send(d); err != nil { return err } diff --git a/serialstream.go b/serialstream.go index 3e95bee..b0e905f 100644 --- a/serialstream.go +++ b/serialstream.go @@ -82,7 +82,7 @@ func (s *serialStream) handleRxSeqBufEntry(e seqBufEntry) { } else { missingPkts = int(gotSeq) + 65536 - int(expectedSeq) } - bandwidth.reportLoss(missingPkts) + netstat.reportLoss(missingPkts) log.Error("lost ", missingPkts, " packets") } } diff --git a/status.go b/status.go deleted file mode 100644 index 36209a2..0000000 --- a/status.go +++ /dev/null @@ -1,76 +0,0 @@ -package main - -import ( - "fmt" - "sync" - "time" -) - -type statusLogStruct struct { - ticker *time.Ticker - stopChan chan bool - stopFinishedChan chan bool - mutex sync.Mutex - - startTime time.Time - rttLatency time.Duration -} - -var statusLog statusLogStruct - -func (s *statusLogStruct) reportRTTLatency(l time.Duration) { - s.mutex.Lock() - defer s.mutex.Unlock() - - s.rttLatency = l -} - -func (s *statusLogStruct) print() { - s.mutex.Lock() - defer s.mutex.Unlock() - - up, down, lost, retransmits := bandwidth.get() - - l := fmt.Sprint("up ", time.Since(s.startTime).Round(time.Second), - " rtt ", s.rttLatency.Milliseconds(), "ms up ", - bandwidth.formatByteCount(up), "/s down ", - bandwidth.formatByteCount(down), "/s retx ", retransmits, " /1m lost ", lost, " /1m") - - if statusLogInterval < time.Second { - log.printLineClear() - fmt.Print(time.Now().Format("2006-01-02T15:04:05.000Z0700"), " ", l, "\r") - } else { - log.Print(l) - } -} - -func (s *statusLogStruct) loop() { - for { - select { - case <-s.ticker.C: - s.print() - case <-s.stopChan: - s.stopFinishedChan <- true - return - } - } -} - -func (s *statusLogStruct) startPeriodicPrint() { - s.startTime = time.Now() - s.stopChan = make(chan bool) - s.stopFinishedChan = make(chan bool) - s.ticker = time.NewTicker(statusLogInterval) - go s.loop() -} - -func (s *statusLogStruct) stopPeriodicPrint() { - if s.ticker == nil { // Already stopped? - return - } - s.ticker.Stop() - s.ticker = nil - - s.stopChan <- true - <-s.stopFinishedChan -} diff --git a/statuslog.go b/statuslog.go new file mode 100644 index 0000000..08cf535 --- /dev/null +++ b/statuslog.go @@ -0,0 +1,121 @@ +package main + +import ( + "fmt" + "sync" + "time" +) + +type statusLogStruct struct { + ticker *time.Ticker + stopChan chan bool + stopFinishedChan chan bool + mutex sync.Mutex + + line string + + startTime time.Time + rttLatency time.Duration + audioTimeDiff time.Duration +} + +var statusLog statusLogStruct + +func (s *statusLogStruct) reportRTTLatency(l time.Duration) { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.rttLatency = l +} + +func (s *statusLogStruct) reportServerAudioTime(t time.Time) { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.audioTimeDiff = time.Since(t) +} + +func (s *statusLogStruct) print() { + s.mutex.Lock() + defer s.mutex.Unlock() + + if s.isRealtimeInternal() { + log.printLineClear() + fmt.Print(s.line) + } else { + log.Print(s.line) + } +} + +func (s *statusLogStruct) update() { + s.mutex.Lock() + defer s.mutex.Unlock() + + up, down, lost, retransmits := netstat.get() + var debugStr string + if verboseLog { + debugStr = fmt.Sprint(" adiff ", s.audioTimeDiff.Milliseconds(), "ms") + } + + s.line = fmt.Sprint("up ", time.Since(s.startTime).Round(time.Second), + " rtt ", s.rttLatency.Milliseconds(), "ms up ", + netstat.formatByteCount(up), "/s down ", + netstat.formatByteCount(down), "/s retx ", retransmits, " /1m lost ", lost, " /1m", debugStr) + + if s.isRealtimeInternal() { + s.line = fmt.Sprint(time.Now().Format("2006-01-02T15:04:05.000Z0700"), " ", s.line, "\r") + } +} + +func (s *statusLogStruct) loop() { + for { + select { + case <-s.ticker.C: + s.update() + s.print() + case <-s.stopChan: + s.stopFinishedChan <- true + return + } + } +} + +func (s *statusLogStruct) isRealtimeInternal() bool { + return statusLogInterval < time.Second +} + +func (s *statusLogStruct) isRealtime() bool { + s.mutex.Lock() + defer s.mutex.Unlock() + + return s.ticker != nil && s.isRealtimeInternal() +} + +func (s *statusLogStruct) isActive() bool { + s.mutex.Lock() + defer s.mutex.Unlock() + + return s.ticker != nil +} + +func (s *statusLogStruct) startPeriodicPrint() { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.startTime = time.Now() + s.stopChan = make(chan bool) + s.stopFinishedChan = make(chan bool) + s.ticker = time.NewTicker(statusLogInterval) + go s.loop() +} + +func (s *statusLogStruct) stopPeriodicPrint() { + if !s.isActive() { + return + } + s.ticker.Stop() + s.ticker = nil + + s.stopChan <- true + <-s.stopFinishedChan +} diff --git a/streamcommon.go b/streamcommon.go index 7eed9c9..b817d60 100644 --- a/streamcommon.go +++ b/streamcommon.go @@ -31,7 +31,7 @@ func (s *streamCommon) send(d []byte) error { if _, err := s.conn.Write(d); err != nil { return err } - bandwidth.add(len(d), 0) + netstat.add(len(d), 0) return nil } @@ -39,7 +39,7 @@ func (s *streamCommon) read() ([]byte, error) { b := make([]byte, 1500) n, _, err := s.conn.ReadFromUDP(b) if err == nil { - bandwidth.add(0, n) + netstat.add(0, n) } return b[:n], err } @@ -192,13 +192,13 @@ func (s *streamCommon) requestRetransmitIfNeeded(gotSeq uint16) error { } if missingPkts == 1 { log.Debug(s.name+"/requesting pkt #", sr[1], " retransmit") - bandwidth.reportRetransmit(missingPkts) + netstat.reportRetransmit(missingPkts) if err := s.sendRetransmitRequest(sr[1]); err != nil { return err } } else if missingPkts < 50 { log.Debug(s.name+"/requesting pkt #", sr[0], "-#", sr[1], " retransmit") - bandwidth.reportRetransmit(missingPkts) + netstat.reportRetransmit(missingPkts) if err := s.sendRetransmitRequestForRanges([]seqNumRange{sr}); err != nil { return err }