From 094b0f01ab668b573de112dea7a772299d2c8b34 Mon Sep 17 00:00:00 2001 From: Nonoo Date: Sun, 25 Oct 2020 17:54:48 +0100 Subject: [PATCH] Send retransmit requests for missing audio frames --- audiostream.go | 54 +++++++++++++++++++++++++++++++++++++++++++------- seqbuf.go | 24 +++++++++++----------- 2 files changed, 59 insertions(+), 19 deletions(-) diff --git a/audiostream.go b/audiostream.go index f3dccc9..9176801 100644 --- a/audiostream.go +++ b/audiostream.go @@ -23,6 +23,8 @@ type audioStream struct { timeoutTimer *time.Timer receivedAudio bool lastReceivedAudioSeq uint16 + + lastSeqBufFrontRxSeq uint16 rxSeqBuf seqBuf rxSeqBufEntryChan chan seqBufEntry @@ -113,30 +115,68 @@ func (s *audioStream) handleRxSeqBufEntry(e seqBufEntry) { s.audio.play <- e.data } -func (s *audioStream) handleAudioPacket(r []byte) { +func (s *audioStream) requestRetransmitIfNeeded(gotSeq uint16) error { + prevExpectedSeq := gotSeq - 1 + if s.lastSeqBufFrontRxSeq != prevExpectedSeq { + var missingPkts int + var sr seqNumRange + if prevExpectedSeq > s.lastSeqBufFrontRxSeq { + sr[0] = s.lastSeqBufFrontRxSeq + sr[1] = prevExpectedSeq + missingPkts = int(prevExpectedSeq) - int(s.lastSeqBufFrontRxSeq) + } else { + sr[0] = prevExpectedSeq + sr[1] = s.lastSeqBufFrontRxSeq + missingPkts = int(prevExpectedSeq) + 65536 - int(s.lastSeqBufFrontRxSeq) + } + if missingPkts == 1 { + log.Debug("request audio pkt #", sr[1], " retransmit") + if err := s.sendRetransmitRequest(sr[1]); err != nil { + return err + } + } else if missingPkts < 50 { + log.Debug("request audio pkt #", sr[0], "-#", sr[1], " retransmit") + if err := s.sendRetransmitRequestForRanges([]seqNumRange{sr}); err != nil { + return err + } + } + } + s.lastSeqBufFrontRxSeq = gotSeq + return nil +} + +func (s *audioStream) handleAudioPacket(r []byte) error { if s.timeoutTimer != nil { s.timeoutTimer.Stop() s.timeoutTimer.Reset(audioTimeoutDuration) } gotSeq := binary.LittleEndian.Uint16(r[6:8]) - err := s.rxSeqBuf.add(seqNum(gotSeq), r[24:]) - if err != nil { - log.Error(err) + addedToFront, _ := s.rxSeqBuf.add(seqNum(gotSeq), r[24:]) + + // If the packet is not added to the front of the seqbuf, then it means that it was an answer for a + // retransmit request (or it was an out of order packet which we don't want start a retransmit). + if !addedToFront { + return nil } + + return s.requestRetransmitIfNeeded(gotSeq) } -func (s *audioStream) handleRead(r []byte) { +func (s *audioStream) handleRead(r []byte) error { if len(r) >= 580 && (bytes.Equal(r[:6], []byte{0x6c, 0x05, 0x00, 0x00, 0x00, 0x00}) || bytes.Equal(r[:6], []byte{0x44, 0x02, 0x00, 0x00, 0x00, 0x00})) { - s.handleAudioPacket(r) + return s.handleAudioPacket(r) } + return nil } func (s *audioStream) loop() { for { select { case r := <-s.common.readChan: - s.handleRead(r) + if err := s.handleRead(r); err != nil { + reportError(err) + } case <-s.timeoutTimer.C: reportError(errors.New("audio stream timeout, try rebooting the radio")) case e := <-s.rxSeqBufEntryChan: diff --git a/seqbuf.go b/seqbuf.go index 5d39b54..37b485c 100644 --- a/seqbuf.go +++ b/seqbuf.go @@ -73,14 +73,14 @@ func (s *seqBuf) addToBack(seq seqNum, data []byte) { s.notifyWatcher() } -func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) { +func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) (addedToFront bool) { if toPos == 0 { s.addToFront(seq, data) - return + return true } if toPos >= len(s.entries) { s.addToBack(seq, data) - return + return false } sliceBefore := s.entries[:toPos] sliceAfter := s.entries[toPos:] @@ -88,6 +88,7 @@ func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) { s.entries = append(sliceBefore, append([]seqBufEntry{e}, sliceAfter...)...) s.notifyWatcher() + return false } func (s *seqBuf) getDiff(seq1, seq2 seqNum) seqNum { @@ -130,7 +131,7 @@ func (s *seqBuf) leftOrRightCloserToSeq(seq, whichSeq seqNum) direction { return left } -func (s *seqBuf) add(seq seqNum, data []byte) error { +func (s *seqBuf) add(seq seqNum, data []byte) (addedToFront bool, err error) { s.mutex.Lock() defer s.mutex.Unlock() @@ -140,22 +141,22 @@ func (s *seqBuf) add(seq seqNum, data []byte) error { // }() if seq > s.maxSeqNum { - return errors.New("seq out of range") + return false, errors.New("seq out of range") } if len(s.entries) == 0 { s.addToFront(seq, data) - return nil + return true, nil } if s.entries[0].seq == seq { - return errors.New("dropping duplicate seq") + return false, errors.New("dropping duplicate seq") } // Checking the first entry. if s.leftOrRightCloserToSeq(seq, s.entries[0].seq) == left { s.addToFront(seq, data) - return nil + return true, nil } // Parsing through other entries if there are more than 1. @@ -164,20 +165,19 @@ func (s *seqBuf) add(seq seqNum, data []byte) error { // It can be a beginning of a new stream for example. if s.entries[i].seq == seq { // s.addToFront(seq, data) - return nil + return false, nil } if s.leftOrRightCloserToSeq(seq, s.entries[i].seq) == left { // log.Debug("left for ", s.entries[i].seq) - s.insert(seq, data, i) - return nil + return s.insert(seq, data, i), nil } // log.Debug("right for ", s.entries[i].seq) } // No place found for the item? s.addToBack(seq, data) - return nil + return false, nil } func (s *seqBuf) getNextDataAvailableRemainingTime() (time.Duration, error) {