Decrease latency of RX seqbuf

This commit is contained in:
Nonoo 2020-11-21 11:01:37 +01:00
parent 44f5b15994
commit bed7d95f7c
5 changed files with 220 additions and 116 deletions

View file

@ -17,8 +17,9 @@ import (
const audioSampleRate = 48000 const audioSampleRate = 48000
const audioSampleBytes = 2 const audioSampleBytes = 2
const pulseAudioBufferLength = 100 * time.Millisecond const pulseAudioBufferLength = 100 * time.Millisecond
const audioFrameSize = 1920 // 20ms const audioFrameLength = 20 * time.Millisecond
const maxPlayBufferSize = audioFrameSize * 5 const audioFrameSize = int((audioSampleRate * audioSampleBytes * audioFrameLength) / time.Second)
const maxPlayBufferSize = audioFrameSize*5 + int((audioSampleRate*audioSampleBytes*audioRxSeqBufLength)/time.Second)
type audioStruct struct { type audioStruct struct {
devName string devName string
@ -88,7 +89,7 @@ func (a *audioStruct) toggleRecFromDefaultSoundcard() {
if a.defaultSoundcardStream.recStream == nil { if a.defaultSoundcardStream.recStream == nil {
ss := pulse.SampleSpec{Format: pulse.SAMPLE_S16LE, Rate: audioSampleRate, Channels: 1} ss := pulse.SampleSpec{Format: pulse.SAMPLE_S16LE, Rate: audioSampleRate, Channels: 1}
battr := pulse.NewBufferAttr() battr := pulse.NewBufferAttr()
battr.Fragsize = audioFrameSize battr.Fragsize = uint32(audioFrameSize)
var err error var err error
a.defaultSoundcardStream.recStream, err = pulse.NewStream("", "kappanhang", pulse.STREAM_RECORD, "", a.devName, a.defaultSoundcardStream.recStream, err = pulse.NewStream("", "kappanhang", pulse.STREAM_RECORD, "", a.devName,
&ss, nil, battr) &ss, nil, battr)

View file

@ -112,14 +112,7 @@ func (s *audioStream) handleAudioPacket(r []byte) error {
s.timeoutTimer.Reset(audioTimeoutDuration) s.timeoutTimer.Reset(audioTimeoutDuration)
} }
addedToFront, _ := s.rxSeqBuf.add(seqNum(gotSeq), r[24:]) return s.rxSeqBuf.add(seqNum(gotSeq), r[24:])
if !addedToFront {
// If the packet is not added to the front of the seqbuf, then it means that it was an answer for a
// retransmit request (or it was an out of order packet which we don't want start a retransmit request).
return nil
}
return s.common.requestRetransmitIfNeeded(gotSeq)
} }
func (s *audioStream) handleRead(r []byte) error { func (s *audioStream) handleRead(r []byte) error {
@ -175,7 +168,7 @@ func (s *audioStream) init(devName string) error {
log.Print("stream started") log.Print("stream started")
s.rxSeqBufEntryChan = make(chan seqBufEntry) s.rxSeqBufEntryChan = make(chan seqBufEntry)
s.rxSeqBuf.init(audioRxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan) s.rxSeqBuf.init(audioRxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan, s.common.requestRetransmit)
s.timeoutTimer = time.NewTimer(audioTimeoutDuration) s.timeoutTimer = time.NewTimer(audioTimeoutDuration)

227
seqbuf.go
View file

@ -8,18 +8,67 @@ import (
type seqNum int type seqNum int
func (s *seqNum) inc(maxSeqNum seqNum) seqNum {
if *s == maxSeqNum {
return 0
}
return *s + 1
}
func (s *seqNum) dec(maxSeqNum seqNum) seqNum {
if *s == 0 {
return maxSeqNum
}
return *s - 1
}
type seqNumRange [2]seqNum
func (r *seqNumRange) getDiff(maxSeqNum seqNum) (diff int) {
from := r[0]
to := r[1]
if to >= from {
diff = int(to) - int(from)
} else {
r[0] = from
r[1] = to
diff = (int(maxSeqNum) + 1) - int(from) + int(to)
}
return
}
type seqBufEntry struct { type seqBufEntry struct {
seq seqNum seq seqNum
data []byte data []byte
addedAt time.Time
} }
type requestRetransmitCallbackType func(r seqNumRange) error
type seqBuf struct { type seqBuf struct {
length time.Duration length time.Duration
maxSeqNum seqNum maxSeqNum seqNum
maxSeqNumDiff seqNum maxSeqNumDiff seqNum
requestRetransmitCallback requestRetransmitCallbackType
// Available entries coming out from the seqbuf will be sent to entryChan.
entryChan chan seqBufEntry entryChan chan seqBufEntry
// If this is true then the seqBuf is locked, which means no entries will be sent to entryChan.
lockedByInvalidSeq bool
lockedAt time.Time
// This is false until no packets have been sent to the entryChan.
alreadyReturnedFirstSeq bool
// The seqNum of the last packet sent to entryChan.
lastReturnedSeq seqNum
requestedRetransmit bool
lastRequestedRetransmitRange seqNumRange
ignoreMissingPktsUntilEnabled bool
ignoreMissingPktsUntilSeq seqNum
// Note that the most recently added entry is stored as the 0th entry. // Note that the most recently added entry is stored as the 0th entry.
entries []seqBufEntry entries []seqBufEntry
mutex sync.RWMutex mutex sync.RWMutex
@ -27,6 +76,8 @@ type seqBuf struct {
entryAddedChan chan bool entryAddedChan chan bool
watcherCloseNeededChan chan bool watcherCloseNeededChan chan bool
watcherCloseDoneChan chan bool watcherCloseDoneChan chan bool
errOutOfOrder error
} }
// func (s *seqBuf) string() (out string) { // func (s *seqBuf) string() (out string) {
@ -46,7 +97,6 @@ func (s *seqBuf) createEntry(seq seqNum, data []byte) seqBufEntry {
return seqBufEntry{ return seqBufEntry{
seq: seq, seq: seq,
data: data, data: data,
addedAt: time.Now(),
} }
} }
@ -66,29 +116,26 @@ func (s *seqBuf) addToFront(seq seqNum, data []byte) {
func (s *seqBuf) addToBack(seq seqNum, data []byte) { func (s *seqBuf) addToBack(seq seqNum, data []byte) {
e := s.createEntry(seq, data) e := s.createEntry(seq, data)
e.addedAt = time.Time{} // Release the packet from the seqbuf as soon as possible, as it is a late entry.
s.entries = append(s.entries, e) s.entries = append(s.entries, e)
s.notifyWatcher() s.notifyWatcher()
} }
func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) (addedToFront bool) { func (s *seqBuf) insert(seq seqNum, data []byte, toPos int) {
if toPos == 0 { if toPos == 0 {
s.addToFront(seq, data) s.addToFront(seq, data)
return true return
} }
if toPos >= len(s.entries) { if toPos >= len(s.entries) {
s.addToBack(seq, data) s.addToBack(seq, data)
return false return
} }
sliceBefore := s.entries[:toPos] sliceBefore := s.entries[:toPos]
sliceAfter := s.entries[toPos:] sliceAfter := s.entries[toPos:]
e := s.createEntry(seq, data) e := s.createEntry(seq, data)
e.addedAt = time.Time{} // Release the packet from the seqbuf as soon as possible, as it is a late entry.
s.entries = append(sliceBefore, append([]seqBufEntry{e}, sliceAfter...)...) s.entries = append(sliceBefore, append([]seqBufEntry{e}, sliceAfter...)...)
s.notifyWatcher() s.notifyWatcher()
return false
} }
func (s *seqBuf) getDiff(seq1, seq2 seqNum) seqNum { func (s *seqBuf) getDiff(seq1, seq2 seqNum) seqNum {
@ -132,7 +179,7 @@ func (s *seqBuf) leftOrRightCloserToSeq(seq, whichSeq seqNum) direction {
return left return left
} }
func (s *seqBuf) add(seq seqNum, data []byte) (addedToFront bool, err error) { func (s *seqBuf) add(seq seqNum, data []byte) error {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
@ -142,74 +189,156 @@ func (s *seqBuf) add(seq seqNum, data []byte) (addedToFront bool, err error) {
// }() // }()
if seq > s.maxSeqNum { if seq > s.maxSeqNum {
return false, errors.New("seq out of range") return errors.New("seq out of range")
} }
if len(s.entries) == 0 { if len(s.entries) == 0 {
s.addToFront(seq, data) s.addToFront(seq, data)
return true, nil return nil
} }
if s.entries[0].seq == seq { if s.entries[0].seq == seq { // Dropping duplicate seq.
return false, errors.New("dropping duplicate seq") return nil
} }
// Checking the first entry. // Checking the first entry.
if s.leftOrRightCloserToSeq(seq, s.entries[0].seq) == left { if s.leftOrRightCloserToSeq(seq, s.entries[0].seq) == left {
s.addToFront(seq, data) s.addToFront(seq, data)
return true, nil return nil
} }
// Parsing through other entries if there are more than 1. // Parsing through other entries if there are more than 1.
for i := 1; i < len(s.entries); i++ { for i := 1; i < len(s.entries); i++ {
// This seqnum is already in the queue, but not as the first entry? It's not a duplicate packet. // This seqnum is already in the queue? Ignoring it.
// It can be a beginning of a new stream for example.
if s.entries[i].seq == seq { if s.entries[i].seq == seq {
// s.addToFront(seq, data) return nil
return false, nil
} }
if s.leftOrRightCloserToSeq(seq, s.entries[i].seq) == left { if s.leftOrRightCloserToSeq(seq, s.entries[i].seq) == left {
// log.Debug("left for ", s.entries[i].seq) // log.Debug("left for ", s.entries[i].seq)
return s.insert(seq, data, i), nil s.insert(seq, data, i)
return nil
} }
// log.Debug("right for ", s.entries[i].seq) // log.Debug("right for ", s.entries[i].seq)
} }
// No place found for the item? // No place found for the item?
s.addToBack(seq, data) s.addToBack(seq, data)
return false, nil return nil
} }
func (s *seqBuf) getNextDataAvailableRemainingTime() (time.Duration, error) { func (s *seqBuf) checkLockTimeout() (timeout bool, shouldRetryIn time.Duration) {
s.mutex.RLock() timeSinceLastInvalidSeq := time.Since(s.lockedAt)
defer s.mutex.RUnlock() if s.length > timeSinceLastInvalidSeq {
shouldRetryIn = s.length - timeSinceLastInvalidSeq
return
}
if len(s.entries) == 0 { s.lockedByInvalidSeq = false
return 0, errors.New("seqbuf is empty") // log.Debug("lock timeout")
if s.requestedRetransmit {
s.ignoreMissingPktsUntilSeq = s.lastRequestedRetransmitRange[1]
s.ignoreMissingPktsUntilEnabled = true
} }
lastEntryIdx := len(s.entries) - 1
inBufDuration := time.Since(s.entries[lastEntryIdx].addedAt) return true, 0
if inBufDuration >= s.length {
return 0, nil
}
return s.length - inBufDuration, nil
} }
func (s *seqBuf) get() (e seqBufEntry, err error) { // Returns true if all entries from the requested retransmit range have been received.
func (s *seqBuf) gotRetransmitRange() bool {
entryIdx := len(s.entries)
rangeSeq := s.lastRequestedRetransmitRange[0]
for {
entryIdx--
if entryIdx < 0 {
return false
}
if s.entries[entryIdx].seq != rangeSeq {
// log.Debug("entry idx ", entryIdx, " seq #", s.entries[entryIdx].seq, " does not match ", rangeSeq)
// log.Debug(s.string())
return false
}
if rangeSeq == s.lastRequestedRetransmitRange[1] {
return true
}
rangeSeq = rangeSeq.inc(s.maxSeqNum)
}
}
// shouldRetryIn is only filled when no entry is available, but there are entries in the seqbuf.
// err is not nil if the seqbuf is empty.
func (s *seqBuf) get() (e seqBufEntry, shouldRetryIn time.Duration, err error) {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
if len(s.entries) == 0 { if len(s.entries) == 0 {
return e, errors.New("seqbuf is empty") return e, 0, errors.New("seqbuf is empty")
}
lastEntryIdx := len(s.entries) - 1
if time.Since(s.entries[lastEntryIdx].addedAt) < s.length {
return e, errors.New("no available entries")
} }
entryCount := len(s.entries)
lastEntryIdx := entryCount - 1
e = s.entries[lastEntryIdx] e = s.entries[lastEntryIdx]
if s.alreadyReturnedFirstSeq {
if s.lockedByInvalidSeq {
if s.requestedRetransmit && s.gotRetransmitRange() {
s.lockedByInvalidSeq = false
// log.Debug("lock over")
} else {
var timeout bool
if timeout, shouldRetryIn = s.checkLockTimeout(); !timeout {
return
}
}
} else {
if s.leftOrRightCloserToSeq(e.seq, seqNum(s.lastReturnedSeq)) != left {
// log.Debug("ignoring out of order seq ", e.seq)
s.entries = s.entries[:lastEntryIdx] s.entries = s.entries[:lastEntryIdx]
return e, nil err = s.errOutOfOrder
return
}
if s.ignoreMissingPktsUntilEnabled {
if s.leftOrRightCloserToSeq(e.seq, s.ignoreMissingPktsUntilSeq) == left {
// log.Debug("ignore over ", e.seq, " ", s.ignoreMissingPktsUntilSeq)
s.ignoreMissingPktsUntilEnabled = false
} else {
// log.Debug("ignoring missing pkt, seq #", e.seq, " until ", s.ignoreMissingPktsUntilSeq)
}
} else {
expectedNextSeq := s.lastReturnedSeq.inc(s.maxSeqNum)
if e.seq != expectedNextSeq {
// log.Debug("lock on, expected seq ", expectedNextSeq, " got ", e.seq)
s.lockedByInvalidSeq = true
s.lockedAt = time.Now()
s.requestedRetransmit = false
s.ignoreMissingPktsUntilEnabled = false
shouldRetryIn = s.length
if s.requestRetransmitCallback != nil {
s.lastRequestedRetransmitRange[0] = expectedNextSeq
s.lastRequestedRetransmitRange[1] = e.seq.dec(s.maxSeqNum)
if err = s.requestRetransmitCallback(s.lastRequestedRetransmitRange); err == nil {
s.requestedRetransmit = true
}
}
return
}
}
}
}
s.lastReturnedSeq = e.seq
s.alreadyReturnedFirstSeq = true
s.entries = s.entries[:lastEntryIdx]
return e, 0, nil
} }
func (s *seqBuf) watcher() { func (s *seqBuf) watcher() {
@ -227,11 +356,8 @@ func (s *seqBuf) watcher() {
for retry { for retry {
retry = false retry = false
t, err := s.getNextDataAvailableRemainingTime() e, t, err := s.get()
if err == nil { if err == nil && t == 0 {
if t == 0 { // Do we have an entry available right now?
e, err := s.get()
if err == nil {
if s.entryChan != nil { if s.entryChan != nil {
select { select {
case s.entryChan <- e: case s.entryChan <- e:
@ -239,13 +365,13 @@ func (s *seqBuf) watcher() {
return return
} }
} }
} else {
log.Error(err)
}
// We may have further available entries. // We may have further available entries.
retry = true retry = true
} else if !entryAvailableTimerRunning { } else {
if err == s.errOutOfOrder {
retry = true
} else if !entryAvailableTimerRunning && t > 0 {
// An entry will be available later, waiting for it. // An entry will be available later, waiting for it.
entryAvailableTimer.Reset(t) entryAvailableTimer.Reset(t)
entryAvailableTimerRunning = true entryAvailableTimerRunning = true
@ -265,15 +391,20 @@ func (s *seqBuf) watcher() {
// Setting a max. seqnum diff is optional. If it's 0 then the diff will be half of the maxSeqNum range. // Setting a max. seqnum diff is optional. If it's 0 then the diff will be half of the maxSeqNum range.
// Available entries coming out from the seqbuf will be sent to entryChan. // Available entries coming out from the seqbuf will be sent to entryChan.
func (s *seqBuf) init(length time.Duration, maxSeqNum, maxSeqNumDiff seqNum, entryChan chan seqBufEntry) { func (s *seqBuf) init(length time.Duration, maxSeqNum, maxSeqNumDiff seqNum, entryChan chan seqBufEntry,
requestRetransmitCallback requestRetransmitCallbackType) {
s.length = length s.length = length
s.maxSeqNum = maxSeqNum s.maxSeqNum = maxSeqNum
s.maxSeqNumDiff = maxSeqNumDiff s.maxSeqNumDiff = maxSeqNumDiff
s.entryChan = entryChan s.entryChan = entryChan
s.requestRetransmitCallback = requestRetransmitCallback
s.entryAddedChan = make(chan bool) s.entryAddedChan = make(chan bool)
s.watcherCloseNeededChan = make(chan bool) s.watcherCloseNeededChan = make(chan bool)
s.watcherCloseDoneChan = make(chan bool) s.watcherCloseDoneChan = make(chan bool)
s.errOutOfOrder = errors.New("out of order pkt")
go s.watcher() go s.watcher()
} }

View file

@ -106,15 +106,7 @@ func (s *serialStream) handleRxSeqBufEntry(e seqBufEntry) {
func (s *serialStream) handleSerialPacket(r []byte) error { func (s *serialStream) handleSerialPacket(r []byte) error {
gotSeq := binary.LittleEndian.Uint16(r[6:8]) gotSeq := binary.LittleEndian.Uint16(r[6:8])
addedToFront, _ := s.rxSeqBuf.add(seqNum(gotSeq), r) return s.rxSeqBuf.add(seqNum(gotSeq), r)
// If the packet is not added to the front of the seqbuf, then it means that it was an answer for a
// retransmit request (or it was an out of order packet which we don't want start a retransmit).
if !addedToFront {
return nil
}
return s.common.requestRetransmitIfNeeded(gotSeq)
} }
func (s *serialStream) handleRead(r []byte) error { func (s *serialStream) handleRead(r []byte) error {
@ -246,7 +238,7 @@ func (s *serialStream) init(devName string) error {
log.Print("stream started") log.Print("stream started")
s.rxSeqBufEntryChan = make(chan seqBufEntry) s.rxSeqBufEntryChan = make(chan seqBufEntry)
s.rxSeqBuf.init(serialRxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan) s.rxSeqBuf.init(serialRxSeqBufLength, 0xffff, 0, s.rxSeqBufEntryChan, s.common.requestRetransmit)
s.deinitNeededChan = make(chan bool) s.deinitNeededChan = make(chan bool)
s.deinitFinishedChan = make(chan bool) s.deinitFinishedChan = make(chan bool)

View file

@ -24,8 +24,6 @@ type streamCommon struct {
pkt0 pkt0Type pkt0 pkt0Type
pkt7 pkt7Type pkt7 pkt7Type
lastSeqBufFrontRxSeq uint16
} }
func (s *streamCommon) send(d []byte) error { func (s *streamCommon) send(d []byte) error {
@ -154,8 +152,6 @@ func (s *streamCommon) sendRetransmitRequest(seqNum uint16) error {
return nil return nil
} }
type seqNumRange [2]uint16
func (s *streamCommon) sendRetransmitRequestForRanges(seqNumRanges []seqNumRange) error { func (s *streamCommon) sendRetransmitRequestForRanges(seqNumRanges []seqNumRange) error {
seqNumBytes := make([]byte, len(seqNumRanges)*4) seqNumBytes := make([]byte, len(seqNumRanges)*4)
for i := 0; i < len(seqNumRanges); i++ { for i := 0; i < len(seqNumRanges); i++ {
@ -177,35 +173,26 @@ func (s *streamCommon) sendRetransmitRequestForRanges(seqNumRanges []seqNumRange
return nil return nil
} }
func (s *streamCommon) requestRetransmitIfNeeded(gotSeq uint16) error { func (s *streamCommon) requestRetransmit(r seqNumRange) error {
prevExpectedSeq := gotSeq - 1 diff := r.getDiff(0xffff)
if s.lastSeqBufFrontRxSeq != prevExpectedSeq {
var missingPkts int if diff > maxRetransmitRequestPacketCount {
var sr seqNumRange return errors.New("retransmit range too large")
if prevExpectedSeq > s.lastSeqBufFrontRxSeq { }
sr[0] = s.lastSeqBufFrontRxSeq
sr[1] = prevExpectedSeq if diff == 0 {
missingPkts = int(prevExpectedSeq) - int(s.lastSeqBufFrontRxSeq) log.Debug(s.name+"/requesting pkt #", r[0], " retransmit")
netstat.reportRetransmit(diff)
if err := s.sendRetransmitRequest(uint16(r[0])); err != nil {
return err
}
} else { } else {
sr[0] = prevExpectedSeq log.Debug(s.name+"/requesting pkt #", r[0], "-#", r[1], " retransmit")
sr[1] = s.lastSeqBufFrontRxSeq netstat.reportRetransmit(diff)
missingPkts = int(prevExpectedSeq) + 65536 - int(s.lastSeqBufFrontRxSeq) if err := s.sendRetransmitRequestForRanges([]seqNumRange{r}); err != nil {
}
if missingPkts == 1 {
log.Debug(s.name+"/requesting pkt #", sr[1], " retransmit")
netstat.reportRetransmit(missingPkts)
if err := s.sendRetransmitRequest(sr[1]); err != nil {
return err
}
} else if missingPkts <= maxRetransmitRequestPacketCount {
log.Debug(s.name+"/requesting pkt #", sr[0], "-#", sr[1], " retransmit")
netstat.reportRetransmit(missingPkts)
if err := s.sendRetransmitRequestForRanges([]seqNumRange{sr}); err != nil {
return err return err
} }
} }
}
s.lastSeqBufFrontRxSeq = gotSeq
return nil return nil
} }