From bed7d95f7c28afb8d6713e256bf9f6ec6ff8b6ff Mon Sep 17 00:00:00 2001 From: Nonoo Date: Sat, 21 Nov 2020 11:01:37 +0100 Subject: [PATCH] Decrease latency of RX seqbuf --- audio-linux.go | 7 +- audiostream.go | 11 +- seqbuf.go | 259 ++++++++++++++++++++++++++++++++++++------------ serialstream.go | 12 +-- streamcommon.go | 47 ++++----- 5 files changed, 220 insertions(+), 116 deletions(-) diff --git a/audio-linux.go b/audio-linux.go index f92225b..23d3317 100644 --- a/audio-linux.go +++ b/audio-linux.go @@ -17,8 +17,9 @@ import ( const audioSampleRate = 48000 const audioSampleBytes = 2 const pulseAudioBufferLength = 100 * time.Millisecond -const audioFrameSize = 1920 // 20ms -const maxPlayBufferSize = audioFrameSize * 5 +const audioFrameLength = 20 * time.Millisecond +const audioFrameSize = int((audioSampleRate * audioSampleBytes * audioFrameLength) / time.Second) +const maxPlayBufferSize = audioFrameSize*5 + int((audioSampleRate*audioSampleBytes*audioRxSeqBufLength)/time.Second) type audioStruct struct { devName string @@ -88,7 +89,7 @@ func (a *audioStruct) toggleRecFromDefaultSoundcard() { if a.defaultSoundcardStream.recStream == nil { ss := pulse.SampleSpec{Format: pulse.SAMPLE_S16LE, Rate: audioSampleRate, Channels: 1} battr := pulse.NewBufferAttr() - battr.Fragsize = audioFrameSize + battr.Fragsize = uint32(audioFrameSize) var err error a.defaultSoundcardStream.recStream, err = pulse.NewStream("", "kappanhang", pulse.STREAM_RECORD, "", a.devName, &ss, nil, battr) diff --git a/audiostream.go b/audiostream.go index 7dcb98d..9b7991c 100644 --- a/audiostream.go +++ b/audiostream.go @@ -112,14 +112,7 @@ func (s *audioStream) handleAudioPacket(r []byte) error { s.timeoutTimer.Reset(audioTimeoutDuration) } - addedToFront, _ := s.rxSeqBuf.add(seqNum(gotSeq), r[24:]) - if !addedToFront { - // If the packet is not added to the front of the seqbuf, then it means that it was an answer for a - // retransmit request (or it was an out of order packet which we don't want start a retransmit request). - return nil - } - - return s.common.requestRetransmitIfNeeded(gotSeq) + return s.rxSeqBuf.add(seqNum(gotSeq), r[24:]) } func (s *audioStream) handleRead(r []byte) error { @@ -175,7 +168,7 @@ func (s *audioStream) init(devName string) error { log.Print("stream started") s.rxSeqBufEntryChan = make(chan seqBufEntry) - s.rxSeqBuf.init(audioRxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan) + s.rxSeqBuf.init(audioRxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan, s.common.requestRetransmit) s.timeoutTimer = time.NewTimer(audioTimeoutDuration) diff --git a/seqbuf.go b/seqbuf.go index 761ac5b..6326add 100644 --- a/seqbuf.go +++ b/seqbuf.go @@ -8,17 +8,66 @@ import ( type seqNum int -type seqBufEntry struct { - seq seqNum - data []byte - addedAt time.Time +func (s *seqNum) inc(maxSeqNum seqNum) seqNum { + if *s == maxSeqNum { + return 0 + } + return *s + 1 } +func (s *seqNum) dec(maxSeqNum seqNum) seqNum { + if *s == 0 { + return maxSeqNum + } + return *s - 1 +} + +type seqNumRange [2]seqNum + +func (r *seqNumRange) getDiff(maxSeqNum seqNum) (diff int) { + from := r[0] + to := r[1] + + if to >= from { + diff = int(to) - int(from) + } else { + r[0] = from + r[1] = to + diff = (int(maxSeqNum) + 1) - int(from) + int(to) + } + return +} + +type seqBufEntry struct { + seq seqNum + data []byte +} + +type requestRetransmitCallbackType func(r seqNumRange) error + type seqBuf struct { - length time.Duration - maxSeqNum seqNum - maxSeqNumDiff seqNum - entryChan chan seqBufEntry + length time.Duration + maxSeqNum seqNum + maxSeqNumDiff seqNum + requestRetransmitCallback requestRetransmitCallbackType + + // Available entries coming out from the seqbuf will be sent to entryChan. + entryChan chan seqBufEntry + + // If this is true then the seqBuf is locked, which means no entries will be sent to entryChan. + lockedByInvalidSeq bool + lockedAt time.Time + + // This is false until no packets have been sent to the entryChan. + alreadyReturnedFirstSeq bool + // The seqNum of the last packet sent to entryChan. + lastReturnedSeq seqNum + + requestedRetransmit bool + lastRequestedRetransmitRange seqNumRange + + ignoreMissingPktsUntilEnabled bool + ignoreMissingPktsUntilSeq seqNum // Note that the most recently added entry is stored as the 0th entry. entries []seqBufEntry @@ -27,6 +76,8 @@ type seqBuf struct { entryAddedChan chan bool watcherCloseNeededChan chan bool watcherCloseDoneChan chan bool + + errOutOfOrder error } // func (s *seqBuf) string() (out string) { @@ -44,9 +95,8 @@ type seqBuf struct { func (s *seqBuf) createEntry(seq seqNum, data []byte) seqBufEntry { return seqBufEntry{ - seq: seq, - data: data, - addedAt: time.Now(), + seq: seq, + data: data, } } @@ -66,29 +116,26 @@ func (s *seqBuf) addToFront(seq seqNum, data []byte) { func (s *seqBuf) addToBack(seq seqNum, data []byte) { e := s.createEntry(seq, data) - e.addedAt = time.Time{} // Release the packet from the seqbuf as soon as possible, as it is a late entry. s.entries = append(s.entries, e) s.notifyWatcher() } -func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) (addedToFront bool) { +func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) { if toPos == 0 { s.addToFront(seq, data) - return true + return } if toPos >= len(s.entries) { s.addToBack(seq, data) - return false + return } sliceBefore := s.entries[:toPos] sliceAfter := s.entries[toPos:] e := s.createEntry(seq, data) - e.addedAt = time.Time{} // Release the packet from the seqbuf as soon as possible, as it is a late entry. s.entries = append(sliceBefore, append([]seqBufEntry{e}, sliceAfter...)...) s.notifyWatcher() - return false } func (s *seqBuf) getDiff(seq1, seq2 seqNum) seqNum { @@ -132,7 +179,7 @@ func (s *seqBuf) leftOrRightCloserToSeq(seq, whichSeq seqNum) direction { return left } -func (s *seqBuf) add(seq seqNum, data []byte) (addedToFront bool, err error) { +func (s *seqBuf) add(seq seqNum, data []byte) error { s.mutex.Lock() defer s.mutex.Unlock() @@ -142,74 +189,156 @@ func (s *seqBuf) add(seq seqNum, data []byte) (addedToFront bool, err error) { // }() if seq > s.maxSeqNum { - return false, errors.New("seq out of range") + return errors.New("seq out of range") } if len(s.entries) == 0 { s.addToFront(seq, data) - return true, nil + return nil } - if s.entries[0].seq == seq { - return false, errors.New("dropping duplicate seq") + if s.entries[0].seq == seq { // Dropping duplicate seq. + return nil } // Checking the first entry. if s.leftOrRightCloserToSeq(seq, s.entries[0].seq) == left { s.addToFront(seq, data) - return true, nil + return nil } // Parsing through other entries if there are more than 1. for i := 1; i < len(s.entries); i++ { - // This seqnum is already in the queue, but not as the first entry? It's not a duplicate packet. - // It can be a beginning of a new stream for example. + // This seqnum is already in the queue? Ignoring it. if s.entries[i].seq == seq { - // s.addToFront(seq, data) - return false, nil + return nil } if s.leftOrRightCloserToSeq(seq, s.entries[i].seq) == left { // log.Debug("left for ", s.entries[i].seq) - return s.insert(seq, data, i), nil + s.insert(seq, data, i) + return nil } // log.Debug("right for ", s.entries[i].seq) } // No place found for the item? s.addToBack(seq, data) - return false, nil + return nil } -func (s *seqBuf) getNextDataAvailableRemainingTime() (time.Duration, error) { - s.mutex.RLock() - defer s.mutex.RUnlock() +func (s *seqBuf) checkLockTimeout() (timeout bool, shouldRetryIn time.Duration) { + timeSinceLastInvalidSeq := time.Since(s.lockedAt) + if s.length > timeSinceLastInvalidSeq { + shouldRetryIn = s.length - timeSinceLastInvalidSeq + return + } - if len(s.entries) == 0 { - return 0, errors.New("seqbuf is empty") + s.lockedByInvalidSeq = false + // log.Debug("lock timeout") + + if s.requestedRetransmit { + s.ignoreMissingPktsUntilSeq = s.lastRequestedRetransmitRange[1] + s.ignoreMissingPktsUntilEnabled = true } - lastEntryIdx := len(s.entries) - 1 - inBufDuration := time.Since(s.entries[lastEntryIdx].addedAt) - if inBufDuration >= s.length { - return 0, nil - } - return s.length - inBufDuration, nil + + return true, 0 } -func (s *seqBuf) get() (e seqBufEntry, err error) { +// Returns true if all entries from the requested retransmit range have been received. +func (s *seqBuf) gotRetransmitRange() bool { + entryIdx := len(s.entries) + rangeSeq := s.lastRequestedRetransmitRange[0] + + for { + entryIdx-- + if entryIdx < 0 { + return false + } + + if s.entries[entryIdx].seq != rangeSeq { + // log.Debug("entry idx ", entryIdx, " seq #", s.entries[entryIdx].seq, " does not match ", rangeSeq) + // log.Debug(s.string()) + return false + } + + if rangeSeq == s.lastRequestedRetransmitRange[1] { + return true + } + + rangeSeq = rangeSeq.inc(s.maxSeqNum) + } +} + +// shouldRetryIn is only filled when no entry is available, but there are entries in the seqbuf. +// err is not nil if the seqbuf is empty. +func (s *seqBuf) get() (e seqBufEntry, shouldRetryIn time.Duration, err error) { s.mutex.Lock() defer s.mutex.Unlock() if len(s.entries) == 0 { - return e, errors.New("seqbuf is empty") - } - lastEntryIdx := len(s.entries) - 1 - if time.Since(s.entries[lastEntryIdx].addedAt) < s.length { - return e, errors.New("no available entries") + return e, 0, errors.New("seqbuf is empty") } + + entryCount := len(s.entries) + lastEntryIdx := entryCount - 1 e = s.entries[lastEntryIdx] + + if s.alreadyReturnedFirstSeq { + if s.lockedByInvalidSeq { + if s.requestedRetransmit && s.gotRetransmitRange() { + s.lockedByInvalidSeq = false + // log.Debug("lock over") + } else { + var timeout bool + if timeout, shouldRetryIn = s.checkLockTimeout(); !timeout { + return + } + } + } else { + if s.leftOrRightCloserToSeq(e.seq, seqNum(s.lastReturnedSeq)) != left { + // log.Debug("ignoring out of order seq ", e.seq) + s.entries = s.entries[:lastEntryIdx] + err = s.errOutOfOrder + return + } + + if s.ignoreMissingPktsUntilEnabled { + if s.leftOrRightCloserToSeq(e.seq, s.ignoreMissingPktsUntilSeq) == left { + // log.Debug("ignore over ", e.seq, " ", s.ignoreMissingPktsUntilSeq) + s.ignoreMissingPktsUntilEnabled = false + } else { + // log.Debug("ignoring missing pkt, seq #", e.seq, " until ", s.ignoreMissingPktsUntilSeq) + } + } else { + expectedNextSeq := s.lastReturnedSeq.inc(s.maxSeqNum) + + if e.seq != expectedNextSeq { + // log.Debug("lock on, expected seq ", expectedNextSeq, " got ", e.seq) + s.lockedByInvalidSeq = true + s.lockedAt = time.Now() + s.requestedRetransmit = false + s.ignoreMissingPktsUntilEnabled = false + shouldRetryIn = s.length + + if s.requestRetransmitCallback != nil { + s.lastRequestedRetransmitRange[0] = expectedNextSeq + s.lastRequestedRetransmitRange[1] = e.seq.dec(s.maxSeqNum) + if err = s.requestRetransmitCallback(s.lastRequestedRetransmitRange); err == nil { + s.requestedRetransmit = true + } + } + return + } + } + } + } + + s.lastReturnedSeq = e.seq + s.alreadyReturnedFirstSeq = true + s.entries = s.entries[:lastEntryIdx] - return e, nil + return e, 0, nil } func (s *seqBuf) watcher() { @@ -227,25 +356,22 @@ func (s *seqBuf) watcher() { for retry { retry = false - t, err := s.getNextDataAvailableRemainingTime() - if err == nil { - if t == 0 { // Do we have an entry available right now? - e, err := s.get() - if err == nil { - if s.entryChan != nil { - select { - case s.entryChan <- e: - case <-s.watcherCloseNeededChan: - return - } - } - } else { - log.Error(err) + e, t, err := s.get() + if err == nil && t == 0 { + if s.entryChan != nil { + select { + case s.entryChan <- e: + case <-s.watcherCloseNeededChan: + return } + } - // We may have further available entries. + // We may have further available entries. + retry = true + } else { + if err == s.errOutOfOrder { retry = true - } else if !entryAvailableTimerRunning { + } else if !entryAvailableTimerRunning && t > 0 { // An entry will be available later, waiting for it. entryAvailableTimer.Reset(t) entryAvailableTimerRunning = true @@ -265,15 +391,20 @@ func (s *seqBuf) watcher() { // Setting a max. seqnum diff is optional. If it's 0 then the diff will be half of the maxSeqNum range. // Available entries coming out from the seqbuf will be sent to entryChan. -func (s *seqBuf) init(length time.Duration, maxSeqNum, maxSeqNumDiff seqNum, entryChan chan seqBufEntry) { +func (s *seqBuf) init(length time.Duration, maxSeqNum, maxSeqNumDiff seqNum, entryChan chan seqBufEntry, + requestRetransmitCallback requestRetransmitCallbackType) { s.length = length s.maxSeqNum = maxSeqNum s.maxSeqNumDiff = maxSeqNumDiff s.entryChan = entryChan + s.requestRetransmitCallback = requestRetransmitCallback s.entryAddedChan = make(chan bool) s.watcherCloseNeededChan = make(chan bool) s.watcherCloseDoneChan = make(chan bool) + + s.errOutOfOrder = errors.New("out of order pkt") + go s.watcher() } diff --git a/serialstream.go b/serialstream.go index 269cb0c..f6d09ce 100644 --- a/serialstream.go +++ b/serialstream.go @@ -106,15 +106,7 @@ func (s *serialStream) handleRxSeqBufEntry(e seqBufEntry) { func (s *serialStream) handleSerialPacket(r []byte) error { gotSeq := binary.LittleEndian.Uint16(r[6:8]) - addedToFront, _ := s.rxSeqBuf.add(seqNum(gotSeq), r) - - // If the packet is not added to the front of the seqbuf, then it means that it was an answer for a - // retransmit request (or it was an out of order packet which we don't want start a retransmit). - if !addedToFront { - return nil - } - - return s.common.requestRetransmitIfNeeded(gotSeq) + return s.rxSeqBuf.add(seqNum(gotSeq), r) } func (s *serialStream) handleRead(r []byte) error { @@ -246,7 +238,7 @@ func (s *serialStream) init(devName string) error { log.Print("stream started") s.rxSeqBufEntryChan = make(chan seqBufEntry) - s.rxSeqBuf.init(serialRxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan) + s.rxSeqBuf.init(serialRxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan, s.common.requestRetransmit) s.deinitNeededChan = make(chan bool) s.deinitFinishedChan = make(chan bool) diff --git a/streamcommon.go b/streamcommon.go index ca499ec..a1a52d5 100644 --- a/streamcommon.go +++ b/streamcommon.go @@ -24,8 +24,6 @@ type streamCommon struct { pkt0 pkt0Type pkt7 pkt7Type - - lastSeqBufFrontRxSeq uint16 } func (s *streamCommon) send(d []byte) error { @@ -154,8 +152,6 @@ func (s *streamCommon) sendRetransmitRequest(seqNum uint16) error { return nil } -type seqNumRange [2]uint16 - func (s *streamCommon) sendRetransmitRequestForRanges(seqNumRanges []seqNumRange) error { seqNumBytes := make([]byte, len(seqNumRanges)*4) for i := 0; i < len(seqNumRanges); i++ { @@ -177,35 +173,26 @@ func (s *streamCommon) sendRetransmitRequestForRanges(seqNumRanges []seqNumRange return nil } -func (s *streamCommon) requestRetransmitIfNeeded(gotSeq uint16) error { - prevExpectedSeq := gotSeq - 1 - if s.lastSeqBufFrontRxSeq != prevExpectedSeq { - var missingPkts int - var sr seqNumRange - if prevExpectedSeq > s.lastSeqBufFrontRxSeq { - sr[0] = s.lastSeqBufFrontRxSeq - sr[1] = prevExpectedSeq - missingPkts = int(prevExpectedSeq) - int(s.lastSeqBufFrontRxSeq) - } else { - sr[0] = prevExpectedSeq - sr[1] = s.lastSeqBufFrontRxSeq - missingPkts = int(prevExpectedSeq) + 65536 - int(s.lastSeqBufFrontRxSeq) +func (s *streamCommon) requestRetransmit(r seqNumRange) error { + diff := r.getDiff(0xffff) + + if diff > maxRetransmitRequestPacketCount { + return errors.New("retransmit range too large") + } + + if diff == 0 { + log.Debug(s.name+"/requesting pkt #", r[0], " retransmit") + netstat.reportRetransmit(diff) + if err := s.sendRetransmitRequest(uint16(r[0])); err != nil { + return err } - if missingPkts == 1 { - log.Debug(s.name+"/requesting pkt #", sr[1], " retransmit") - netstat.reportRetransmit(missingPkts) - if err := s.sendRetransmitRequest(sr[1]); err != nil { - return err - } - } else if missingPkts <= maxRetransmitRequestPacketCount { - log.Debug(s.name+"/requesting pkt #", sr[0], "-#", sr[1], " retransmit") - netstat.reportRetransmit(missingPkts) - if err := s.sendRetransmitRequestForRanges([]seqNumRange{sr}); err != nil { - return err - } + } else { + log.Debug(s.name+"/requesting pkt #", r[0], "-#", r[1], " retransmit") + netstat.reportRetransmit(diff) + if err := s.sendRetransmitRequestForRanges([]seqNumRange{r}); err != nil { + return err } } - s.lastSeqBufFrontRxSeq = gotSeq return nil }