Send retransmit requests for missing audio frames

This commit is contained in:
Nonoo 2020-10-25 17:54:48 +01:00
parent 0e881d6f32
commit 094b0f01ab
2 changed files with 59 additions and 19 deletions

View file

@ -23,6 +23,8 @@ type audioStream struct {
timeoutTimer *time.Timer
receivedAudio bool
lastReceivedAudioSeq uint16
lastSeqBufFrontRxSeq uint16
rxSeqBuf seqBuf
rxSeqBufEntryChan chan seqBufEntry
@ -113,30 +115,68 @@ func (s *audioStream) handleRxSeqBufEntry(e seqBufEntry) {
s.audio.play <- e.data
}
func (s *audioStream) handleAudioPacket(r []byte) {
func (s *audioStream) requestRetransmitIfNeeded(gotSeq uint16) error {
prevExpectedSeq := gotSeq - 1
if s.lastSeqBufFrontRxSeq != prevExpectedSeq {
var missingPkts int
var sr seqNumRange
if prevExpectedSeq > s.lastSeqBufFrontRxSeq {
sr[0] = s.lastSeqBufFrontRxSeq
sr[1] = prevExpectedSeq
missingPkts = int(prevExpectedSeq) - int(s.lastSeqBufFrontRxSeq)
} else {
sr[0] = prevExpectedSeq
sr[1] = s.lastSeqBufFrontRxSeq
missingPkts = int(prevExpectedSeq) + 65536 - int(s.lastSeqBufFrontRxSeq)
}
if missingPkts == 1 {
log.Debug("request audio pkt #", sr[1], " retransmit")
if err := s.sendRetransmitRequest(sr[1]); err != nil {
return err
}
} else if missingPkts < 50 {
log.Debug("request audio pkt #", sr[0], "-#", sr[1], " retransmit")
if err := s.sendRetransmitRequestForRanges([]seqNumRange{sr}); err != nil {
return err
}
}
}
s.lastSeqBufFrontRxSeq = gotSeq
return nil
}
func (s *audioStream) handleAudioPacket(r []byte) error {
if s.timeoutTimer != nil {
s.timeoutTimer.Stop()
s.timeoutTimer.Reset(audioTimeoutDuration)
}
gotSeq := binary.LittleEndian.Uint16(r[6:8])
err := s.rxSeqBuf.add(seqNum(gotSeq), r[24:])
if err != nil {
log.Error(err)
addedToFront, _ := s.rxSeqBuf.add(seqNum(gotSeq), r[24:])
// If the packet is not added to the front of the seqbuf, then it means that it was an answer for a
// retransmit request (or it was an out of order packet which we don't want start a retransmit).
if !addedToFront {
return nil
}
return s.requestRetransmitIfNeeded(gotSeq)
}
func (s *audioStream) handleRead(r []byte) {
func (s *audioStream) handleRead(r []byte) error {
if len(r) >= 580 && (bytes.Equal(r[:6], []byte{0x6c, 0x05, 0x00, 0x00, 0x00, 0x00}) || bytes.Equal(r[:6], []byte{0x44, 0x02, 0x00, 0x00, 0x00, 0x00})) {
s.handleAudioPacket(r)
return s.handleAudioPacket(r)
}
return nil
}
func (s *audioStream) loop() {
for {
select {
case r := <-s.common.readChan:
s.handleRead(r)
if err := s.handleRead(r); err != nil {
reportError(err)
}
case <-s.timeoutTimer.C:
reportError(errors.New("audio stream timeout, try rebooting the radio"))
case e := <-s.rxSeqBufEntryChan:

View file

@ -73,14 +73,14 @@ func (s *seqBuf) addToBack(seq seqNum, data []byte) {
s.notifyWatcher()
}
func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) {
func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) (addedToFront bool) {
if toPos == 0 {
s.addToFront(seq, data)
return
return true
}
if toPos >= len(s.entries) {
s.addToBack(seq, data)
return
return false
}
sliceBefore := s.entries[:toPos]
sliceAfter := s.entries[toPos:]
@ -88,6 +88,7 @@ func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) {
s.entries = append(sliceBefore, append([]seqBufEntry{e}, sliceAfter...)...)
s.notifyWatcher()
return false
}
func (s *seqBuf) getDiff(seq1, seq2 seqNum) seqNum {
@ -130,7 +131,7 @@ func (s *seqBuf) leftOrRightCloserToSeq(seq, whichSeq seqNum) direction {
return left
}
func (s *seqBuf) add(seq seqNum, data []byte) error {
func (s *seqBuf) add(seq seqNum, data []byte) (addedToFront bool, err error) {
s.mutex.Lock()
defer s.mutex.Unlock()
@ -140,22 +141,22 @@ func (s *seqBuf) add(seq seqNum, data []byte) error {
// }()
if seq > s.maxSeqNum {
return errors.New("seq out of range")
return false, errors.New("seq out of range")
}
if len(s.entries) == 0 {
s.addToFront(seq, data)
return nil
return true, nil
}
if s.entries[0].seq == seq {
return errors.New("dropping duplicate seq")
return false, errors.New("dropping duplicate seq")
}
// Checking the first entry.
if s.leftOrRightCloserToSeq(seq, s.entries[0].seq) == left {
s.addToFront(seq, data)
return nil
return true, nil
}
// Parsing through other entries if there are more than 1.
@ -164,20 +165,19 @@ func (s *seqBuf) add(seq seqNum, data []byte) error {
// It can be a beginning of a new stream for example.
if s.entries[i].seq == seq {
// s.addToFront(seq, data)
return nil
return false, nil
}
if s.leftOrRightCloserToSeq(seq, s.entries[i].seq) == left {
// log.Debug("left for ", s.entries[i].seq)
s.insert(seq, data, i)
return nil
return s.insert(seq, data, i), nil
}
// log.Debug("right for ", s.entries[i].seq)
}
// No place found for the item?
s.addToBack(seq, data)
return nil
return false, nil
}
func (s *seqBuf) getNextDataAvailableRemainingTime() (time.Duration, error) {